123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497 |
- #coding=utf-8
- # evaluate为该方法的入口函数,必须用这个名字
- from odps.udf import annotate
- from odps.distcache import get_cache_archive
- from odps.distcache import get_cache_file
- from odps.udf import BaseUDTF
# Make a cached archive resource (e.g. a zipped pure-Python package such as pandas) importable.
def include_package_path(res_name):
    """Append the top-level directory of the cached archive *res_name* to ``sys.path``.

    The member files of the archive are listed with ``get_cache_archive``; the
    directory of the member closest to the archive root (shortest directory
    path, first one wins on ties) is taken as the package directory.

    :param res_name: name of the archive resource.
    :return: the parent of the directory that was appended to ``sys.path``.
    :raises ValueError: if the archive has no members outside wheel metadata.
    """
    import os
    import sys

    archive_files = get_cache_archive(res_name)
    dir_names = [
        os.path.dirname(os.path.normpath(f.name))
        for f in archive_files
        # Skip wheel metadata directories. Their real suffix is ".dist-info"
        # (hyphen); the old ".dist_info" spelling never matched anything but is
        # kept so nothing that used to be excluded slips back in.
        if '.dist-info' not in f.name and '.dist_info' not in f.name
    ]
    if not dir_names:
        raise ValueError("archive resource %r contains no package files" % (res_name,))
    # min() returns the first shortest entry, exactly like sorted(...)[0].
    package_dir = min(dir_names, key=len)
    sys.path.append(package_dir)
    return os.path.dirname(package_dir)
# Loading may fail with something like "RuntimeError: xxx has been blocked by sandbox".
# Libraries containing C code are blocked by the sandbox; this can be lifted with
# `set odps.isolation.session.enable = true`.
def include_file(file_name):
    """Append the directory that holds the cached file resource *file_name* to ``sys.path``."""
    import os
    import sys

    cached_resource = get_cache_file(file_name)
    resource_dir = os.path.dirname(os.path.abspath(cached_resource.name))
    sys.path.append(resource_dir)
def include_so(file_name):
    """Copy the cached file resource *file_name* into the current working directory.

    The copy is written under the resource's own name (``file_name``, relative
    to the working directory), overwriting any existing file of that name.
    Judging by the function name it is meant for shared libraries -- unverified.
    """
    import shutil

    cached_resource = get_cache_file(file_name)
    # Stream the copy and let `with` close both files even if the copy fails
    # (the old version leaked the output handle on error and read the whole
    # file into memory).
    with open(cached_resource.name, 'rb') as src, open(file_name, 'wb') as dst:
        shutil.copyfileobj(src, dst)
# Initialise the business-data packages. Because of upload size limits, differing
# Python versions and inconsistent extraction of archive resources, the packages
# are unpacked manually here.
def init_env(list_files, package_name):
    """Unzip cached file resource(s) into *package_name* and put it first on ``sys.path``.

    * One resource: it is extracted directly.
    * Several resources: they are concatenated, in the given order, into
      ``temp.zip`` in the current working directory, which is then extracted
      (i.e. the resources are consecutive chunks of one split zip file).
    * No resources: nothing is extracted; the directory is still added to
      ``sys.path``.

    :param list_files: names of cached file resources; order matters when
        there is more than one.
    :param package_name: target directory, relative to the working directory.
    :return: the absolute path inserted at the front of ``sys.path``
        (existing callers ignore it).
    """
    import logging
    import os
    import shutil
    import subprocess
    import sys

    if len(list_files) == 1:
        archive_path = os.path.abspath(get_cache_file(list_files[0]).name)
    elif len(list_files) > 1:
        archive_path = "temp.zip"
        # Reassemble the split archive in Python rather than with
        # `cat a b > temp.zip`, so no path ever goes through a shell.
        with open(archive_path, "wb") as merged:
            for resource_name in list_files:
                part = get_cache_file(resource_name)
                with open(os.path.abspath(part.name), "rb") as chunk:
                    shutil.copyfileobj(chunk, merged)
    else:
        archive_path = None

    if archive_path is not None:
        # Argument list, no shell: paths are passed to unzip verbatim.
        status = subprocess.call(["unzip", "-o", archive_path, "-d", package_name])
        if status != 0:
            # unzip also exits non-zero for mere warnings, so keep going as
            # before, but leave a trace instead of failing silently and
            # surfacing later as an obscure ImportError.
            logging.warning("unzip of %s into %s exited with status %s",
                            archive_path, package_name, status)

    package_path = os.path.abspath(package_name)
    sys.path.insert(0, package_path)
    return package_path
