# -*- coding: utf-8 -*-
"""
Created on Fri Jun  1 18:03:03 2018

@author: DONG

HTTP front end for the BiddingKG content extractor.

Routes:
    POST /content_extract  run ``predict`` on a JSON document in a worker
                           thread, killing it after ``timeout`` seconds.
    POST /test             diagnostic endpoint timing form parsing.

Run as a script to serve with Tornado (``port=`` / ``worker=`` arguments);
``start_with_flask`` serves with Flask's built-in server instead.
"""
import sys
import os
from flask import Flask, jsonify
from flask import abort
from flask import request

# Make the directory above this file importable (for the BiddingKG package).
# abspath() guards against __file__ having no directory component, in which
# case dirname() returns "" and the path would collapse to "/..".
sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".."))
os.environ["KERAS_BACKEND"] = "tensorflow"

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

# Cap the thread pools of the numeric libraries. These must be set BEFORE
# numpy (imported below) is loaded. Note from the original author: "1" maps
# to one core, but with "5" the system showed 10 cores in use; the exact
# relationship is unclear.
limit_num = "4"
os.environ["OMP_NUM_THREADS"] = limit_num  # export OMP_NUM_THREADS=1
os.environ["OPENBLAS_NUM_THREADS"] = limit_num  # export OPENBLAS_NUM_THREADS=1
os.environ["MKL_NUM_THREADS"] = limit_num  # export MKL_NUM_THREADS=1
os.environ["VECLIB_MAXIMUM_THREADS"] = limit_num  # export VECLIB_MAXIMUM_THREADS=1
os.environ["NUMEXPR_NUM_THREADS"] = limit_num  # export NUMEXPR_NUM_THREADS=1

import time
import uuid
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback
import json

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# "-1" exposes no CUDA device, so everything runs on the CPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
sys.path.append(os.path.abspath("."))


