#coding=utf-8
# For a UDTF, process() is the framework entry point and must use that exact name.
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
from multiprocessing import Process, Queue


def log(msg):
    logging.info(msg)


# Add a dependency archive (e.g. a pandas-style package) to sys.path.
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    # the shortest directory is the archive root; packages live under <res>.zip/files
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist-info' not in f.name],  # wheel metadata dirs are *.dist-info
                       key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0] + ".zip/files"
    log("add path:%s" % (_path))
    sys.path.append(_path)
    return _path


# A "RuntimeError: xxx has been blocked by sandbox" may occur here: libraries with
# C extensions are blocked by the sandbox. Run `set odps.isolation.session.enable = true`
# to lift the restriction.
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))


def include_so(file_name):
    # copy a cached .so resource into the working directory so it can be loaded
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()


# Initialize a business data package by hand. Because of upload size limits,
# Python-version mismatches, inconsistent archive extraction and similar issues,
# split resources are concatenated and unzipped manually instead of relying on
# get_cache_archive.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        # multi-part upload: concatenate the pieces back into one zip, then unzip
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    sys.path.insert(0, os.path.abspath(package_name))


def multiLoadEnv():
    def load_project():
        start_time = time.time()
        # init_env(["BiddingKG.zip.env.backup"], str(uuid.uuid4()))  # switched to importing via zip archive
        log("=======")
        # include_package_path("BiddingKG.baseline.zip")
        include_package_path("BiddingKG.backup.zip")
        logging.info("init biddingkg.zip.env.line cost %d" % (time.time() - start_time))

    def load_vector():
        start_time = time.time()
        # init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        include_package_path("wiki.zip")
        logging.info("init wiki_128_word_embedding_new cost %d" % (time.time() - start_time))

        start_time = time.time()
        # init_env(["enterprise.zip.env"], ".")
        # init_env(["LEGAL_ENTERPRISE.zip.env"], ".")
        include_package_path("enterprise.zip")
        logging.info("init legal_enterprise.zip.env cost %d" % (time.time() - start_time))

        start_time = time.time()
        init_env(["so.env"], ".")
        logging.info("init so.env cost %d" % (time.time() - start_time))

    def load_py():
        start_time = time.time()
        include_package_path("envs_py37.env.zip")
        # include_package_path("envs_py35.zip")
        logging.info("init envs_py cost %d" % (time.time() - start_time))

    load_project()
    load_vector()
    load_py()
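
# How the multi-part resources consumed by init_env() are produced is not shown
# in this file. A minimal sketch of one workflow that matches the cat-then-unzip
# logic above (file names and the 200m chunk size are hypothetical):
#
#   split -b 200m --numeric-suffixes=1 BiddingKG.zip BiddingKG.z
#   # -> BiddingKG.z01, BiddingKG.z02, ...; upload each piece as a file resource,
#   # then at runtime: init_env(["BiddingKG.z01", "BiddingKG.z02"], "local_package")
#
# Any splitting tool works as long as plain concatenation of the pieces
# restores the original zip byte-for-byte.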
@annotate("string,bigint,string,string->string,bigint,string")
class Extract(BaseUDTF):

    def f_queue_process(self, task_queue, result_queue):
        # Runs in a child process: the heavy model import happens here, so a hung
        # or crashed predict() can be killed and restarted without losing the
        # UDTF worker itself.
        log("start import predict function")
        from BiddingKG.dl.interface.extract import predict as predict
        log("import done")
        while True:
            try:
                item = task_queue.get(True, timeout=10)
                result_json = predict(item.get("docid", ""), item.get("content", ""),
                                      item.get("title", ""), item.get("page_time", ""))
                result_queue.put(result_json)
            except Exception:
                # either no task arrived within 10s or predict() raised
                log("get data time out")

    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"], "local_package")
        # declare globals before the first assignment (assigning first and
        # declaring global afterwards is a SyntaxError in Python 3)
        global uuid, time, json, np, MyEncoder
        import uuid
        import logging
        import time
        import json
        import numpy as np
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        multiLoadEnv()

        # predictor/Entitys/getAttributes/entityLink/Preprocessing used to be
        # imported inline here; the import of predict now happens inside the
        # worker process instead (see f_queue_process).
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
        self.deal_process.start()

        class MyEncoder(json.JSONEncoder):
            # serialize numpy and bytes values that the default encoder rejects
            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
                    return float(obj)
                elif isinstance(obj, (np.int64,)):
                    return int(obj)
                return json.JSONEncoder.default(self, obj)

    def process(self, content, _doc_id, _title, page_time):
        # Direct in-process handling, kept for reference:
        # if content is not None and _doc_id not in [...]:
        #     result_json = predict(str(_doc_id), content, str(_title))
        #     self.forward(page_time, int(_doc_id), result_json)

        # known problematic documents are skipped by docid
        if content is not None and _doc_id not in [105677700, 126694044, 126795572, 126951461, 71708072, 137850637]:
            # drain any stale items left in the queues by a previous timeout
            try:
                while self.task_queue.qsize() > 0:
                    self.task_queue.get(timeout=5)
            except Exception:
                pass
            try:
                while self.result_queue.qsize() > 0:
                    self.result_queue.get(timeout=5)
            except Exception:
                pass
            _item = {"docid": _doc_id, "content": content, "title": _title, "page_time": page_time}
            try:
                _timeout = 60 * 4
                if not self.deal_process.is_alive():
                    log("deal process is down")
                    self.task_queue = Queue()
                    self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
                    self.deal_process.start()
                    # give a freshly restarted worker extra time to re-import the model
                    _timeout += 60 * 4
                log("putting item to task_queue with docid:%s" % (str(_doc_id)))
                self.task_queue.put(_item)
                result_json = self.result_queue.get(timeout=_timeout)
                self.forward(page_time, int(_doc_id), result_json)
            except Exception:
                log("dealing docid %s failed by timeout" % (str(_doc_id)))
                # Process.kill() requires Python 3.7+; the dead worker is
                # restarted by the is_alive() check on the next call
                self.deal_process.kill()
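
# Usage sketch (assumptions: this file is registered as the resource
# extract_udtf.py, and f_extract is an arbitrary function name; the USING list
# must contain every archive referenced in multiLoadEnv plus this file):
#
#   set odps.isolation.session.enable = true;  -- allow C-extension libraries (see note above)
#   CREATE FUNCTION f_extract AS 'extract_udtf.Extract'
#       USING 'extract_udtf.py,BiddingKG.backup.zip,wiki.zip,enterprise.zip,so.env,envs_py37.env.zip';
#   SELECT f_extract(content, docid, title, page_time) AS (page_time, docid, result_json)
#   FROM your_source_table;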