# UDF entry points.
# Each UDF imports its heavy dependencies inside __init__ (after the cached
# resources have been made importable) and publishes them with `global` so that
# evaluate()/process() can use them.
@annotate("string->string")
class JiebaCut(object):
    """UDF: segment a string with jieba and join the tokens with ``--``."""

    def __init__(self):
        # The global declaration must come before the import that binds the
        # name: the reverse order is a SyntaxError on Python 3.6+.
        global jieba
        import sys

        include_package_path("jiebaA.zip")
        import jieba

        if sys.version_info[0] < 3:
            # Python 2 only: make implicit str/unicode conversions use UTF-8.
            # (`reload` is not a builtin on Python 3.)
            reload(sys)  # noqa: F821
            sys.setdefaultencoding('utf-8')

    def evaluate(self, x):
        """Return the jieba tokens of *x* joined by ``--``; NULL input gives NULL."""
        if x is None:
            return None
        return '--'.join(jieba.cut(x))
@annotate("string->string")
class Preprocess(BaseUDTF):
    """UDTF: preprocess one raw document and emit the article as JSON."""

    def __init__(self):
        # Declare before importing: binding a name and only then declaring it
        # global is a SyntaxError on Python 3.6+.
        global Preprocessing, entityLink, predictor, uuid, getAttributes
        import uuid

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        include_package_path("envs_py37.env.zip")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.entityLink.entityLink as entityLink
        import BiddingKG.dl.interface.getAttributes as getAttributes

    def process(self, x):
        """Forward the JSON of the preprocessed article; NULL input yields no row."""
        if x is None:
            return
        k = str(uuid.uuid4())
        list_articles = Preprocessing.get_preprocessed_articles([[k, x, "", "_doc_id", ""]], useselffool=True)
        self.forward(list_articles[0].toJson())
@annotate("string -> string,string")
class Preprocess_article(BaseUDTF):
    """UDTF: preprocess one raw document and emit (article id, article JSON)."""

    def __init__(self):
        # Declare before importing: binding a name and only then declaring it
        # global is a SyntaxError on Python 3.6+.
        global Preprocessing, entityLink, predictor, uuid, getAttributes

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import uuid
        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.entityLink.entityLink as entityLink
        import BiddingKG.dl.interface.getAttributes as getAttributes

    def process(self, x):
        """Forward (id, JSON) of the preprocessed article; NULL input yields no row."""
        if x is None:
            return
        k = str(uuid.uuid4())
        list_articles = Preprocessing.get_preprocessed_article([[k, x, "", "_doc_id", ""]])
        article = list_articles[0]
        self.forward(article.id, article.toJson())
@annotate("string->string,string")
class Preprocess_sentences(BaseUDTF):
    """UDTF: split a preprocessed article (JSON) into sentences.

    Emits (article id, JSON list of sentence JSON strings).
    """

    def __init__(self):
        # Declare before importing (SyntaxError on Python 3.6+ otherwise).
        global Preprocessing, Entitys, json

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.interface.Entitys as Entitys
        import json

    def process(self, x):
        """Forward the sentences of article JSON *x*; NULL input yields no row."""
        if x is None:
            return
        _article = Entitys.Article.fromJson(x)
        list_sentences = Preprocessing.get_preprocessed_sentences([_article], True)
        list_out = [_sentence.toJson() for _sentence in list_sentences[0]]
        self.forward(_article.id, json.dumps(list_out))
