#coding:utf8
"""MaxCompute (ODPS) Python UDF/UDTF/UDAF helpers.

Contains resource-loading utilities (archives, single files, shared
objects), a handful of small UDFs for file-md5 handling, and a few test
UDTFs (OSS download, long-running heartbeat, random string).
"""
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF,BaseUDAF

import threading
import logging
logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')

import time
import os
import sys
import platform


def log(msg):
    """Write ``msg`` to the root logger at INFO level."""
    logging.info(msg)


# Configure the pandas dependency package.
def include_package_path(res_name):
    """Add the root directory of an archive resource to ``sys.path``.

    The directory names of all files in the archive ``res_name`` are
    collected and the shortest one is appended to ``sys.path``.

    Returns the parent directory of the path that was appended.
    Raises ``IndexError`` when no file survives the filter.
    """
    archive_files = get_cache_archive(res_name)
    # NOTE(review): the filter tests '.dist_info' (underscore); wheel metadata
    # directories are normally spelled '.dist-info' -- confirm which is meant.
    dir_names = sorted(
        [os.path.dirname(os.path.normpath(f.name))
         for f in archive_files if '.dist_info' not in f.name],
        key=len)
    package_root = dir_names[0]
    log("add path:%s"%(package_root))
    sys.path.append(package_root)
    return os.path.dirname(package_root)


# A RuntimeError such as "xxx has been blocked by sandbox" may occur.
# This happens because libraries containing C code are blocked by the sandbox;
# it can be lifted with: set odps.isolation.session.enable = true
def include_file(file_name):
    """Append the directory that holds the file resource to ``sys.path``."""
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))


def include_so(file_name):
    """Copy the file resource ``file_name`` into the working directory.

    The copy is written under the same name, relative to the current
    working directory.
    """
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    # Context manager guarantees the handle is closed even if write() fails.
    with open(file_name, "wb") as so:
        so.write(content)


# Initialise the business data package. Because of upload limits, Python
# version differences and inconsistent archive extraction, the package has
# to be unpacked and imported manually.
def init_env(list_files,package_name):
    """Unzip one (or several concatenated) file resources into ``package_name``.

    With a single entry the resource is unzipped directly; with several
    entries they are concatenated with ``cat`` into ``temp.zip`` first.
    The absolute path of ``package_name`` is then put at the front of
    ``sys.path``.
    """
    if len(list_files)==1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s"%(cmd_line,package_name))
    elif len(list_files)>1:
        # Split archives: stitch the parts back together before unzipping.
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " "+os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s"%(package_name))
    # Disabled alternatives kept for reference:
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0,os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))


def getPlatform():
    """Return ``platform.platform()`` for the executing worker."""
    return platform.platform()


@annotate('->string')
class f_getPlatform(object):
    """UDF returning the platform string of the worker."""

    def evaluate(self):
        return getPlatform()


@annotate('string->string,string,bigint')
class f_strip_filemd5(BaseUDTF):
    """UDTF splitting ``"<md5>-<part>"`` into its components."""

    def process(self,filemd5):
        """Forward ``(filemd5, text before the first '-', part number)``.

        The part number is the integer value of the second '-'-separated
        segment, or 0 when the value contains no '-'.
        """
        split_filemd5 = filemd5.split("-")
        filemd5_strip = split_filemd5[0]
        parts = int(split_filemd5[1]) if len(split_filemd5) > 1 else 0
        self.forward(filemd5,filemd5_strip,parts)


@annotate('string,bigint->string')
class f_group_filemd5(BaseUDAF):
    """UDAF collecting file md5 values into a JSON list ordered by part."""

    def __init__(self):
        # ``global`` must precede the import: binding the name first is a
        # SyntaxError on Python 3.
        global json
        import json

    def new_buffer(self):
        """Return a fresh buffer: a one-element list holding the item list."""
        return [[]]

    def iterate(self, buffer,filemd5,part):
        """Record one ``[filemd5, part]`` pair."""
        buffer[0].append([filemd5,part])

    def merge(self, buffer, pbuffer):
        """Fold the partial buffer ``pbuffer`` into ``buffer``."""
        buffer[0].extend(pbuffer[0])

    def terminate(self, buffer):
        """Return the md5 values, sorted by part, as a JSON array string."""
        list_group = buffer[0]
        list_group.sort(key=lambda x:x[1])
        return json.dumps([item[0] for item in list_group])


