# app.py
  1. '''
  2. Created on 2019年12月3日
  3. @author: User
  4. '''
  5. import allspark
  6. import sys
  7. import os
  8. sys.path.append(os.path.dirname(__file__)+"/..")
  9. os.environ["KERAS_BACKEND"] = "tensorflow"
  10. import json
  11. import re
  12. import time
  13. import uuid
  14. from BiddingKG.dl.common.Utils import log
  15. from BiddingKG.dl.interface.extract import predict
  16. import numpy as np
  17. import ctypes
  18. import inspect
  19. from threading import Thread
  20. import traceback
  21. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  22. os.environ["CUDA_VISIBLE_DEVICES"] = ""
  23. sys.path.append(os.path.abspath("."))
  24. #自定义jsonEncoder
  25. class MyEncoder(json.JSONEncoder):
  26. def default(self, obj):
  27. if isinstance(obj, np.ndarray):
  28. return obj.tolist()
  29. elif isinstance(obj, bytes):
  30. return str(obj, encoding='utf-8')
  31. elif isinstance(obj, (np.float_, np.float16, np.float32,
  32. np.float64)):
  33. return float(obj)
  34. return json.JSONEncoder.default(self, obj)
  35. def _async_raise(tid, exctype):
  36. """raises the exception, performs cleanup if needed"""
  37. tid = ctypes.c_long(tid)
  38. if not inspect.isclass(exctype):
  39. exctype = type(exctype)
  40. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  41. if res == 0:
  42. raise ValueError("invalid thread id")
  43. elif res != 1:
  44. ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
  45. raise SystemError("PyThreadState_SetAsyncExc failed")
  46. def stop_thread(thread):
  47. _async_raise(thread.ident, SystemExit)
  48. class MyProcessor(allspark.BaseProcessor):
  49. """ MyProcessor is a example
  50. you can send mesage like this to predict
  51. curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
  52. """
  53. def run_thread(self,data,list_result):
  54. # data = data.decode("utf8")
  55. # data = json.loads(data,encoding="utf8")
  56. k = str(uuid.uuid4())
  57. cost_time = dict()
  58. _doc_id = data.get("doc_id","")
  59. _title = data.get("title","")
  60. _content = data.get("content","")
  61. _page_time = data.get("page_time","")
  62. data_res = ""
  63. try:
  64. if "content" in data:
  65. content = data['content']
  66. data_res = predict(_doc_id,_content,_title,_page_time)
  67. # log("get request of doc_id:%s"%(_doc_id))
  68. # k = str(uuid.uuid4())
  69. # cost_time = dict()
  70. #
  71. # start_time = time.time()
  72. # list_articles,list_sentences,list_entitys,_cost_time = Preprocessing.get_preprocessed([[k,content,"",_doc_id,_title]],useselffool=True)
  73. # log("get preprocessed done of doc_id%s"%(_doc_id))
  74. # cost_time["preprocess"] = time.time()-start_time
  75. # cost_time.update(_cost_time)
  76. # '''
  77. # for articles in list_articles:
  78. # print(articles.content)
  79. #
  80. # '''
  81. # start_time = time.time()
  82. # codeName = self.codeNamePredict.predict(list_sentences,list_entitys=list_entitys)
  83. # log("get codename done of doc_id%s"%(_doc_id))
  84. # cost_time["codename"] = time.time()-start_time
  85. #
  86. # start_time = time.time()
  87. # self.premPredict.predict(list_sentences,list_entitys)
  88. #
  89. # self.premPredict.predict(list_sentences,list_entitys)
  90. # log("get prem done of doc_id%s"%(_doc_id))
  91. # cost_time["prem"] = time.time()-start_time
  92. # start_time = time.time()
  93. # self.roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
  94. # # self.roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
  95. # cost_time["rule"] = time.time()-start_time
  96. # start_time = time.time()
  97. # self.epcPredict.predict(list_sentences,list_entitys)
  98. # log("get epc done of doc_id%s"%(_doc_id))
  99. # cost_time["person"] = time.time()-start_time
  100. # start_time = time.time()
  101. # entityLink.link_entitys(list_entitys)
  102. # '''
  103. # for list_entity in list_entitys:
  104. # for _entity in list_entity:
  105. # for _ent in _entity.linked_entitys:
  106. # print(_entity.entity_text,_ent.entity_text)
  107. # '''
  108. # prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)
  109. # log("get attributes done of doc_id%s"%(_doc_id))
  110. # cost_time["attrs"] = time.time()-start_time
  111. #
  112. #
  113. # '''
  114. #
  115. #
  116. # for entitys in list_entitys:
  117. # for entity in entitys:
  118. # print(entity.entity_text,entity.entity_type,entity.sentence_index,entity.begin_index,entity.label,entity.values)
  119. # '''
  120. # #print(prem)
  121. # data_res = predict(docid)
  122. # data_res["cost_time"] = cost_time
  123. # data_res["success"] = True
  124. #return json.dumps(Preprocessing.union_result(codeName, prem)[0][1],cls=MyEncoder,sort_keys=True,indent=4,ensure_ascii=False)
  125. else:
  126. data_res = json.dumps({"success":False,"msg":"content not passed"})
  127. except Exception as e:
  128. traceback.print_exc()
  129. data_res = json.dumps({"success":False,"msg":str(e)})
  130. # 以json形式返回结果
  131. #_resp = json.dumps(data_res,cls=MyEncoder)
  132. #log(str(data["flag"])+str(data))
  133. log("done for doc_id:%s with result:%s"%(_doc_id,str(data_res)))
  134. list_result.append(data_res)
  135. def initialize(self):
  136. """ load module, executed once at the start of the service
  137. do service intialization and load models in this function.
  138. """'''
  139. '''
  140. self.timeout = 60
  141. self.status_types = 5
  142. self.timeOfType = self.timeout//self.status_types
  143. def pre_proccess(self, data):
  144. """ data format pre process
  145. """
  146. x, y = data.split(b' ')
  147. return int(x), int(y)
  148. def post_process(self, data):
  149. """ proccess after process
  150. """
  151. return bytes(data, encoding='utf8')
  152. def process(self, data):
  153. """ process the request data
  154. """
  155. data = data.decode("utf8")
  156. data = json.loads(data,encoding="utf8")
  157. _doc_id = data.get("doc_id","")
  158. _title = data.get("title","")
  159. _content = data.get("content","")
  160. _page_time = data.get("page_time","")
  161. status_code = 200
  162. list_result = []
  163. _timeout = data.get("timeout",self.timeout)
  164. t = Thread(target=self.run_thread,args=(data,list_result))
  165. start_time = time.time()
  166. t.start()
  167. t.join(_timeout)
  168. if t.is_alive():
  169. stop_thread(t)
  170. status_code = 302#超时被kill
  171. data_res = json.dumps({"success":False,"msg":"timeout"})
  172. else:
  173. status_code += int((time.time()-start_time)//self.timeOfType+1)
  174. data_res = list_result[0]
  175. _resp = data_res
  176. # _resp = predict(doc_id=_doc_id,text=_content,title=_title,page_time=_page_time)
  177. return self.post_process(_resp),status_code
  178. if __name__ == '__main__':
  179. # paramter worker_threads indicates concurrency of processing
  180. #本地运行
  181. # allspark.default_properties().put("rpc.keepalive", 180000)
  182. #
  183. #
  184. # runner = MyProcessor(worker_threads=5,worker_processes=1,endpoint="0.0.0.0:15030")
  185. #PAI平台运行
  186. runner = MyProcessor()
  187. runner.run()