# coding: utf8
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF, BaseUDAF
import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
import os


def log(msg):
    """Log *msg* at INFO level through the module-wide logging config."""
    logging.info(msg)


# Register a cached dependency archive (e.g. a bundled pandas env) on sys.path.
def include_package_path(res_name):
    """Extract-register the cached archive *res_name* for import.

    Picks the shortest extracted directory (the archive root), appends it to
    sys.path and returns its parent directory.
    """
    import os, sys
    archive_files = get_cache_archive(res_name)
    # NOTE(review): this filters on '.dist_info' (underscore) but wheel
    # metadata directories are named '*.dist-info' (hyphen) — confirm intent.
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name))
                        for f in archive_files if '.dist_info' not in f.name],
                       key=lambda v: len(v))
    log("add path:%s" % (dir_names[0]))
    sys.path.append(dir_names[0])
    return os.path.dirname(dir_names[0])


# May raise "RuntimeError: xxx has been blocked by sandbox" — libraries with
# C extensions are blocked by the sandbox; work around it with
#   set odps.isolation.session.enable = true
def include_file(file_name):
    """Append the directory of cached file *file_name* to sys.path."""
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))


def include_so(file_name):
    """Copy the cached shared object *file_name* into the working directory."""
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    # Write (and close) the local copy so it can be dlopen'ed by name.
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()


# Initialise a bundled business-data package.  Upload size limits and
# python/archive version mismatches mean the package may arrive split into
# several cached chunks that must be concatenated and unzipped by hand.
def init_env(list_files, package_name):
    """Reassemble *list_files* into a zip, extract it into *package_name*,
    and put the extracted directory first on sys.path."""
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        # Chunked upload: concatenate the parts back into one zip first.
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))


import platform


def getPlatform():
    """Return the platform identification string of the worker."""
    return platform.platform()


@annotate('->string')
class f_getPlatform(object):
    """UDF: report the worker's platform string (debug helper)."""

    def evaluate(self):
        return getPlatform()


@annotate('string->string,string,bigint')
class f_strip_filemd5(BaseUDTF):
    """UDTF: split 'md5-parts' into (original, bare md5, part count).

    Input without a '-' suffix yields a part count of 0.
    """

    def process(self, filemd5):
        split_filemd5 = filemd5.split("-")
        filemd5_strip = split_filemd5[0]
        if len(split_filemd5) == 1:
            parts = 0
        else:
            # NOTE(review): raises ValueError on a non-numeric suffix.
            parts = int(split_filemd5[1])
        self.forward(filemd5, filemd5_strip, parts)


@annotate('string,bigint->string')
class f_group_filemd5(BaseUDAF):
    """UDAF: collect (filemd5, part) pairs and emit the md5s ordered by
    part number as a JSON array."""

    def __init__(self):
        import json
        global json

    def new_buffer(self):
        return [[]]

    def iterate(self, buffer, filemd5, part):
        buffer[0].append([filemd5, part])

    def merge(self, buffer, pbuffer):
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        list_group = buffer[0]
        list_group.sort(key=lambda x: x[1])
        list_filemd5 = [item[0] for item in list_group]
        return json.dumps(list_filemd5)


@annotate('string->string,string,string,string,string,string,string,string,string')
class f_split_filemd5(BaseUDTF):
    """UDTF: expand a JSON array of md5s into exactly nine string columns.

    Missing slots are padded with random 19-char hex placeholders.
    NOTE(review): entries beyond the ninth are built but never forwarded —
    confirm inputs never exceed nine parts.
    """

    def __init__(self):
        import json
        from uuid import uuid4
        global json, uuid4

    def process(self, filemd5s):
        list_filemd5 = json.loads(filemd5s)
        # Pre-fill with unique placeholders, then overwrite with real values.
        list_result = [uuid4().hex[:19]
                       for i in range(max(9, len(list_filemd5)))]
        logging.info(str(list_filemd5))
        for i in range(len(list_filemd5)):
            list_result[i] = list_filemd5[i]
        self.forward(list_result[0], list_result[1], list_result[2],
                     list_result[3], list_result[4], list_result[5],
                     list_result[6], list_result[7], list_result[8])


def downloadFile(bucket, objectPath, localPath):
    """Resumable-download *objectPath* from OSS *bucket* to *localPath*.

    Returns True on success, False on failure.  Best-effort: errors are
    logged, never raised to the caller.
    """
    try:
        start_time = time.time()
        # bucket.get_object_to_file(objectPath, localPath)
        oss2.resumable_download(bucket, objectPath, localPath,
                                store=oss2.ResumableDownloadStore(root="/home/admin"),
                                multiget_threshold=200 * 1024,
                                part_size=200 * 1024,
                                num_threads=5)
        log("download %s takes %d" % (objectPath, time.time() - start_time))
        return True
    except Exception as e:
        # FIX: rejoined the failure message (it contained a stray line break)
        # and log the exception itself so failures are diagnosable.
        log("download object failed of %s" % str(objectPath))
        logging.exception(e)
        return False