class MyEncoder(json.JSONEncoder):
    """JSON encoder that also serializes numpy arrays/scalars and bytes."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        # np.floating covers float16/32/64; np.float_ (an alias of float64)
        # no longer exists in NumPy 2.0.
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        return json.JSONEncoder.default(self, obj)


def _async_raise(tid, exctype):
    """Asynchronously raise ``exctype`` in the thread whose ident is ``tid``.

    :raises ValueError: no live thread has that ident.
    :raises SystemError: more than one thread state was affected (the
        request is revoked before raising).
    """
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # Undo the request so no unrelated thread receives the exception.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    """Request that ``thread`` terminate by injecting ``SystemExit`` into it.

    The exception is only delivered when the thread next executes Python
    bytecode; a call blocked inside native code keeps running until it
    returns.
    """
    _async_raise(thread.ident, SystemExit)


def run_thread(data, list_result):
    """Thread target: run ``predict`` on one request payload.

    The outcome is reported by appending to ``list_result`` instead of
    returning:
      * element 0 is the response body (the value returned by ``predict``,
        presumably a JSON string, or a JSON error message);
      * element 1 is ``True`` and present only when prediction raised.

    :param data: request dict; reads ``doc_id``, ``title``, ``content``,
        ``page_time``, ``web_source_no``, ``web_source_name`` and
        ``original_docchannel``, each defaulting to "".
    :param list_result: list shared with the caller that receives the result.
    """
    _doc_id = data.get("doc_id", "")
    _title = data.get("title", "")
    _content = data.get("content", "")
    _page_time = data.get("page_time", "")
    web_source_no = data.get("web_source_no", "")
    web_source_name = data.get("web_source_name", "")
    original_docchannel = data.get("original_docchannel", "")

    is_fail = False
    try:
        if _content:
            # ``predict`` is a module global bound by the request handler.
            data_res = predict(_doc_id, _content, _title, _page_time,
                               web_source_no=web_source_no,
                               web_source_name=web_source_name,
                               original_docchannel=original_docchannel)
        else:
            # Missing/empty content is reported but not treated as a failure.
            data_res = json.dumps({"success": False, "msg": "content not passed"})
    except Exception as e:
        traceback.print_exc()
        data_res = json.dumps({"success": False, "msg": str(e)})
        is_fail = True

    list_result.append(data_res)
    if is_fail:
        list_result.append(is_fail)


@app.route("/test", methods=['POST'])
def test():
    """Diagnostic endpoint: log how long form parsing takes.

    Returns the (shallow, per ``sys.getsizeof``) size of the parsed form.
    """
    # The global declaration must precede the imports that bind these names.
    global predict, log
    from BiddingKG.dl.common.Utils import log
    from BiddingKG.dl.interface.extract import predict

    _time = time.time()
    request.form.get("content")  # forces form parsing so it is timed
    log("get form takes %.2fs" % (time.time() - _time))
    return json.dumps(sys.getsizeof(request.form)), 201


@app.route('/content_extract', methods=['POST'])
def text_predict():
    """Run extraction on one JSON document, with a timeout.

    The body must be a JSON object with the keys read by ``run_thread`` plus
    an optional ``timeout`` in seconds (default 400).

    Status codes:
        201  prediction finished (body is the ``run_thread`` result)
        500  prediction raised, or the worker reported nothing
        302  timed out and the worker was killed (non-standard for an error;
             kept because clients presumably rely on it)
        400  body is not a JSON object
    """
    # The global declaration must precede the imports that bind these names;
    # ``run_thread`` reads ``predict`` as a module global.
    global predict, log
    from BiddingKG.dl.common.Utils import log
    from BiddingKG.dl.interface.extract import predict

    _time = time.time()
    data = request.json
    if not isinstance(data, dict):
        return json.dumps({"success": False, "msg": "request body must be a JSON object"}), 400
    _timeout = data.get("timeout", 400)
    log("get data cost:%.2fs" % (time.time() - _time))

    list_result = []
    t = Thread(target=run_thread, args=(data, list_result))
    t.start()
    t.join(_timeout)

    timed_out = False
    if t.is_alive():
        try:
            stop_thread(t)
            timed_out = True
        except ValueError:
            # The worker exited between is_alive() and the kill request, so
            # its result is complete; use it.
            t.join()

    if timed_out:
        return json.dumps({"success": False, "msg": "timeout"}), 302
    if not list_result:
        return json.dumps({"success": False, "msg": "no result"}), 500

    status_code = 500 if len(list_result) > 1 and list_result[1] is True else 201
    return list_result[0], status_code


def _get_int_arg(argv, key, default):
    """Return the int value of the first ``key=VALUE`` item in ``argv``.

    Leading dashes are ignored, so ``port=80``, ``-port=80`` and
    ``--port=80`` all match; the key must match exactly.

    :returns: the parsed value, or ``default`` when no item matches.
    :raises ValueError: VALUE is not an integer.
    """
    for item in argv:
        name, sep, value = str(item).lstrip("-").partition("=")
        if sep and name == key:
            return int(value)
    return default


def getPort(argv):
    """Return the ``port=`` value from ``argv`` (default 15030)."""
    print(argv)
    return _get_int_arg(argv, "port", 15030)


def getWorkers(argv):
    """Return the ``worker=`` value from ``argv`` (default 15)."""
    return _get_int_arg(argv, "worker", 15)


def start_with_tornado(port, process_num):
    """Serve ``app`` through Tornado's WSGI container (blocks).

    ``HTTPServer.start(process_num)`` forks ``process_num`` worker
    processes sharing the bound port.
    """
    from tornado.wsgi import WSGIContainer
    from tornado.httpserver import HTTPServer
    from tornado.ioloop import IOLoop
    print("import ")
    http_server = HTTPServer(WSGIContainer(app))
    http_server.bind(port)
    http_server.start(process_num)
    IOLoop.current().start()


def start_with_flask(port=None):
    """Serve ``app`` with Flask's built-in threaded server (blocks).

    :param port: port to bind; when None it is read from ``sys.argv`` via
        ``getPort``.
    """
    # Imported locally: the module-level ``log`` only exists after a request
    # handler has run.
    from BiddingKG.dl.common.Utils import log
    if port is None:
        port = getPort(argv=sys.argv)
    # app.run() blocks until shutdown, so announce startup first.
    log("ContentExtractor running")
    app.run(host='0.0.0.0', port=port, threaded=True, debug=False)


if __name__ == '__main__':
    port = getPort(argv=sys.argv)
    workers = getWorkers(argv=sys.argv)
    start_with_tornado(port, workers)