# evaluates.py

#coding=utf-8
# For a plain UDF, "evaluate" is the required entry-point name; a UDTF like the
# one below implements process() instead.
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF

# Set up third-party dependency packages (e.g. pandas) from an archive resource.
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    # Pick the shortest extracted directory (the archive root) and put it on
    # sys.path. Wheel metadata directories are named "*.dist-info" (hyphenated),
    # so filter on that spelling.
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist-info' not in f.name], key=lambda v: len(v))
    sys.path.append(dir_names[0])
    return os.path.dirname(dir_names[0])
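
# A minimal usage sketch for include_package_path, assuming the archive resource
# "envs_py37.env.zip" (referenced again in Extract.__init__ below) has been
# uploaded to the project:
#
#   env_root = include_package_path("envs_py37.env.zip")
#   import pandas  # resolved from the extracted archive now on sys.path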

# A RuntimeError like "xxx has been blocked by sandbox" may be raised here:
# libraries containing C extensions are blocked by the sandbox. Work around it
# with: set odps.isolation.session.enable = true
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    # make the directory that holds the cached file importable
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))
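
# Usage sketch (the file resource name here is hypothetical):
#
#   include_file("interface_config.py")
#   import interface_config  # its cache directory is now on sys.path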

def include_so(file_name):
    # Copy a cached .so resource into the working directory under its resource
    # name so it can be loaded like a regular shared library.
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()
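
# Usage sketch (the resource name is hypothetical; include_so writes the cached
# bytes into the working directory under that same name):
#
#   include_so("selffool_ner.so")
#   # ./selffool_ner.so now exists and can be loaded (e.g. via ctypes.CDLL),
#   # or imported once the working directory is on sys.path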

# Initialize the business data package. Because of upload size limits, Python
# version mismatches, inconsistent archive extraction and similar issues, the
# package has to be unpacked and put on sys.path manually.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        # single archive: unzip it straight into the target directory
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        # multi-part archive: concatenate the parts, then unzip the result
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info"%(package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc"%(os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"),"interface_real"))
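
# Usage sketch, mirroring the commented-out call in Extract.__init__ below;
# "BiddingKG.z01"/"BiddingKG.z02" are the split parts of one zip archive,
# uploaded separately to stay under the per-resource size limit:
#
#   init_env(["BiddingKG.z01", "BiddingKG.z02"], "local_package")
#   import BiddingKG  # importable once "local_package" is on sys.path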

# UDTF signature: input (content string, doc_id bigint, title string, page_time string)
#              -> output (page_time string, doc_id bigint, extraction-result json string)
@annotate("string,bigint,string,string->string,bigint,string")
class Extract(BaseUDTF):
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01","BiddingKG.z02"],"local_package")
        # Global declarations must precede the bindings (Python 3 rejects the
        # reverse order); everything declared here is shared with process().
        global uuid, predictor, Entitys, getAttributes, entityLink, json, MyEncoder, Preprocessing, time, log, np
        import uuid
        import logging
        import datetime
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        # Log a timestamp around each setup step; environment unpacking and the
        # heavyweight imports dominate worker startup time.
        logging.info("time1" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        self.out = init_env(["BiddingKG.zip.env"], str(uuid.uuid4()))
        logging.info("time2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        self.out = init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        logging.info("time3" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # self.out = init_env(["envs_py37.zip.env"],str(uuid.uuid4()))
        self.out = include_package_path("envs_py37.env.zip")
        logging.info("time4" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        self.out = init_env(["so.env"], ".")
        logging.info("time5" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.predictor as predictor
        logging.info("time6" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.Entitys as Entitys
        logging.info("time6.1" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.getAttributes as getAttributes
        logging.info("time6.2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.entityLink.entityLink as entityLink
        logging.info("time6.3" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import BiddingKG.dl.interface.Preprocessing as Preprocessing
        logging.info("time6.4" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        import json
        import time
        from BiddingKG.dl.common.Utils import log
        import numpy as np

        class MyEncoder(json.JSONEncoder):
            # Serialize numpy scalars/arrays and bytes, which json cannot
            # handle natively.
            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                elif isinstance(obj, (np.float_, np.float16, np.float32,
                                      np.float64)):
                    return float(obj)
                elif isinstance(obj, np.int64):
                    return int(obj)
                return json.JSONEncoder.default(self, obj)
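
        # Quick sanity check for MyEncoder (illustrative values):
        #
        #   json.dumps({"score": np.float32(0.5), "ids": np.array([1, 2])},
        #              cls=MyEncoder, ensure_ascii=False)
        #   # -> '{"score": 0.5, "ids": [1, 2]}'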

    def process(self, content, _doc_id, _title, page_time):
        # a few known-bad documents are skipped by doc_id
        if content is not None and _doc_id not in [105677700, 126694044, 126795572, 126951461]:
            k = str(uuid.uuid4())
            cost_time = dict()
            start_time = time.time()
            log("start process doc %s" % (str(_doc_id)))
            try:
                list_articles, list_sentences, list_entitys, _cost_time = Preprocessing.get_preprocessed([[k, content, "", str(_doc_id), str(_title)]], useselffool=True)
                log("get preprocessed done of doc_id%s" % (_doc_id))
                cost_time["preprocess"] = time.time() - start_time
                cost_time.update(_cost_time)
                '''
                for articles in list_articles:
                    print(articles.content)
                '''
                start_time = time.time()
                codeName = predictor.getPredictor("codeName").predict(list_sentences, MAX_AREA=2000, list_entitys=list_entitys)
                log("get codename done of doc_id%s" % (_doc_id))
                cost_time["codename"] = time.time() - start_time
                start_time = time.time()
                predictor.getPredictor("prem").predict(list_sentences, list_entitys)
                log("get prem done of doc_id%s" % (_doc_id))
                cost_time["prem"] = time.time() - start_time
                start_time = time.time()
                predictor.getPredictor("roleRule").predict(list_articles, list_sentences, list_entitys, codeName)
                cost_time["rule"] = time.time() - start_time
                start_time = time.time()
                predictor.getPredictor("epc").predict(list_sentences, list_entitys)
                log("get epc done of doc_id%s" % (_doc_id))
                cost_time["person"] = time.time() - start_time
                start_time = time.time()
                entityLink.link_entitys(list_entitys)
                '''
                for list_entity in list_entitys:
                    for _entity in list_entity:
                        for _ent in _entity.linked_entitys:
                            print(_entity.entity_text, _ent.entity_text)
                '''
                prem = getAttributes.getPREMs(list_sentences, list_entitys, list_articles)
                log("get attributes done of doc_id%s" % (_doc_id))
                cost_time["attrs"] = time.time() - start_time
                #print(prem)
                data_res = Preprocessing.union_result(codeName, prem)[0][1]
                data_res["cost_time"] = cost_time
                data_res["success"] = True
                _article = list_articles[0]
                self.forward(page_time, int(_article.doc_id), json.dumps(data_res, cls=MyEncoder, ensure_ascii=False))
            except Exception as e:
                log("%s===error docid:%s" % (str(e), str(_doc_id)))
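
# A deployment sketch for reference, in MaxCompute SQL. The function and
# resource names follow this file, but the exact resource list and the source
# table are assumptions:
#
#   set odps.isolation.session.enable = true;  -- needed for C-extension libraries (see note above)
#   CREATE FUNCTION extract AS 'evaluates.Extract'
#       USING 'evaluates.py,BiddingKG.zip.env,wiki_128_word_embedding_new.vector.env,envs_py37.env.zip,so.env';
#   SELECT extract(content, docid, doctitle, page_time) AS (page_time, docid, extract_json)
#   FROM documents_table;  -- hypothetical source table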