#coding=utf-8
# NOTE: for ODPS UDFs the entry method must be named "evaluate"/"process";
# this module implements a UDTF, whose required entry point is `process`.
"""ODPS UDTF running the BiddingKG extraction pipeline over document rows.

The heavy runtime (python packages, word embeddings, shared objects) is
shipped as MaxCompute cache resources and unpacked / put on sys.path at
worker start-up by the helper functions below.
"""
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF

import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

import time


def log(msg):
    """Log *msg* at INFO level."""
    logging.info(msg)


# Bring an archive resource (e.g. the bundled python dependencies) onto sys.path.
def include_package_path(res_name):
    """Resolve the cached archive *res_name* and append its ".zip/files" root
    to sys.path.

    Returns the appended path.
    """
    import os, sys
    archive_files = get_cache_archive(res_name)
    # Shortest directory name is the archive root; skip dist-info entries.
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name))
                        for f in archive_files
                        if '.dist_info' not in f.name],
                       key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0] + ".zip/files"
    log("add path:%s" % (_path))
    sys.path.append(_path)
    return _path


# May raise e.g. "RuntimeError: xxx has been blocked by sandbox":
# libraries containing C extensions are blocked by the sandbox; enable them
# with: set odps.isolation.session.enable = true
def include_file(file_name):
    """Make the directory containing cached file *file_name* importable."""
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))


def include_so(file_name):
    """Copy the cached shared object *file_name* into the working directory."""
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    # Context manager guarantees the copy is flushed and closed even on error.
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()


# Initialize the business data package. Because of upload size limits and
# python-version / archive-unpacking inconsistencies, multi-part zips are
# concatenated with `cat` and unpacked manually via `unzip`.
def init_env(list_files, package_name):
    """Unpack the cached resource(s) *list_files* into directory
    *package_name* and prepend that directory to sys.path.

    A single file is unzipped directly; multiple files are assumed to be
    split parts of one zip and are concatenated into temp.zip first.
    """
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))


def multiLoadEnv():
    """Load every runtime environment piece: project code, vectors, python env."""

    def load_project():
        # Bring the BiddingKG project code onto sys.path.
        start_time = time.time()
        # init_env(["BiddingKG.zip.env.baseline"],str(uuid.uuid4()))
        # init_env(["BiddingKG.zip.env.backup"],str(uuid.uuid4()))
        # switched to zip-archive import
        log("=======")
        include_package_path("BiddingKG.baseline.zip")
        logging.info("init biddingkg.zip.env.line cost %d" % (time.time() - start_time))

    def load_vector():
        # Unpack word embeddings, enterprise data and shared objects into cwd.
        start_time = time.time()
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        logging.info("init wiki_128_word_embedding_new cost %d" % (time.time() - start_time))

        start_time = time.time()
        init_env(["enterprise.zip.env"], ".")
        # init_env(["LEGAL_ENTERPRISE.zip.env"],".")
        logging.info("init legal_enterprise.zip.env cost %d" % (time.time() - start_time))

        start_time = time.time()
        init_env(["so.env"], ".")
        logging.info("init so.env cost %d" % (time.time() - start_time))

    def load_py():
        # Bring the bundled python 3.7 environment onto sys.path.
        start_time = time.time()
        # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
        include_package_path("envs_py37.env.zip")
        logging.info("init envs_py37 cost %d" % (time.time() - start_time))

    load_project()
    load_vector()
    load_py()


@annotate("string,bigint,string,string->string,bigint,string")
class Extract(BaseUDTF):
    """UDTF mapping (content, doc_id, title, page_time) rows to
    (page_time, doc_id, result_json) rows via BiddingKG's `predict`."""

    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        # Declare globals BEFORE the imports: `import x` then `global x` in
        # the same scope is a SyntaxError on Python 3.
        global uuid, time
        import uuid
        import logging
        import datetime
        import time
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        # Environment must be fully loaded before any BiddingKG import.
        multiLoadEnv()

        import BiddingKG.dl.common.nerUtils
        log("time5" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.predictor as predictor
        log("time6" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.Entitys as Entitys
        log("time6.1" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.getAttributes as getAttributes
        log("time6.2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.entityLink.entityLink as entityLink
        log("time6.2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        log("time6.3" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        log("=======")
        time.sleep(5)
        from BiddingKG.dl.interface.extract import predict as predict
        log("=======import done")
        import json
        import numpy as np

        # Publish the lazily-imported modules/classes at module level so that
        # `process` (and helpers) can use them without re-importing.
        global predictor, Entitys, getAttributes, entityLink, json, MyEncoder, \
            Preprocessing, np, predict

        class MyEncoder(json.JSONEncoder):
            """JSON encoder handling numpy scalars/arrays and bytes."""

            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                elif isinstance(obj, (np.float_, np.float16, np.float32,
                                      np.float64)):
                    return float(obj)
                elif isinstance(obj, (np.int64)):
                    return int(obj)
                return json.JSONEncoder.default(self, obj)

    def process(self, content, _doc_id, _title, page_time):
        """Run extraction on one document and forward the result row.

        Documents with ids in the blocklist (known-problematic inputs) and
        NULL content are silently skipped.
        """
        if content is not None and _doc_id not in [105677700, 126694044,
                                                   126795572, 126951461,
                                                   71708072, 137850637]:
            result_json = predict(str(_doc_id), content, str(_title))
            self.forward(page_time, int(_doc_id), result_json)