# -*- coding: utf-8 -*-
"""
Created on Fri Jun 1 18:03:03 2018

@author: DONG
"""
import sys
import os
from flask import Flask, jsonify
from flask import abort
from flask import request

sys.path.append(os.path.dirname(__file__) + "/..")

os.environ["KERAS_BACKEND"] = "tensorflow"

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

# Cap the number of threads used by the numerical backends. These variables
# must be set before numpy/TensorFlow are imported. Note: a value of 1 maps to
# one core, but setting it to 5 showed 10 cores in use; the exact relationship
# between this value and the core count is unclear.
limit_num = "4"
os.environ["OMP_NUM_THREADS"] = limit_num          # export OMP_NUM_THREADS=4
os.environ["OPENBLAS_NUM_THREADS"] = limit_num     # export OPENBLAS_NUM_THREADS=4
os.environ["MKL_NUM_THREADS"] = limit_num          # export MKL_NUM_THREADS=4
os.environ["VECLIB_MAXIMUM_THREADS"] = limit_num   # export VECLIB_MAXIMUM_THREADS=4
os.environ["NUMEXPR_NUM_THREADS"] = limit_num      # export NUMEXPR_NUM_THREADS=4

import time
import uuid
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback
import json

# Run on CPU only.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

sys.path.append(os.path.abspath("."))

# Custom JSON encoder: converts numpy arrays, numpy floats and bytes into
# JSON-serializable Python types.
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64)):
            return float(obj)
        return json.JSONEncoder.default(self, obj)
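
# Hedged usage sketch (not part of the original module; the helper below and
# its sample payload are invented): shows that MyEncoder lets json.dumps
# handle the numpy/bytes types an extraction result may contain.
def _demo_my_encoder():
    sample = {"score": np.float32(0.87), "ids": np.array([1, 2, 3]), "name": b"demo"}
    return json.dumps(sample, cls=MyEncoder)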

def _async_raise(tid, exctype):
    """Raise `exctype` in the thread with id `tid`, performing cleanup if needed."""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread state was modified: undo the call and fail.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)
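
# Note: PyThreadState_SetAsyncExc only delivers the exception the next time the
# target thread executes Python bytecode, so a thread blocked inside a C
# extension (e.g. a long-running model call) may not stop immediately.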

def run_thread(data, list_result):
    # data = data.decode("utf8")
    # data = json.loads(data,encoding="utf8")
    k = str(uuid.uuid4())
    cost_time = dict()
    _doc_id = data.get("doc_id", "")
    _title = data.get("title", "")
    _content = data.get("content", "")
    _page_time = data.get("page_time", "")
    data_res = ""
    web_source_no = data.get("web_source_no", "")
    web_source_name = data.get("web_source_name", "")
    original_docchannel = data.get("original_docchannel", "")
    # print("web_source_name:",web_source_name)
    is_fail = False
    try:
        if _content != "":
            data_res = predict(_doc_id, _content, _title, _page_time,
                               web_source_no=web_source_no,
                               web_source_name=web_source_name,
                               original_docchannel=original_docchannel)
        else:
            data_res = json.dumps({"success": False, "msg": "content not passed"})
            # is_fail = True
    except Exception as e:
        traceback.print_exc()
        data_res = json.dumps({"success": False, "msg": str(e)})
        is_fail = True
    # Return the result as JSON.
    # _resp = json.dumps(data_res,cls=MyEncoder)
    # log(str(data["flag"])+str(data))
    # log("done for doc_id:%s with result:%s"%(_doc_id,str(data_res)))
    list_result.append(data_res)
    if is_fail:
        list_result.append(is_fail)

@app.route("/test", methods=['POST'])
def test():
    from BiddingKG.dl.common.Utils import log
    from BiddingKG.dl.interface.extract import predict
    global predict, log
    _time = time.time()
    a = request.form.get("content")
    log("get form takes %.2fs" % (time.time() - _time))
    return json.dumps(sys.getsizeof(request.form)), 201

@app.route('/content_extract', methods=['POST'])
def text_predict():
    from BiddingKG.dl.common.Utils import log
    from BiddingKG.dl.interface.extract import predict
    global predict, log
    _time = time.time()
    data = request.json
    status_code = 200
    list_result = []
    _timeout = data.get("timeout", 400)
    log("get data cost:%.2fs" % (time.time() - _time))
    t = Thread(target=run_thread, args=(data, list_result))
    start_time = time.time()
    t.start()
    t.join(_timeout)
    if t.is_alive():
        stop_thread(t)
        status_code = 302  # killed because of timeout
        data_res = json.dumps({"success": False, "msg": "timeout"})
    else:
        # status_code += int((time.time()-start_time)%10+1)
        status_code = 201
        data_res = list_result[0]
        if len(list_result) > 1 and list_result[1] is True:
            status_code = 500
    _resp = data_res
    # _resp = predict(doc_id=_doc_id,text=_content,title=_title,page_time=_page_time)
    return _resp, status_code
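
# Hedged client sketch (not part of the original service): one way to call
# POST /content_extract from Python. Assumes the server is listening on
# 127.0.0.1:15030 and that the third-party `requests` package is available;
# the payload fields mirror what run_thread() reads from the request JSON.
def _example_content_extract_request(host="127.0.0.1", port=15030):
    import requests  # assumption: requests is installed in the client environment
    payload = {
        "doc_id": "demo-001",                  # made-up identifier
        "title": "example title",
        "content": "example announcement text",
        "page_time": "2018-06-01",
        "timeout": 60,                         # seconds before the worker thread is killed
    }
    resp = requests.post("http://%s:%d/content_extract" % (host, port), json=payload)
    # 201 on success, 302 if the extraction timed out, 500 if it raised.
    return resp.status_code, resp.text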

def getPort(argv):
    port = 15030
    print(argv)
    for item in argv:
        _l = str(item).split("port=")
        if len(_l) > 1:
            port = int(_l[-1])
            break
    return port


def getWorkers(argv):
    worker = 15
    for item in argv:
        _l = str(item).split("worker=")
        if len(_l) > 1:
            worker = int(_l[-1])
            break
    return worker
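
# Hedged usage note (illustrative, not from the original source): arguments are
# matched by substring, e.g. `python <this_script> port=15030 worker=4`;
# getPort(["x", "port=15030"]) returns 15030, getWorkers(["x", "worker=4"])
# returns 4, and anything else falls back to the defaults (15030 and 15).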

def start_with_tornado(port, process_num):
    from tornado.wsgi import WSGIContainer
    from tornado.httpserver import HTTPServer
    from tornado.ioloop import IOLoop
    print("tornado imported")
    http_server = HTTPServer(WSGIContainer(app))
    # http_server.listen(port)  # shortcut for bind and start
    http_server.bind(port)
    http_server.start(process_num)
    IOLoop.instance().start()


def start_with_flask():
    from BiddingKG.dl.common.Utils import log
    port = getPort(argv=sys.argv)
    log("ContentExtractor running")
    app.run(host='0.0.0.0', port=port, threaded=True, debug=False)
    # app.run()

if __name__ == '__main__':
    port = getPort(argv=sys.argv)
    workers = getWorkers(argv=sys.argv)
    start_with_tornado(port, workers)