@annotate('string->string,string,string,string,string,string,string,string,string')
class f_split_filemd5(BaseUDTF):
    """UDTF spreading a JSON list of md5 values over nine output columns."""

    def __init__(self):
        # ``global`` first, then import (required on Python 3).
        global json,uuid4
        import json
        from uuid import uuid4

    def process(self,filemd5s):
        """Forward the first nine entries of the JSON list ``filemd5s``.

        When the list has fewer than nine entries, the remaining columns
        are filled with the first 19 hex characters of a fresh ``uuid4``.
        """
        list_filemd5 = json.loads(filemd5s)
        logging.info(str(list_filemd5))
        columns = list(list_filemd5[:9])
        # range() of a non-positive number is empty, so no padding is added
        # once nine real values are present.
        columns.extend(uuid4().hex[:19] for _ in range(9 - len(columns)))
        self.forward(*columns)


def downloadFile(bucket,objectPath,localPath):
    """Download ``objectPath`` from ``bucket`` to ``localPath``.

    Uses ``oss2.resumable_download``; the ``oss2`` module must already have
    been bound as a module global (``f_test_download.__init__`` does this).

    Returns True on success and False when any exception is raised; the
    failure is logged together with its traceback.
    """
    try:
        start_time = time.time()
        # bucket.get_object_to_file(objectPath, localPath)
        oss2.resumable_download(bucket, objectPath, localPath,
                                store=oss2.ResumableDownloadStore(root="/home/admin"),
                                multiget_threshold=200*1024,
                                part_size=200*1024,
                                num_threads=5)
        log("download %s takes %d"%(objectPath,time.time()-start_time))
        return True
    except Exception:
        # Keep the best-effort contract (return False) but record the cause.
        logging.exception("download object failed of %s"%str(objectPath))
        return False


@annotate('->string')
class f_test_download(BaseUDTF):
    """Test UDTF that downloads one fixed object from OSS."""

    def __init__(self):
        # ``global`` first, then import (required on Python 3); ``oss2`` has
        # to be a module global because downloadFile() refers to it.
        global json,uuid4,oss2
        include_package_path("oss_env.zip")
        import json
        from uuid import uuid4
        import oss2
        self.bucket_url = "http://oss-cn-hangzhou-internal.aliyuncs.com"
        self.attachment_bucket_name = "attachment-hub"
        # SECURITY NOTE(review): access key id/secret are hard-coded in source.
        # They should be rotated and supplied through a secret store or job
        # configuration instead; left unchanged here to preserve behaviour.
        self.auth = oss2.Auth("LTAI4FyUT7ZcQFZPjVtw5y9b", "2zscfFTvy3JWavtCeCOthLxF8bDNH3")
        self.bucket = oss2.Bucket(self.auth,self.bucket_url,self.attachment_bucket_name)

    def process(self):
        """Download the fixed test object; nothing is forwarded."""
        downloadFile(self.bucket,"049c/20210701/2021-07-01/03755/1625135745231.zip","/home/admin/1.pdf")


@annotate('string->string')
class f_test_exit(BaseUDTF):
    """Test UDTF that sleeps for a while before emitting a single row."""

    def __init__(self):
        """Override the base initialiser; this UDTF sets up no state."""

    def process(self,s):
        """Sleep 3 x 10 seconds, logging after each sleep, then forward "1"."""
        for _ in range(3):
            time.sleep(10)
            log("jump heart")
        self.forward("1")


@annotate('bigint->string')
class f_getRandomStr(object):
    """UDF producing a string of 'a' characters with a few random 'b's."""

    def __init__(self):
        # ``global`` first, then import (required on Python 3).
        global random
        import random

    def evaluate(self,count):
        """Return ``count`` characters: 'a' with five random picks set to 'b'.

        Positions are drawn independently, so fewer than five distinct
        positions may end up as 'b'. Returns None for a None input and an
        empty string when ``count`` is not positive.
        """
        if count is None:
            return None
        if count <= 0:
            return ""
        # str is immutable, so edit a list of characters and join at the end.
        chars = ["a"] * count
        for _ in range(5):
            chars[random.randint(0, count - 1)] = 'b'
        return "".join(chars)