run_extract_server.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Fri Jun 1 18:03:03 2018
  4. @author: DONG
  5. """
import sys
import os
from flask import Flask, jsonify
from flask import abort
from flask import request
# Make the parent of this file's directory importable.
# Presumably this is what lets the `BiddingKG` package below resolve — confirm against the deployment layout.
sys.path.append(os.path.dirname(__file__)+"/..")
# Set before the BiddingKG imports below run.
# Presumably read by Keras when it is first imported — TODO confirm.
os.environ["KERAS_BACKEND"] = "tensorflow"
# The Flask application object; the route decorators in this file attach to it.
app = Flask(__name__)
# NOTE(review): this setting only influences Flask's own JSON serialization (e.g. jsonify);
# the views visible in this file build their bodies with json.dumps directly.
app.config['JSON_AS_ASCII'] = False
import time
import uuid
from BiddingKG.dl.common.Utils import log
from BiddingKG.dl.interface.extract import predict
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback
import json
# Hide all CUDA devices from this process (a value of "-1" matches no GPU).
# NOTE(review): these are assigned after `predict` has been imported; this looks intended to
# force CPU-only inference — confirm the backend has not initialised CUDA before this point.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# Also make the absolute path of the current working directory importable.
sys.path.append(os.path.abspath("."))
  28. #自定义jsonEncoder
  29. class MyEncoder(json.JSONEncoder):
  30. def default(self, obj):
  31. if isinstance(obj, np.ndarray):
  32. return obj.tolist()
  33. elif isinstance(obj, bytes):
  34. return str(obj, encoding='utf-8')
  35. elif isinstance(obj, (np.float_, np.float16, np.float32,
  36. np.float64)):
  37. return float(obj)
  38. return json.JSONEncoder.default(self, obj)
  39. def _async_raise(tid, exctype):
  40. """raises the exception, performs cleanup if needed"""
  41. tid = ctypes.c_long(tid)
  42. if not inspect.isclass(exctype):
  43. exctype = type(exctype)
  44. res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
  45. if res == 0:
  46. raise ValueError("invalid thread id")
  47. elif res != 1:
  48. ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
  49. raise SystemError("PyThreadState_SetAsyncExc failed")
  50. def stop_thread(thread):
  51. _async_raise(thread.ident, SystemExit)
  52. def run_thread(data,list_result):
  53. # data = data.decode("utf8")
  54. # data = json.loads(data,encoding="utf8")
  55. k = str(uuid.uuid4())
  56. cost_time = dict()
  57. _doc_id = data.get("doc_id","")
  58. _title = data.get("title","")
  59. _content = data.get("content","")
  60. _page_time = data.get("page_time","")
  61. data_res = ""
  62. web_source_no = data.get("web_source_no","")
  63. original_docchannel = data.get("original_docchannel","")
  64. is_fail = False
  65. try:
  66. if _content!="":
  67. data_res = predict(_doc_id,_content,_title,_page_time,web_source_no,original_docchannel)
  68. else:
  69. data_res = json.dumps({"success":False,"msg":"content not passed"})
  70. # is_fail = True
  71. except Exception as e:
  72. traceback.print_exc()
  73. data_res = json.dumps({"success":False,"msg":str(e)})
  74. is_fail = True
  75. # 以json形式返回结果
  76. #_resp = json.dumps(data_res,cls=MyEncoder)
  77. #log(str(data["flag"])+str(data))
  78. log("done for doc_id:%s with result:%s"%(_doc_id,str(data_res)))
  79. list_result.append(data_res)
  80. if is_fail:
  81. list_result.append(is_fail)
  82. @app.route("/test",methods=['POST'])
  83. def test():
  84. _time = time.time()
  85. a = request.form.get("content")
  86. log("get form takes %.2fs"%(time.time()-_time))
  87. return json.dumps(sys.getsizeof(request.form)),201
  88. @app.route('/content_extract', methods=['POST'])
  89. def text_predict():
  90. _time = time.time()
  91. data = request.json
  92. status_code = 200
  93. list_result = []
  94. _timeout = data.get("timeout",400)
  95. log("get data cost:%.2fs"%((time.time()-_time)))
  96. t = Thread(target=run_thread,args=(data,list_result))
  97. start_time = time.time()
  98. t.start()
  99. t.join(_timeout)
  100. if t.is_alive():
  101. stop_thread(t)
  102. status_code = 302#超时被kill
  103. data_res = json.dumps({"success":False,"msg":"timeout"})
  104. else:
  105. # status_code += int((time.time()-start_time)%10+1)
  106. status_code = 201
  107. data_res = list_result[0]
  108. if len(list_result)>1 and list_result[1] ==True:
  109. status_code = 500
  110. _resp = data_res
  111. # _resp = predict(doc_id=_doc_id,text=_content,title=_title,page_time=_page_time)
  112. return _resp,status_code
  113. def getPort(argv):
  114. port = 15030
  115. for item in argv:
  116. _l = str(item).split("port=")
  117. if len(_l)>1:
  118. port = int(_l[-1])
  119. break
  120. return port
  121. if __name__ == '__main__':
  122. port = getPort(argv=sys.argv)
  123. app.run(host='0.0.0.0', port=port, threaded=True, debug=False)
  124. log("ContentExtractor running")
  125. # app.run()