#coding=utf-8
# "evaluate" is the entry function of a UDF and must use exactly this name
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF

# Configure an archive dependency package (e.g. pandas): make it importable
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist-info' not in f.name],
                       key=lambda v: len(v))
    sys.path.append(dir_names[0])
    return os.path.dirname(dir_names[0])

# A "RuntimeError: xxx has been blocked by sandbox" may be raised here:
# libraries containing C extensions are blocked by the sandbox;
# work around it with: set odps.isolation.session.enable = true
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))

def include_so(file_name):
    # Copy a cached .so resource into the working directory so it can be loaded
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()

# Initialize the business data package. Because of resource-upload size limits,
# Python version mismatches, inconsistent archive extraction and similar issues,
# the package is reassembled from its parts, unpacked and imported manually.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
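# The shell pipeline in init_env ("cat part1 part2 > temp.zip" followed by
# "unzip") can also be written with the standard library, which avoids relying
# on cat/unzip being present in the runtime image. A minimal sketch of that
# technique; the helper below is illustration only, is never called by the
# UDFs in this file, and the part paths it takes are hypothetical:
def _reassemble_split_archive_sketch(part_paths, package_name):
    import os, sys, shutil, zipfile
    merged = "temp.zip"
    # Concatenate the split parts in order (equivalent of "cat p1 p2 > temp.zip")
    with open(merged, "wb") as out_fp:
        for part in part_paths:
            with open(part, "rb") as in_fp:
                shutil.copyfileobj(in_fp, out_fp)
    # Extract everything (equivalent of "unzip -o temp.zip -d package_name")
    with zipfile.ZipFile(merged) as zf:
        zf.extractall(package_name)
    # Make the unpacked package importable, as init_env does
    sys.path.insert(0, os.path.abspath(package_name))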
# Main UDF program.
# Since Series may be called in several places, it is first made global in __init__.
@annotate("string->string")
class JiebaCut(object):
    def __init__(self):
        # zip_01 = include_package_path('testB01.zip')
        # zip_02 = include_package_path('testB02.zip')
        # self.cat_cmd = "cat %s %s > %s"%(zip_01+"/files/*",zip_02+"/files/*","testH.zip")
        # import os
        import sys
        # os.system(self.cat_cmd)
        # self.out = str(os.path.getsize("testH.zip"))
        # # self.out = str(os.listdir(zip_01+"/files/testB01/"))
        # os.system("mkdir jieba_t")
        # os.system("unzip testH.zip -d jieba_t")
        # self.out = str(os.listdir("jieba_t"))
        # sys.path.append(".")
        # # sys.path.append(os.path.dirname(os.path.normpath("jieba_test")))
        # # import jieba_test
        # from jieba_t import cut
        include_package_path("jiebaA.zip")
        import jieba
        # Python 2 runtime: force the default encoding to utf-8
        reload(sys)
        sys.setdefaultencoding('utf-8')
        global jieba

    def evaluate(self, x):
        import os
        # return self.out
        # return str(os.listdir("jieba_test"))
        # return self.cat_cmd
        return '--'.join(jieba.cut(x))
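# A local sketch of what JiebaCut.evaluate computes, assuming jieba is
# installed in the local interpreter (pip install jieba); the sample text is
# illustrative and this function is never called by the UDF:
def _jieba_cut_sketch(text=u"今天天气不错"):
    import jieba
    # Same logic as evaluate(): join the segmented tokens with "--"
    return '--'.join(jieba.cut(text))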
@annotate("string->string")
class Preprocess(BaseUDTF):
    def __init__(self):
        # init_env(["gensim_package.zip.env"],"local_package1")
        import sys
        import uuid
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = include_package_path("envs_py37.env.zip")
        # self.out = init_env(["envs_py37.zip.env"],"local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.entityLink.entityLink as entityLink
        import BiddingKG.dl.interface.getAttributes as getAttributes
        global Preprocessing, entityLink, predictor, uuid, getAttributes
        # import gensim
        # include_package_path("numpy.zip")
        # init_env(["tensorflow-1.14.0-cp37-cp37m-manylinux1_x86_64.whl"])
        # so_file = get_cache_file("tensorflow-1.14.0-cp37-cp37m-manylinux1_x86_64.whl")
        # import os
        # self.out = os.path.abspath(so_file.name)
        # import tensorflow

    def process(self, x):
        k = str(uuid.uuid4())
        list_articles = Preprocessing.get_preprocessed_articles([[k, x, "", "_doc_id", ""]], useselffool=True)
        self.forward(list_articles[0].toJson())
        # list_articles,list_sentences,list_entitys,_cost_time = Preprocessing.get_preprocessed([[k,x,"","_doc_id",""]],useselffool=True)
        # # codeName = predictor.getPredictor("codeName").predict(list_sentences,MAX_AREA=2000,list_entitys=list_entitys)
        # # predictor.getPredictor("prem").predict(list_sentences,list_entitys)
        # # predictor.getPredictor("roleRule").predict(list_articles,list_sentences, list_entitys,codeName)
        # # predictor.getPredictor("epc").predict(list_sentences,list_entitys)
        # # entityLink.link_entitys(list_entitys)
        # # prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)
        # # return str(self.out)
        # return "1"
        # list_articles,list_sentences,list_entitys,_ = Preprocessing.get_articles_processed([["doc_id",x,"","",""]],useselffool=True)
        # if len(list_articles)==1:
        #     json_article = list_articles[0]
        #     self.forward(list_sentences[0][0].sentence_text)

@annotate("string -> string,string")
class Preprocess_article(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import uuid
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.entityLink.entityLink as entityLink
        import BiddingKG.dl.interface.getAttributes as getAttributes
        global Preprocessing, entityLink, predictor, uuid, getAttributes

    def process(self, x):
        if x is not None:
            k = str(uuid.uuid4())
            list_articles = Preprocessing.get_preprocessed_article([[k, x, "", "_doc_id", ""]])
            self.forward(list_articles[0].id, list_articles[0].toJson())

@annotate("string->string,string")
class Preprocess_sentences(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.interface.Entitys as Entitys
        import json
        global Preprocessing, Entitys, json

    def process(self, x):
        _article = Entitys.Article.fromJson(x)
        list_sentences = Preprocessing.get_preprocessed_sentences([_article], True)
        list_out = []
        for _sentence in list_sentences[0]:
            list_out.append(_sentence.toJson())
        self.forward(_article.id, json.dumps(list_out))
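# Every UDTF above follows the same contract: process() handles one input row
# and emits output rows through self.forward(). A minimal stub of that contract
# for local smoke tests; the stub is an assumption for illustration, not part
# of the odps.udf API, and is never used by the classes in this file:
class _BaseUDTFStub(object):
    def __init__(self):
        self.rows = []

    def forward(self, *cols):
        # In MaxCompute, forward() emits an output row; here we just collect it
        self.rows.append(cols)

class _EchoUDTF(_BaseUDTFStub):
    def process(self, x):
        # One input row may produce several output rows
        self.forward(x, len(x))
        self.forward(x.upper(), len(x))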
@annotate("string->string,string")
class Preprocess_entitys(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.interface.Entitys as Entitys
        import json
        global Preprocessing, Entitys, json

    def process(self, x):
        list_sentence = []
        for _x in json.loads(x):
            list_sentence.append(Entitys.Sentences.fromJson(_x))
        list_out = []
        list_entitys = Preprocessing.get_preprocessed_entitys([list_sentence], True)
        for _entity in list_entitys[0]:
            list_out.append(_entity.toJson())
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))

@annotate("string->string,string")
class Predict_codename(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json
        global predictor, Entitys, json

    def process(self, x):
        list_sentence = []
        for _x in json.loads(x):
            list_sentence.append(Entitys.Sentences.fromJson(_x))
        codename = predictor.getPredictor("codeName").predict([list_sentence], MAX_AREA=2000)
        self.forward(codename[0][0], json.dumps(codename[0]))

@annotate("string,string->string,string")
class Predict_role(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json
        global predictor, Entitys, json

    def process(self, x, y):
        list_sentence = []
        list_entity = []
        for _x in json.loads(x):
            list_sentence.append(Entitys.Sentences.fromJson(_x))
        for _y in json.loads(y):
            list_entity.append(Entitys.Entity.fromJson(_y))
        predictor.getPredictor("prem").predict_role([list_sentence], [list_entity])
        list_out = []
        for _entity in list_entity:
            if _entity.label is not None:
                list_out.append(_entity.toJson())
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))

@annotate("string,string->string,string")
class Predict_money(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json
        global predictor, Entitys, json

    def process(self, x, y):
        list_sentence = []
        list_entity = []
        for _x in json.loads(x):
            list_sentence.append(Entitys.Sentences.fromJson(_x))
        for _y in json.loads(y):
            list_entity.append(Entitys.Entity.fromJson(_y))
        predictor.getPredictor("prem").predict_money([list_sentence], [list_entity])
        list_out = []
        for _entity in list_entity:
            if _entity.label is not None:
                list_out.append(_entity.toJson())
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))

@annotate("string,string->string,string")
class Predict_person(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json
        global predictor, Entitys, json

    def process(self, x, y):
        list_sentence = []
        list_entity = []
        for _x in json.loads(x):
            list_sentence.append(Entitys.Sentences.fromJson(_x))
        for _y in json.loads(y):
            list_entity.append(Entitys.Entity.fromJson(_y))
        predictor.getPredictor("epc").predict_person([list_sentence], [list_entity])
        list_out = []
        for _entity in list_entity:
            if _entity.label is not None:
                list_out.append(_entity.toJson())
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))
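# Predict_role, Predict_money and Predict_person differ only in which predictor
# method runs; the JSON round-trip around it is identical. A sketch of how that
# shared boilerplate could be factored out. _run_predict_sketch is hypothetical
# and unused; it assumes the module-level Entitys/json globals installed by the
# classes' __init__ methods:
def _run_predict_sketch(x, y, predict_fn):
    list_sentence = [Entitys.Sentences.fromJson(_x) for _x in json.loads(x)]
    list_entity = [Entitys.Entity.fromJson(_y) for _y in json.loads(y)]
    # e.g. predict_fn = predictor.getPredictor("prem").predict_money
    predict_fn([list_sentence], [list_entity])
    list_out = [_entity.toJson() for _entity in list_entity if _entity.label is not None]
    return list_sentence[0].doc_id, json.dumps(list_out)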
@annotate("string,string,string,string,string,string,string->string,string,string")
class ContentUnion(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        self.out = init_env(["BiddingKG.zip.env"], "local_package")
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        self.out = init_env(["envs_py37.zip.env"], "local_package")
        self.out = init_env(["so.env"], ".")
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import BiddingKG.dl.interface.getAttributes as getAttributes
        import BiddingKG.dl.entityLink.entityLink as entityLink
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import json
        global predictor, Entitys, getAttributes, entityLink, json, MyEncoder, Preprocessing

        # Custom JSON encoder
        class MyEncoder(json.JSONEncoder):
            def __init__(self, *args, **kwargs):
                # Accept and forward JSONEncoder's own arguments (the original
                # no-argument __init__ would break json.dumps(cls=MyEncoder)),
                # and make numpy available to default()
                import numpy as np
                global np
                json.JSONEncoder.__init__(self, *args, **kwargs)

            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
                    return float(obj)
                return json.JSONEncoder.default(self, obj)

    def process(self, json_article, list_json_sentence, list_json_entity, list_json_entity_role,
                list_json_entity_money, list_json_entity_person, json_codename):
        dict_entity = {}
        list_sentence = []
        list_entity = []
        _article = Entitys.Article.fromJson(json_article)
        # Index the labeled entities produced by the role/money/person stages
        for list_json in [list_json_entity_role, list_json_entity_money, list_json_entity_person]:
            for _json_entity in json.loads(list_json):
                _entity = Entitys.Entity.fromJson(_json_entity)
                _key = "%s-%s-%s" % (str(_entity.doc_id), str(_entity.entity_id), str(_entity.entity_type))
                dict_entity[_key] = _entity
        for _json_sentence in json.loads(list_json_sentence):
            list_sentence.append(Entitys.Sentences.fromJson(_json_sentence))
        # Prefer the labeled version of each entity when one exists
        for _json_entity in json.loads(list_json_entity):
            _entity = Entitys.Entity.fromJson(_json_entity)
            _key = "%s-%s-%s" % (str(_entity.doc_id), str(_entity.entity_id), str(_entity.entity_type))
            if _key in dict_entity:
                list_entity.append(dict_entity[_key])
            else:
                list_entity.append(_entity)
        codeName = json.loads(json_codename)
        predictor.getPredictor("roleRule").predict([_article], [list_sentence], [list_entity], [codeName])
        entityLink.link_entitys([list_entity])
        prem = getAttributes.getPREMs([list_sentence], [list_entity], [_article])
        # result = json.dumps(Preprocessing.union_result([codeName], prem)[0][1],cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False)
        result = json.dumps(Preprocessing.union_result([codeName], prem)[0][1], ensure_ascii=False)
        self.forward(_article.id, _article.doc_id, result)
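# The MyEncoder pattern standalone: json.dumps() cannot serialize numpy scalars,
# arrays or bytes, so default() converts them to plain Python types first.
# A self-contained sketch (requires numpy; not used by the UDTFs in this file):
def _numpy_json_sketch():
    import json
    import numpy as np

    class _NumpyEncoder(json.JSONEncoder):
        def default(self, obj):
            if isinstance(obj, np.ndarray):
                return obj.tolist()
            if isinstance(obj, bytes):
                return obj.decode('utf-8')
            if isinstance(obj, (np.float16, np.float32, np.float64)):
                return float(obj)
            if isinstance(obj, np.integer):
                return int(obj)
            return json.JSONEncoder.default(self, obj)

    # e.g. '{"score": 0.5, "vec": [0, 1, 2]}'
    return json.dumps({"score": np.float32(0.5), "vec": np.arange(3)}, cls=_NumpyEncoder)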
@annotate("string,bigint,string,string->string,bigint,string")
class Extract(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        import uuid
        global uuid
        import logging
        import datetime
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        logging.info("time1" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        self.out = init_env(["BiddingKG.zip.env"], str(uuid.uuid4()))
        logging.info("time2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        logging.info("time3" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
        self.out = include_package_path("envs_py37.env.zip")
        logging.info("time4" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        self.out = init_env(["so.env"], ".")
        logging.info("time5" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.predictor as predictor
        logging.info("time6" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.Entitys as Entitys
        logging.info("time6.1" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.getAttributes as getAttributes
        logging.info("time6.2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.entityLink.entityLink as entityLink
        logging.info("time6.3" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        logging.info("time6.4" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import json
        import time
        from BiddingKG.dl.common.Utils import log
        logging.info("time7" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import numpy as np
        global predictor, Entitys, getAttributes, entityLink, json, MyEncoder, Preprocessing, time, log, np

        class MyEncoder(json.JSONEncoder):
            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
                    return float(obj)
                elif isinstance(obj, np.int64):
                    return int(obj)
                return json.JSONEncoder.default(self, obj)

    def process(self, content, _doc_id, _title, page_time):
        if content is not None and _doc_id not in [105677700, 126694044, 126795572, 126951461]:
            k = str(uuid.uuid4())
            cost_time = dict()
            start_time = time.time()
            log("start process doc %s" % (str(_doc_id)))
            try:
                list_articles, list_sentences, list_entitys, _cost_time = Preprocessing.get_preprocessed(
                    [[k, content, "", str(_doc_id), str(_title)]], useselffool=True)
                log("get preprocessed done of doc_id %s" % (_doc_id))
                cost_time["preprocess"] = time.time() - start_time
                cost_time.update(_cost_time)
                '''
                for articles in list_articles:
                    print(articles.content)
                '''
                start_time = time.time()
                codeName = predictor.getPredictor("codeName").predict(list_sentences, MAX_AREA=2000, list_entitys=list_entitys)
                log("get codename done of doc_id %s" % (_doc_id))
                cost_time["codename"] = time.time() - start_time

                start_time = time.time()
                predictor.getPredictor("prem").predict(list_sentences, list_entitys)
                log("get prem done of doc_id %s" % (_doc_id))
                cost_time["prem"] = time.time() - start_time

                start_time = time.time()
                predictor.getPredictor("roleRule").predict(list_articles, list_sentences, list_entitys, codeName)
                cost_time["rule"] = time.time() - start_time

                start_time = time.time()
                predictor.getPredictor("epc").predict(list_sentences, list_entitys)
                log("get epc done of doc_id %s" % (_doc_id))
                cost_time["person"] = time.time() - start_time

                start_time = time.time()
                entityLink.link_entitys(list_entitys)
                '''
                for list_entity in list_entitys:
                    for _entity in list_entity:
                        for _ent in _entity.linked_entitys:
                            print(_entity.entity_text, _ent.entity_text)
                '''
                prem = getAttributes.getPREMs(list_sentences, list_entitys, list_articles)
                log("get attributes done of doc_id %s" % (_doc_id))
                cost_time["attrs"] = time.time() - start_time

                # print(prem)
                data_res = Preprocessing.union_result(codeName, prem)[0][1]
                data_res["cost_time"] = cost_time
                data_res["success"] = True
                _article = list_articles[0]
                self.forward(page_time, int(_article.doc_id), json.dumps(data_res, cls=MyEncoder, ensure_ascii=False))
            except Exception as e:
                log("%s===error docid:%s" % (str(e), str(_doc_id)))
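# Extract.process() accumulates per-stage latency into cost_time and ships it
# inside the result JSON. The same timing pattern standalone; stage names and
# sleeps below are illustrative only:
if __name__ == "__main__":
    import json
    import time
    cost_time = dict()
    start_time = time.time()
    time.sleep(0.01)                                   # stand-in for "preprocess"
    cost_time["preprocess"] = time.time() - start_time
    start_time = time.time()
    time.sleep(0.01)                                   # stand-in for "codename"
    cost_time["codename"] = time.time() - start_time
    print(json.dumps({"cost_time": cost_time, "success": True}))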