app.py

'''
Created on Dec 3, 2019
@author: User
'''
import allspark
import sys
import os
import json
import re
import time
import uuid
from BiddingKG.dl.common.Utils import log
import BiddingKG.dl.interface.predictor as predictor
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import BiddingKG.dl.interface.getAttributes as getAttributes
import BiddingKG.dl.entityLink.entityLink as entityLink
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""

sys.path.append(os.path.abspath("."))

# Custom JSON encoder so numpy and bytes values can be serialized
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64)):
            return float(obj)
        return json.JSONEncoder.default(self, obj)
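
# Illustrative usage of MyEncoder (a sketch, not part of the service flow): it is the
# cls= argument passed to json.dumps() further below, so numpy values survive
# serialization, e.g. json.dumps({"vec": np.array([1, 2])}, cls=MyEncoder) -> '{"vec": [1, 2]}'.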

def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)
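
# Note: stop_thread() relies on the CPython-specific PyThreadState_SetAsyncExc recipe
# above to asynchronously raise SystemExit inside a worker thread; process() below uses
# it to kill run_thread() once the request timeout expires.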

class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor is an example
        you can send a message like this to predict
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """

    def run_thread(self, data, list_result):
        # data = data.decode("utf8")
        # data = json.loads(data,encoding="utf8")
        k = str(uuid.uuid4())
        cost_time = dict()
        if "doc_id" in data:
            _doc_id = data['doc_id']
        else:
            _doc_id = ""
        if "title" in data:
            _title = data["title"]
        else:
            _title = ""
        data_res = ""
        try:
            if "content" in data:
                log("get request of doc_id:%s"%(_doc_id))
                k = str(uuid.uuid4())
                cost_time = dict()
                content = data['content']

                start_time = time.time()
                list_articles,list_sentences,list_entitys,_cost_time = Preprocessing.get_preprocessed([[k,content,"",_doc_id,_title]],useselffool=True)
                log("get preprocessed done of doc_id%s"%(_doc_id))
                cost_time["preprocess"] = time.time()-start_time
                cost_time.update(_cost_time)
                '''
                for articles in list_articles:
                    print(articles.content)
                '''
                start_time = time.time()
                codeName = self.codeNamePredict.predict(list_sentences,list_entitys=list_entitys)
                log("get codename done of doc_id%s"%(_doc_id))
                cost_time["codename"] = time.time()-start_time

                start_time = time.time()
                self.premPredict.predict(list_sentences,list_entitys)
                self.premPredict.predict(list_sentences,list_entitys)
                log("get prem done of doc_id%s"%(_doc_id))
                cost_time["prem"] = time.time()-start_time

                start_time = time.time()
                self.roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
                # self.roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
                cost_time["rule"] = time.time()-start_time

                start_time = time.time()
                self.epcPredict.predict(list_sentences,list_entitys)
                log("get epc done of doc_id%s"%(_doc_id))
                cost_time["person"] = time.time()-start_time

                start_time = time.time()
                entityLink.link_entitys(list_entitys)
                '''
                for list_entity in list_entitys:
                    for _entity in list_entity:
                        for _ent in _entity.linked_entitys:
                            print(_entity.entity_text,_ent.entity_text)
                '''
                prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)
                log("get attributes done of doc_id%s"%(_doc_id))
                cost_time["attrs"] = time.time()-start_time

                '''
                for entitys in list_entitys:
                    for entity in entitys:
                        print(entity.entity_text,entity.entity_type,entity.sentence_index,entity.begin_index,entity.label,entity.values)
                '''
                #print(prem)
                data_res = Preprocessing.union_result(codeName, prem)[0][1]
                data_res["cost_time"] = cost_time
                data_res["success"] = True
                #return json.dumps(Preprocessing.union_result(codeName, prem)[0][1],cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False)
            else:
                data_res = {"success":False,"msg":"content not passed"}
        except Exception as e:
            traceback.print_exc()
            data_res = {"success":False,"msg":str(e)}
        # return the result as JSON
        #_resp = json.dumps(data_res,cls=MyEncoder)
        #log(str(data["flag"])+str(data))
        log("done for doc_id:%s with result:%s"%(_doc_id,str(data_res)))
        list_result.append(data_res)

    def initialize(self):
        """ load module, executed once at the start of the service
        do service initialization and load models in this function.
        """
        self.codeNamePredict = predictor.CodeNamePredict()
        self.premPredict = predictor.PREMPredict()
        self.epcPredict = predictor.EPCPredict()
        self.roleRulePredict = predictor.RoleRulePredictor()
        self.timeout = 60
        self.status_types = 5
        self.timeOfType = self.timeout//self.status_types
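        # With the defaults above, timeOfType is 60 // 5 = 12 seconds; process() below
        # adds int(elapsed // timeOfType) + 1 to the 200 status code, so faster requests
        # map to lower status codes.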

    def pre_proccess(self, data):
        """ data format pre process
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ post-process the result before it is returned
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ process the request data
        """
        data = data.decode("utf8")
        data = json.loads(data,encoding="utf8")
        # k = str(uuid.uuid4())
        # cost_time = dict()
        # if "doc_id" in data:
        #     _doc_id = data['doc_id']
        # else:
        #     _doc_id = ""
        # if "title" in data:
        #     _title = data["title"]
        # else:
        #     _title = ""
        # data_res = ""
        # try:
        #     if "content" in data:
        #         log("get request of doc_id:%s"%(_doc_id))
        #         k = str(uuid.uuid4())
        #         cost_time = dict()
        #         content = data['content']
        #         start_time = time.time()
        #         list_articles,list_sentences,list_entitys,_cost_time = Preprocessing.get_articles_processed([[k,content,"",_doc_id,_title]],useselffool=True)
        #         log("get preprocessed done of doc_id%s"%(_doc_id))
        #         cost_time["preprocess"] = time.time()-start_time
        #         cost_time.update(_cost_time)
        #         '''
        #         for articles in list_articles:
        #             print(articles.content)
        #         '''
        #         start_time = time.time()
        #         codeName = self.codeNamePredict.predict(list_articles,MAX_AREA=2000)
        #         log("get codename done of doc_id%s"%(_doc_id))
        #         cost_time["codename"] = time.time()-start_time
        #
        #         start_time = time.time()
        #         self.premPredict.predict(list_sentences,list_entitys)
        #         log("get prem done of doc_id%s"%(_doc_id))
        #         cost_time["prem"] = time.time()-start_time
        #         start_time = time.time()
        #         self.roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
        #         cost_time["rule"] = time.time()-start_time
        #         start_time = time.time()
        #         self.epcPredict.predict(list_sentences,list_entitys)
        #         log("get epc done of doc_id%s"%(_doc_id))
        #         cost_time["person"] = time.time()-start_time
        #         start_time = time.time()
        #         entityLink.link_entitys(list_entitys)
        #         '''
        #         for list_entity in list_entitys:
        #             for _entity in list_entity:
        #                 for _ent in _entity.linked_entitys:
        #                     print(_entity.entity_text,_ent.entity_text)
        #         '''
        #         prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)
        #         log("get attributes done of doc_id%s"%(_doc_id))
        #         cost_time["attrs"] = time.time()-start_time
        #
        #         '''
        #         for entitys in list_entitys:
        #             for entity in entitys:
        #                 print(entity.entity_text,entity.entity_type,entity.sentence_index,entity.begin_index,entity.label,entity.values)
        #         '''
        #         #print(prem)
        #         data_res = Preprocessing.union_result(codeName, prem)[0][1]
        #         data_res["cost_time"] = cost_time
        #         data_res["success"] = True
        #         #return json.dumps(Preprocessing.union_result(codeName, prem)[0][1],cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False)
        #     else:
        #         data_res = {"success":False,"msg":"content not passed"}
        #
        # except Exception as e:
        #     data_res = {"success":False,"msg":str(e)}
        # # return the result as JSON
        # _resp = json.dumps(data_res,cls=MyEncoder)
        # #log(str(data["flag"])+str(data))
        # log("done for doc_id:%s with result:%s"%(_doc_id,str(data_res)))

        _timeout = self.timeout
        status_code = 200
        if "timeout" in data:
            _timeout = data["timeout"]
        list_result = []
        t = Thread(target=self.run_thread,args=(data,list_result))
        start_time = time.time()
        t.start()
        t.join(_timeout)
        if t.is_alive():
            stop_thread(t)
            status_code = 302  # killed because of timeout
            data_res = {"success":False,"msg":"timeout"}
        else:
            status_code += int((time.time()-start_time)//self.timeOfType+1)
            data_res = list_result[0]
        _resp = json.dumps(data_res,cls=MyEncoder)

        return self.post_process(_resp),status_code

if __name__ == '__main__':
    # the worker_threads parameter controls the concurrency of processing
    # run locally
    allspark.default_properties().put("rpc.keepalive", 60000)
    runner = MyProcessor(worker_threads=5,worker_processes=1,endpoint="0.0.0.0:15030")
    # run on the PAI platform
    # runner = MyProcessor()
    runner.run()
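
# Example local invocation (a sketch; the URL path follows the curl example in the
# MyProcessor docstring and the port matches the endpoint above, both assumptions):
#   curl -v http://127.0.0.1:15030/api/predict/service_name \
#        -d '{"doc_id": "123", "title": "", "content": "...", "timeout": 60}'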