@annotate('->string')
class f_test_download(BaseUDTF):
    """UDTF: smoke-test OSS download connectivity from a worker."""

    def __init__(self):
        include_package_path("oss_env.zip")
        import json
        from uuid import uuid4
        import logging
        import oss2
        global json, uuid4, oss2
        self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        self.attachment_bucket_name = "attachment-hub"
        # SECURITY: hard-coded OSS AccessKey/secret — these credentials are
        # leaked with the source; rotate them and load from secured config.
        self.auth = oss2.Auth("LTAI4FyUT7ZcQFZPjVtw5y9b",
                              "2zscfFTvy3JWavtCeCOthLxF8bDNH3")
        self.bucket = oss2.Bucket(self.auth, self.bucket_url,
                                  self.attachment_bucket_name)

    def process(self):
        downloadFile(self.bucket,
                     "049c/20210701/2021-07-01/03755/1625135745231.zip",
                     "/home/admin/1.pdf")


@annotate('string->string')
class f_test_exit(BaseUDTF):
    """UDTF: keep a worker alive ~30s while emitting heartbeat log lines
    (used to test worker timeout behaviour)."""

    def __init__(self):
        import json
        from uuid import uuid4
        import logging

    def process(self, s):
        for i in range(3):
            time.sleep(10)
            log("jump heart")
        self.forward("1")


@annotate('bigint->string')
class f_getRandomStr(object):
    """UDF: return a random lowercase string of length *count*.

    The string is generated once per worker instance and then lightly
    permuted on each call (count // 200 single-character overwrites), so
    successive calls return near-identical strings of the first call's length.
    """

    def __init__(self):
        import random
        global random
        self.result_s = ""

    def evaluate(self, count):
        if self.result_s == "":
            list_c = [chr(ord('a') + i) for i in range(26)]
            # FIX: build with join instead of quadratic += concatenation.
            self.result_s = "".join(random.choice(list_c)
                                    for _ in range(count))
        for i in range(count // 200):
            index = random.randint(0, len(self.result_s) - 1)
            index_1 = random.randint(0, len(self.result_s) - 1)
            # Overwrite the char at `index` with the char at `index_1`
            # (length preserved; characters may duplicate).
            self.result_s = (self.result_s[:index] +
                             self.result_s[index_1:index_1 + 1] +
                             self.result_s[index + 1:])
        return self.result_s


@annotate('string->string')
class f_extract_pageAttachments(BaseUDTF):
    """UDTF: extract attachment links (<a href> / <img src>) from an HTML
    page and forward them as a JSON array of {fileLink, fileTitle} objects."""

    def __init__(self):
        include_package_path("envs_py37.env.zip")
        import json
        from uuid import uuid4
        from bs4 import BeautifulSoup
        import logging
        global json, BeautifulSoup

    def process(self, _html):
        if _html is not None:
            page_attachments = self.extract_pageAttachments(_html)
            if len(page_attachments) > 0:
                self.forward(json.dumps(page_attachments, ensure_ascii=False))

    def extract_pageAttachments(self, _html):
        """Return [{"fileLink": url, "fileTitle": text}, ...] for every tag
        whose text or URL contains a known attachment suffix."""
        fileSuffix = [".zip", ".rar", ".tar", ".7z", ".wim", ".docx", ".doc",
                      ".xlsx", ".xls", ".pdf", ".txt", ".hnzf", ".bmp",
                      ".jpg", ".jpeg", ".png", ".tif", ".swf"]
        _soup = BeautifulSoup(_html, "lxml")
        page_attachments = []
        # FIX: <a href> and <img src> were handled by two duplicated loops;
        # they differ only in the URL attribute, so drive one loop with data.
        for tags, url_attr in ((_soup.find_all("a"), "href"),
                               (_soup.find_all("img"), "src")):
            for _tag in tags:
                _text = _tag.get_text()
                _url = _tag.attrs.get(url_attr, "")
                # Links into the site itself are never attachments.
                if _url.find("http://www.bidizhaobiao.com") >= 0:
                    continue
                is_attach = any(_text.find(suf) >= 0 or _url.find(suf) >= 0
                                for suf in fileSuffix)
                if is_attach:
                    page_attachments.append({"fileLink": _url,
                                             "fileTitle": _text})
        return page_attachments