@annotate("string->string,string")
class Preprocess_entitys(BaseUDTF):
    """UDTF: extract entities from a document's sentences.

    Input: JSON list of sentence JSON strings.
    Output: (doc_id of the first sentence, JSON list of entity JSON strings).
    """

    def __init__(self):
        # Declare before importing (SyntaxError on Python 3.6+ otherwise).
        global Preprocessing, Entitys, json

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import BiddingKG.dl.interface.Entitys as Entitys
        import json

    def process(self, x):
        """Forward the entities found in sentences *x*; NULL/empty input yields no row."""
        if x is None:
            return
        list_sentence = [Entitys.Sentences.fromJson(_x) for _x in json.loads(x)]
        if not list_sentence:
            # The output row is keyed on the first sentence's doc_id; without
            # sentences there is nothing to key on (previously an IndexError).
            return
        list_entitys = Preprocessing.get_preprocessed_entitys([list_sentence], True)
        list_out = [_entity.toJson() for _entity in list_entitys[0]]
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))
@annotate("string->string,string")
class Predict_codename(BaseUDTF):
    """UDTF: run the "codeName" predictor over a document's sentences.

    Input: JSON list of sentence JSON strings.
    Output: (first element of the document's prediction, JSON of the whole
    per-document prediction).
    """

    def __init__(self):
        # Declare before importing (SyntaxError on Python 3.6+ otherwise).
        global predictor, Entitys, json

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json

    def process(self, x):
        """Forward the codeName prediction for sentences *x*; NULL input yields no row."""
        if x is None:
            return
        list_sentence = [Entitys.Sentences.fromJson(_x) for _x in json.loads(x)]
        codename = predictor.getPredictor("codeName").predict([list_sentence], MAX_AREA=2000)
        doc_prediction = codename[0]
        self.forward(doc_prediction[0], json.dumps(doc_prediction))
@annotate("string,string->string,string")
class Predict_role(BaseUDTF):
    """UDTF: label entity roles with the "prem" predictor.

    Input: (JSON list of sentence JSON strings, JSON list of entity JSON strings).
    Output: (doc_id of the first sentence, JSON list of the entities whose
    ``label`` was set).
    """

    def __init__(self):
        # Declare before importing (SyntaxError on Python 3.6+ otherwise).
        global predictor, Entitys, json

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json

    def process(self, x, y):
        """Forward role-labelled entities; NULL input or no sentences yields no row."""
        if x is None or y is None:
            return
        list_sentence = [Entitys.Sentences.fromJson(_x) for _x in json.loads(x)]
        if not list_sentence:
            # The output row is keyed on the first sentence's doc_id
            # (previously an IndexError).
            return
        list_entity = [Entitys.Entity.fromJson(_y) for _y in json.loads(y)]
        # predict_role updates the entity objects in place.
        predictor.getPredictor("prem").predict_role([list_sentence], [list_entity])
        list_out = [_entity.toJson() for _entity in list_entity if _entity.label is not None]
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))
@annotate("string,string->string,string")
class Predict_money(BaseUDTF):
    """UDTF: label money entities with the "prem" predictor.

    Input: (JSON list of sentence JSON strings, JSON list of entity JSON strings).
    Output: (doc_id of the first sentence, JSON list of the entities whose
    ``label`` was set).
    """

    def __init__(self):
        # Declare before importing (SyntaxError on Python 3.6+ otherwise).
        global predictor, Entitys, json

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json

    def process(self, x, y):
        """Forward money-labelled entities; NULL input or no sentences yields no row."""
        if x is None or y is None:
            return
        list_sentence = [Entitys.Sentences.fromJson(_x) for _x in json.loads(x)]
        if not list_sentence:
            # The output row is keyed on the first sentence's doc_id
            # (previously an IndexError).
            return
        list_entity = [Entitys.Entity.fromJson(_y) for _y in json.loads(y)]
        # predict_money updates the entity objects in place.
        predictor.getPredictor("prem").predict_money([list_sentence], [list_entity])
        list_out = [_entity.toJson() for _entity in list_entity if _entity.label is not None]
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))
@annotate("string,string->string,string")
class Predict_person(BaseUDTF):
    """UDTF: label person entities with the "epc" predictor.

    Input: (JSON list of sentence JSON strings, JSON list of entity JSON strings).
    Output: (doc_id of the first sentence, JSON list of the entities whose
    ``label`` was set).
    """

    def __init__(self):
        # Declare before importing (SyntaxError on Python 3.6+ otherwise).
        global predictor, Entitys, json

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import json

    def process(self, x, y):
        """Forward person-labelled entities; NULL input or no sentences yields no row."""
        if x is None or y is None:
            return
        list_sentence = [Entitys.Sentences.fromJson(_x) for _x in json.loads(x)]
        if not list_sentence:
            # The output row is keyed on the first sentence's doc_id
            # (previously an IndexError).
            return
        list_entity = [Entitys.Entity.fromJson(_y) for _y in json.loads(y)]
        # predict_person updates the entity objects in place.
        predictor.getPredictor("epc").predict_person([list_sentence], [list_entity])
        list_out = [_entity.toJson() for _entity in list_entity if _entity.label is not None]
        self.forward(list_sentence[0].doc_id, json.dumps(list_out))
