evaluates.py
#coding=utf-8
# `evaluate` is the entry-point function of this method and must use exactly this name
from odps.udf import annotate
from odps.distcache import get_cache_archive
from odps.distcache import get_cache_file
from odps.udf import BaseUDTF
import threading
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
import time
from multiprocessing import Process, Queue

def log(msg):
    logging.info(msg)
# Set up the pandas dependency package
def include_package_path(res_name):
    import os, sys
    archive_files = get_cache_archive(res_name)
    dir_names = sorted([os.path.dirname(os.path.normpath(f.name)) for f in archive_files
                        if '.dist_info' not in f.name], key=lambda v: len(v))
    _path = dir_names[0].split(".zip/files")[0] + ".zip/files"
    log("add path:%s" % (_path))
    sys.path.append(_path)
    return _path
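
# Usage sketch (the resource name below is an assumption, not from this project):
# an archive uploaded as an ODPS ARCHIVE resource is auto-extracted under a
# "<name>.zip/files" directory, and appending that directory to sys.path makes
# its top-level packages importable:
#   include_package_path("pandas_deps.zip")  # hypothetical resource name
#   import pandas as pd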
# A "RuntimeError: xxx has been blocked by sandbox" may occur here: libraries
# containing C extensions are blocked by the sandbox. Work around it with
# "set odps.isolation.session.enable = true".
def include_file(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    sys.path.append(os.path.dirname(os.path.abspath(so_file.name)))
def include_so(file_name):
    import os, sys
    so_file = get_cache_file(file_name)
    with open(so_file.name, 'rb') as fp:
        content = fp.read()
    with open(file_name, "wb") as so:
        so.write(content)
        so.flush()
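
# Usage sketch (the file name is an assumption): include_so copies a cached
# shared object into the UDF working directory so the loader can find it by name:
#   include_so("tf_predict.so")  # hypothetical .so resource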
# Initialize the business data package. Because of upload size limits, Python
# version mismatches, inconsistently extracted archives, and similar issues,
# it has to be imported manually.
def init_env(list_files, package_name):
    import os, sys
    if len(list_files) == 1:
        so_file = get_cache_file(list_files[0])
        cmd_line = os.path.abspath(so_file.name)
        os.system("unzip -o %s -d %s" % (cmd_line, package_name))
    elif len(list_files) > 1:
        cmd_line = "cat"
        for _file in list_files:
            so_file = get_cache_file(_file)
            cmd_line += " " + os.path.abspath(so_file.name)
        cmd_line += " > temp.zip"
        os.system(cmd_line)
        os.system("unzip -o temp.zip -d %s" % (package_name))
    # os.system("rm -rf %s/*.dist-info" % (package_name))
    # return os.listdir(os.path.abspath("local_package"))
    # os.system("echo export LD_LIBRARY_PATH=%s >> ~/.bashrc" % (os.path.abspath("local_package")))
    # os.system("source ~/.bashrc")
    sys.path.insert(0, os.path.abspath(package_name))
    # sys.path.append(os.path.join(os.path.abspath("local_package"), "interface_real"))
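
# Usage sketch, mirroring the commented-out call in Extract.__init__ below: a
# package split into multiple resources to stay under the upload size limit is
# re-joined with `cat` and then unzipped into a local directory:
#   init_env(["BiddingKG.z01", "BiddingKG.z02"], "local_package")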
def multiLoadEnv():
    def load_project():
        start_time = time.time()
        ## init_env(["BiddingKG.zip.env.baseline"], str(uuid.uuid4()))
        # init_env(["BiddingKG.zip.env.backup"], str(uuid.uuid4()))
        # switched to importing via a zip archive
        log("=======")
        include_package_path("BiddingKG.baseline.zip")
        # include_package_path("BiddingKG.backup.zip")
        logging.info("init biddingkg.zip.env.line cost %d" % (time.time() - start_time))

    def load_vector():
        start_time = time.time()
        # init_env(["wiki_128_word_embedding_new.vector.env"], ".")
        include_package_path("wiki.zip")
        logging.info("init wiki_128_word_embedding_new cost %d" % (time.time() - start_time))

        start_time = time.time()
        # init_env(["enterprise.zip.env"], ".")
        # init_env(["LEGAL_ENTERPRISE.zip.env"], ".")
        include_package_path("enterprise.zip")
        logging.info("init legal_enterprise.zip.env cost %d" % (time.time() - start_time))

        start_time = time.time()
        init_env(["so.env"], ".")
        logging.info("init so.env cost %d" % (time.time() - start_time))

    def load_py():
        start_time = time.time()
        # self.out = init_env(["envs_py37.zip.env"], str(uuid.uuid4()))
        include_package_path("envs_py37.env.zip")
        # include_package_path("envs_py35.zip")
        logging.info("init envs_py cost %d" % (time.time() - start_time))

    load_project()
    load_vector()
    load_py()
@annotate("string,bigint,string,string->string,bigint,string")
class Extract(BaseUDTF):

    def f_queue_process(self, task_queue, result_queue):
        log("start import predict function")
        from BiddingKG.dl.interface.extract import predict as predict
        log("import done")
        while True:
            try:
                item = task_queue.get(True, timeout=10)
                result_json = predict(item.get("docid", ""), item.get("content", ""), item.get("title", ""))
                result_queue.put(result_json)
            except:
                # reached both when the queue read times out and when predict() fails
                log("get data time out")
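
    # Importing `predict` inside the worker process means a crash during the
    # import or inside native prediction code kills only the worker; process()
    # below detects that via is_alive() and restarts it.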
    def __init__(self):
        # self.out = init_env(["BiddingKG.z01", "BiddingKG.z02"], "local_package")
        # `global` must precede the imports that bind these names in this scope,
        # otherwise Python 3 raises a SyntaxError
        global uuid, time
        import uuid
        import logging
        import datetime
        import time
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        multiLoadEnv()
        # import BiddingKG.dl.common.nerUtils
        # log("time5" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # import BiddingKG.dl.interface.predictor as predictor
        # log("time6" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # import BiddingKG.dl.interface.Entitys as Entitys
        # log("time6.1" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # import BiddingKG.dl.interface.getAttributes as getAttributes
        # log("time6.2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # import BiddingKG.dl.entityLink.entityLink as entityLink
        # log("time6.2" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # import BiddingKG.dl.interface.Preprocessing as Preprocessing
        # log("time6.3" + str(datetime.datetime.now().strftime('%y-%m-%d %H:%M:%S')))
        # log("start import predict function")
        # from BiddingKG.dl.interface.extract import predict as predict
        # log("import done")
        global predictor, Entitys, getAttributes, entityLink, json, MyEncoder, Preprocessing, np, predict
        import json
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
        self.deal_process.start()
        import numpy as np

        class MyEncoder(json.JSONEncoder):
            # serialize numpy and bytes values that the json module cannot handle natively
            def default(self, obj):
                if isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, bytes):
                    return str(obj, encoding='utf-8')
                elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
                    return float(obj)
                elif isinstance(obj, np.int64):
                    return int(obj)
                return json.JSONEncoder.default(self, obj)
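
        # Usage sketch: MyEncoder makes numpy arrays/scalars and bytes JSON
        # serializable, e.g. json.dumps({"score": np.float32(0.5)}, cls=MyEncoder)
        # yields '{"score": 0.5}'.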
    def process(self, content, _doc_id, _title, page_time):
        # # process directly, without the worker process:
        # if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
        #     result_json = predict(str(_doc_id), content, str(_title))
        #     self.forward(page_time, int(_doc_id), result_json)
        if content is not None and _doc_id not in [105677700,126694044,126795572,126951461,71708072,137850637]:
            # drain any stale items left in the queues
            try:
                while self.task_queue.qsize() > 0:
                    self.task_queue.get(timeout=5)
            except Exception as e:
                pass
            try:
                while self.result_queue.qsize() > 0:
                    self.result_queue.get(timeout=5)
            except Exception as e:
                pass
            _item = {"docid": _doc_id, "content": content, "title": _title, "page_time": page_time}
            try:
                _timeout = 60 * 4
                if not self.deal_process.is_alive():
                    log("deal process is down")
                    self.task_queue = Queue()
                    self.deal_process = Process(target=self.f_queue_process, args=(self.task_queue, self.result_queue))
                    self.deal_process.start()
                    # allow extra time for the restarted worker to re-import predict
                    _timeout += 60 * 4
                log("putting item to task_queue with docid:%s" % (str(_doc_id)))
                self.task_queue.put(_item)
                result_json = self.result_queue.get(timeout=_timeout)
                self.forward(page_time, int(_doc_id), result_json)
            except Exception as e:
                log("dealing docid %s failed by timeout" % (str(_doc_id)))
                self.deal_process.kill()
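
# Minimal local sketch of the queue-based timeout-and-restart pattern used in
# Extract.process above. It is pure stdlib and independent of ODPS, but running
# this file directly still requires the odps imports at the top to resolve;
# it also assumes a fork start method (Linux). Doubling a number stands in
# for predict().
if __name__ == "__main__":
    import queue as _queue_mod

    def _demo_worker(tq, rq):
        # stand-in for f_queue_process: read a task, "predict", publish the result
        while True:
            rq.put(tq.get(True, timeout=10) * 2)

    _tq, _rq = Queue(), Queue()
    _p = Process(target=_demo_worker, args=(_tq, _rq))
    _p.start()
    _tq.put(21)
    try:
        print(_rq.get(timeout=5))  # -> 42
    except _queue_mod.Empty:
        _p.kill()  # same recovery step as Extract.process on timeout
    _p.terminate()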