#coding:utf8
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF, BaseUDAF
import threading
import logging
import time
import os

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

def log(msg):
    logging.info(msg)

# Make an uploaded archive resource (e.g. the pandas dependency package)
# importable by appending its extraction directory to sys.path.
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    log("add path:%s" % (dir_names[0]))
    sys.path.append(dir_names[0])
    return os.path.dirname(dir_names[0])

# A "RuntimeError: xxx has been blocked by sandbox" may be raised here:
# packages containing C extensions are blocked by the sandbox, which can be
# lifted by setting odps.isolation.session.enable = true.
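# Minimal sketch of that setting (run it in the MaxCompute SQL session
# before invoking any UDF that loads C-extension packages):
#   set odps.isolation.session.enable = true;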

# Append the directory containing an uploaded file resource to sys.path.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

# Copy a shared-object file resource into the current working directory so
# that dependent packages can load it.
def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()
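
# Usage sketch (hypothetical file resource name):
#   include_so("libexample.so")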

# Initialize the business data package. Because of upload size limits and
# mismatches between Python versions and archive extraction, the package is
# unpacked manually: a single-file resource is unzipped directly, and a
# multi-part resource is concatenated into one zip first.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info" % (package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc" % (os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"), "interface_real"))
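
# Usage sketch (hypothetical resource names): a package uploaded as split
# zip parts is reassembled, unpacked into "local_package", and prepended
# to sys.path.
#   init_env(["pkg.zip.1", "pkg.zip.2"], "local_package")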

import platform

def getPlatform():
    return platform.platform()

@annotate('->string')
class f_getPlatform(object):

    def evaluate(self):
        return getPlatform()
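
# Example invocation from MaxCompute SQL (assumption: the UDF is registered
# under this class name):
#   SELECT f_getPlatform();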

@annotate('string->string,string,bigint')
class f_strip_filemd5(BaseUDTF):
    # Split a "filemd5-part" identifier into the original value, the bare
    # md5, and the part number (0 when there is no part suffix).
    def process(self, filemd5):
        split_filemd5 = filemd5.split("-")
        filemd5_strip = split_filemd5[0]
        if len(split_filemd5) == 1:
            parts = 0
        else:
            parts = int(split_filemd5[1])
        self.forward(filemd5, filemd5_strip, parts)
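
# Illustrative values: "9a8b7c-2" forwards ("9a8b7c-2", "9a8b7c", 2), while
# "9a8b7c" with no part suffix forwards ("9a8b7c", "9a8b7c", 0).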

@annotate('string,bigint->string')
class f_group_filemd5(BaseUDAF):
    # Aggregate the (filemd5, part) pairs of a group into a JSON array of
    # filemd5 values ordered by part number.
    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, filemd5, part):
        buffer[0].append([filemd5, part])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = buffer[0]
        list_group.sort(key=lambda x: x[1])
        list_filemd5 = [item[0] for item in list_group]
        return json.dumps(list_filemd5)
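
# Illustrative aggregation: rows ("9a8b7c-2", 2) and ("9a8b7c-1", 1) in one
# group terminate as '["9a8b7c-1", "9a8b7c-2"]', ordered by part number.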

@annotate('string->string,string,string,string,string,string,string,string,string')
class f_split_filemd5(BaseUDTF):
    # Fan a JSON array of filemd5 values out into nine columns; unused
    # columns are padded with random uuid4 fragments. Inputs with more than
    # nine entries are silently truncated to the first nine.
    def __init__(self):
        import json
        from uuid import uuid4
        global json, uuid4

    def process(self, filemd5s):
        list_filemd5 = json.loads(filemd5s)
        list_result = [uuid4().hex[:19] for _ in range(max(9, len(list_filemd5)))]
        logging.info(str(list_filemd5))
        for i in range(len(list_filemd5)):
            list_result[i] = list_filemd5[i]
        self.forward(list_result[0], list_result[1], list_result[2], list_result[3], list_result[4],
                     list_result[5], list_result[6], list_result[7], list_result[8])
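
# Illustrative values: input '["a", "b"]' forwards ("a", "b", r3, ..., r9),
# where r3..r9 are random 19-character uuid4 prefixes.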

# Download an OSS object to a local path using resumable multi-part
# transfer; returns True on success, False on any failure.
def downloadFile(bucket, objectPath, localPath):
    try:
        start_time = time.time()
        # bucket.get_object_to_file(objectPath, localPath)
        oss2.resumable_download(bucket, objectPath, localPath,
                                store=oss2.ResumableDownloadStore(root="/home/admin"),
                                multiget_threshold=200*1024,
                                part_size=200*1024,
                                num_threads=5)
        log("download %s takes %d" % (objectPath, time.time() - start_time))
        return True
    except Exception as e:
        log("download object failed of %s: %s" % (str(objectPath), str(e)))
        return False
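
# Usage sketch (hypothetical object key and local path):
#   ok = downloadFile(bucket, "path/to/object.zip", "/home/admin/object.zip")
# Objects larger than multiget_threshold (200 KB) are fetched in 200 KB
# parts over 5 threads, with resume state stored under /home/admin.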

@annotate('->string')
class f_test_download(BaseUDTF):
    # Smoke test: download a fixed object from OSS. Note that the access
    # credentials are hardcoded below.
    def __init__(self):
        include_package_path("oss_env.zip")
        import json
        from uuid import uuid4
        import logging
        import oss2
        global json, uuid4, oss2
        self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        self.attachment_bucket_name = "attachment-hub"
        self.auth = oss2.Auth("LTAI4FyUT7ZcQFZPjVtw5y9b", "2zscfFTvy3JWavtCeCOthLxF8bDNH3")
        self.bucket = oss2.Bucket(self.auth, self.bucket_url, self.attachment_bucket_name)

    def process(self):
        downloadFile(self.bucket, "049c/20210701/2021-07-01/03755/1625135745231.zip", "/home/admin/1.pdf")

@annotate('string->string')
class f_test_exit(BaseUDTF):
    # Heartbeat test: sleep for 30 seconds in total to verify that the
    # worker is not killed while a row is being processed.
    def __init__(self):
        import json
        from uuid import uuid4
        import logging

    def process(self, s):
        for i in range(3):
            time.sleep(10)
            log("jump heart")
        self.forward("1")

@annotate('bigint->string')
class f_getRandomStr(object):
    # Return a random lowercase string of the given length. The string is
    # built once per worker and cached; each later call overwrites
    # count // 200 randomly chosen characters to vary the output cheaply.
    def __init__(self):
        import random
        global random
        self.result_s = ""

    def evaluate(self, count):
        if self.result_s == "":
            list_c = [chr(ord('a') + i) for i in range(26)]
            result_s = ""
            for i in range(count):
                index = random.randint(0, len(list_c) - 1)
                result_s += list_c[index]
            self.result_s = result_s
        for i in range(count // 200):
            index = random.randint(0, len(self.result_s) - 1)
            index_1 = random.randint(0, len(self.result_s) - 1)
            self.result_s = self.result_s[:index] + self.result_s[index_1:index_1 + 1] + self.result_s[index + 1:]
        return self.result_s
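
# Example invocation from MaxCompute SQL (assumption: the function is
# registered under this class name):
#   SELECT f_getRandomStr(1000);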

@annotate('string->string')
class f_extract_pageAttachments(BaseUDTF):
    # Extract attachment links from a page's HTML by matching known file
    # suffixes, and forward them as a JSON array of {fileLink, fileTitle}.
    def __init__(self):
        include_package_path("envs_py37.env.zip")
        import json
        from uuid import uuid4
        from bs4 import BeautifulSoup
        import logging
        global json, BeautifulSoup

    def process(self, _html):
        if _html is not None:
            page_attachments = self.extract_pageAttachments(_html)
            if len(page_attachments) > 0:
                self.forward(json.dumps(page_attachments, ensure_ascii=False))

    def extract_pageAttachments(self, _html):
        fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc", ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp", ".jpg", ".jpeg", ".png", ".tif", ".swf"]
        _soup = BeautifulSoup(_html, "lxml")
        list_a = _soup.find_all("a")
        list_img = _soup.find_all("img")
        page_attachments = []
        # <a> tags: a suffix may appear in the link text or in the href.
        for _a in list_a:
            _text = _a.get_text()
            _url = _a.attrs.get("href", "")
            if _url.find("http://www.bidizhaobiao.com") >= 0:
                continue
            is_attach = False
            for suf in fileSuffix:
                if _text.find(suf) >= 0 or _url.find(suf) >= 0:
                    is_attach = True
            if is_attach:
                page_attachments.append({"fileLink": _url, "fileTitle": _text})
        # <img> tags: a suffix may appear in the src attribute.
        for _img in list_img:
            _text = _img.get_text()
            _url = _img.attrs.get("src", "")
            if _url.find("http://www.bidizhaobiao.com") >= 0:
                continue
            is_attach = False
            for suf in fileSuffix:
                if _text.find(suf) >= 0 or _url.find(suf) >= 0:
                    is_attach = True
            if is_attach:
                page_attachments.append({"fileLink": _url, "fileTitle": _text})
        return page_attachments
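
# Local test sketch (assumption: bs4 and lxml are installed locally; the
# module-level import below supplies the global normally set in __init__):
#   from bs4 import BeautifulSoup
#   udtf = f_extract_pageAttachments.__new__(f_extract_pageAttachments)
#   html = '<a href="http://example.com/files/doc.pdf">tender doc.pdf</a>'
#   print(udtf.extract_pageAttachments(html))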