@annotate("string,string,string,string,string,string,string->string,string,string")
class ContentUnion(BaseUDTF):
    """UDTF: merge the per-stage outputs of one document into the final result.

    Entities from the role/money/person stages replace the matching base
    entities (matched on doc_id, entity_id and entity_type). Then the roleRule
    predictor, entity linking and getPREMs run, and the result is merged with
    the codeName prediction.
    Output: (article id, article doc_id, result JSON).
    """

    def __init__(self):
        # Declare before importing/defining (SyntaxError on Python 3.6+ otherwise).
        global predictor, Entitys, getAttributes, entityLink, json, MyEncoder, Preprocessing

        # Unpack the cached resources and make them importable.
        init_env(["BiddingKG.zip.env"], "local_package")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        init_env(["envs_py37.zip.env"], "local_package")
        init_env(["so.env"], ".")

        import BiddingKG.dl.interface.predictor as predictor
        import BiddingKG.dl.interface.Entitys as Entitys
        import BiddingKG.dl.interface.getAttributes as getAttributes
        import BiddingKG.dl.entityLink.entityLink as entityLink
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        import json

        class MyEncoder(json.JSONEncoder):
            """JSON encoder that also serialises numpy arrays/scalars and bytes.

            No __init__ override: the previous one skipped JSONEncoder.__init__
            and accepted no keyword arguments, so json.dumps(..., cls=MyEncoder)
            raised TypeError.
            """

            def default(self, obj):
                # Imported lazily so numpy is only needed when the encoder is used.
                import numpy as np
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return obj.decode('utf-8')
                elif isinstance(obj, np.floating):
                    # np.floating covers float16/32/64 and does not depend on
                    # np.float_, which NumPy 2 removed.
                    return float(obj)
                elif isinstance(obj, np.integer):
                    return int(obj)
                return json.JSONEncoder.default(self, obj)

    def process(self, json_article, list_json_sentence, list_json_entity,
                list_json_entity_role, list_json_entity_money, list_json_entity_person,
                json_codename):
        """Merge one document's stage outputs; a NULL article yields no row."""
        if json_article is None:
            return

        def entity_key(entity):
            return "%s-%s-%s" % (str(entity.doc_id), str(entity.entity_id), str(entity.entity_type))

        _article = Entitys.Article.fromJson(json_article)

        # Labelled entities from the role/money/person stages, keyed for lookup.
        # Later stages win when the same key appears more than once.
        dict_entity = {}
        for list_json in [list_json_entity_role, list_json_entity_money, list_json_entity_person]:
            for _json_entity in json.loads(list_json):
                _entity = Entitys.Entity.fromJson(_json_entity)
                dict_entity[entity_key(_entity)] = _entity

        list_sentence = [Entitys.Sentences.fromJson(s) for s in json.loads(list_json_sentence)]

        # Base entity list, with each entity replaced by its labelled version if any.
        list_entity = []
        for _json_entity in json.loads(list_json_entity):
            _entity = Entitys.Entity.fromJson(_json_entity)
            list_entity.append(dict_entity.get(entity_key(_entity), _entity))

        codeName = json.loads(json_codename)
        predictor.getPredictor("roleRule").predict([_article], [list_sentence], [list_entity], [codeName])
        entityLink.link_entitys([list_entity])
        prem = getAttributes.getPREMs([list_sentence], [list_entity], [_article])
        result = json.dumps(Preprocessing.union_result([codeName], prem)[0][1], ensure_ascii=False)
        self.forward(_article.id, _article.doc_id, result)
@annotate("string,bigint,string,string->string,bigint,string")
class Extract(BaseUDTF):
    """UDTF: run the whole BiddingKG extraction pipeline on one document.

    Input: (content, doc_id, title, page_time).
    Output row: (page_time, doc_id, JSON of the merged result, extended with
    per-stage ``cost_time`` and ``success: True``). A document whose processing
    raises is logged and produces no row.
    """

    # doc_ids that are skipped without any processing. Why these documents are
    # excluded is not visible here (presumably they broke or stalled the
    # pipeline) -- confirm before editing the list.
    SKIPPED_DOC_IDS = frozenset([105677700, 126694044, 126795572, 126951461])

    def __init__(self):
        # Declare every name published to process() up front: declaring a name
        # global after binding it is a SyntaxError on Python 3.6+.
        global predictor, Entitys, getAttributes, entityLink, json, MyEncoder, Preprocessing, time, log, np, uuid
        import datetime
        import logging
        import uuid

        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

        def checkpoint(label):
            # Log a wall-clock marker so slow start-up steps can be spotted.
            logging.info(label + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))

        checkpoint("time1")
        # Extracted into a fresh uuid-named directory on every instantiation.
        init_env(["BiddingKG.zip.env"], str(uuid.uuid4()))
        checkpoint("time2")
        init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        checkpoint("time3")
        include_package_path("envs_py37.env.zip")
        checkpoint("time4")
        init_env(["so.env"], ".")
        checkpoint("time5")
        import BiddingKG.dl.interface.predictor as predictor
        checkpoint("time6")
        import BiddingKG.dl.interface.Entitys as Entitys
        checkpoint("time6.1")
        import BiddingKG.dl.interface.getAttributes as getAttributes
        checkpoint("time6.2")
        import BiddingKG.dl.entityLink.entityLink as entityLink
        checkpoint("time6.2")
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        checkpoint("time6.3")
        import json
        import time
        from BiddingKG.dl.common.Utils import log
        checkpoint("time7")
        import numpy as np

        class MyEncoder(json.JSONEncoder):
            """JSON encoder that also serialises numpy arrays/scalars and bytes."""

            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return obj.decode('utf-8')
                elif isinstance(obj, np.floating):
                    # Covers float16/32/64 without relying on np.float_
                    # (removed in NumPy 2).
                    return float(obj)
                elif isinstance(obj, np.integer):
                    # Previously only np.int64; other numpy int widths failed.
                    return int(obj)
                return json.JSONEncoder.default(self, obj)

    def process(self, content, _doc_id, _title, page_time):
        """Extract one document and forward (page_time, doc_id, result JSON)."""
        if content is None or _doc_id in self.SKIPPED_DOC_IDS:
            return
        k = str(uuid.uuid4())
        cost_time = dict()
        start_time = time.time()
        log("start process doc %s" % (str(_doc_id)))
        try:
            list_articles, list_sentences, list_entitys, _cost_time = Preprocessing.get_preprocessed(
                [[k, content, "", str(_doc_id), str(_title)]], useselffool=True)
            log("get preprocessed done of doc_id%s" % (_doc_id))
            cost_time["preprocess"] = time.time() - start_time
            cost_time.update(_cost_time)

            start_time = time.time()
            codeName = predictor.getPredictor("codeName").predict(list_sentences, MAX_AREA=2000, list_entitys=list_entitys)
            log("get codename done of doc_id%s" % (_doc_id))
            cost_time["codename"] = time.time() - start_time

            start_time = time.time()
            predictor.getPredictor("prem").predict(list_sentences, list_entitys)
            log("get prem done of doc_id%s" % (_doc_id))
            cost_time["prem"] = time.time() - start_time

            start_time = time.time()
            predictor.getPredictor("roleRule").predict(list_articles, list_sentences, list_entitys, codeName)
            cost_time["rule"] = time.time() - start_time

            start_time = time.time()
            predictor.getPredictor("epc").predict(list_sentences, list_entitys)
            log("get epc done of doc_id%s" % (_doc_id))
            cost_time["person"] = time.time() - start_time

            # "attrs" covers both entity linking and attribute extraction.
            start_time = time.time()
            entityLink.link_entitys(list_entitys)
            prem = getAttributes.getPREMs(list_sentences, list_entitys, list_articles)
            log("get attributes done of doc_id%s" % (_doc_id))
            cost_time["attrs"] = time.time() - start_time

            data_res = Preprocessing.union_result(codeName, prem)[0][1]
            data_res["cost_time"] = cost_time
            data_res["success"] = True
            _article = list_articles[0]
            self.forward(page_time, int(_article.doc_id), json.dumps(data_res, cls=MyEncoder, ensure_ascii=False))
        except Exception as e:
            # Deliberately best-effort: one bad document must not fail the whole
            # job. Log the traceback too, so the failure can actually be diagnosed.
            import traceback
            log("%s===error docid:%s" % (str(e), str(_doc_id)))
            log(traceback.format_exc())
|