'''
Created on 2019-12-03 (原: 2019年12月3日)

@author: User

EAS (allspark) online-prediction service: receives a JSON document over
HTTP, runs the BiddingKG ``predict`` extractor in a watchdog thread, and
returns the extraction result (or a timeout/error payload) as JSON.
'''
import allspark

import sys
import os
sys.path.append(os.path.dirname(__file__) + "/..")
os.environ["KERAS_BACKEND"] = "tensorflow"

import json
import re
import time
import uuid
from BiddingKG.dl.common.Utils import log
from BiddingKG.dl.interface.extract import predict
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback

# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
# os.environ["CUDA_VISIBLE_DEVICES"] = ""
sys.path.append(os.path.abspath("."))


class MyEncoder(json.JSONEncoder):
    """Custom JSON encoder making numpy arrays/floats and bytes serializable."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, np.floating):
            # np.floating is the common base of every numpy float scalar
            # (float16/32/64 and the former np.float_, which was removed
            # in numpy 2.0 — the old explicit tuple would crash there).
            return float(obj)
        return json.JSONEncoder.default(self, obj)


def _async_raise(tid, exctype):
    """Asynchronously raise *exctype* inside the thread with ident *tid*.

    Standard CPython recipe built on ``PyThreadState_SetAsyncExc``; used to
    abort a worker thread that exceeded its deadline.

    :param tid: thread ident (``Thread.ident``)
    :param exctype: exception class (an instance is reduced to its class)
    :raises ValueError: if *tid* names no live thread
    :raises SystemError: if the interpreter reports more than one thread
        affected (the request is then undone)
    """
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread was affected: revert and fail loudly.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    """Force *thread* to terminate by injecting ``SystemExit`` into it.

    NOTE(review): injection only takes effect when the thread next executes
    Python bytecode — a thread blocked inside a C extension may not stop
    immediately.
    """
    _async_raise(thread.ident, SystemExit)


class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor is a example
        you can send mesage like this to predict
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '2 105'
    """

    def run_thread(self, data, list_result):
        """Worker body: run ``predict`` on one request and append the JSON
        result (or an error payload) to *list_result*.

        :param data: decoded request dict; ``content`` is required, the
            remaining fields default to ``""`` when absent
        :param list_result: shared output list (single-element protocol)
        """
        # data = data.decode("utf8")
        # data = json.loads(data,encoding="utf8")
        k = str(uuid.uuid4())
        cost_time = dict()
        _doc_id = data.get("doc_id", "")
        _title = data.get("title", "")
        _content = data.get("content", "")
        _page_time = data.get("page_time", "")
        data_res = ""
        web_source_no = data.get("web_source_no", "")
        original_docchannel = data.get("original_docchannel", "")
        try:
            if "content" in data:
                data_res = predict(_doc_id, _content, _title, _page_time, web_source_no, original_docchannel)
            else:
                data_res = json.dumps({"success": False, "msg": "content not passed"})
        except Exception as e:
            traceback.print_exc()
            data_res = json.dumps({"success": False, "msg": str(e)})
        # Result is returned as JSON (predict already serializes).
        # _resp = json.dumps(data_res,cls=MyEncoder)
        # log(str(data["flag"])+str(data))
        log("done for doc_id:%s with result:%s" % (_doc_id, str(data_res)))
        list_result.append(data_res)

    def initialize(self):
        """ load module, executed once at the start of the service
        do service initialization and load models in this function.
        """
        self.timeout = 60
        self.status_types = 5
        self.timeOfType = self.timeout // self.status_types

    def pre_proccess(self, data):
        """ data format pre-process (name spelled per allspark convention) """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ process after predict: encode the response string to bytes """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """Handle one request: decode JSON, run the worker with a deadline,
        kill it on timeout, and return ``(bytes, status_code)``.

        Status codes: 302 = worker killed on timeout; otherwise
        200 + elapsed-time-derived offset.
        """
        data = data.decode("utf8")
        # ``encoding=`` kwarg of json.loads was removed in Python 3.9;
        # the payload is already a decoded str here.
        data = json.loads(data)
        status_code = 200
        list_result = []
        _timeout = data.get("timeout", self.timeout)
        t = Thread(target=self.run_thread, args=(data, list_result))
        start_time = time.time()
        t.start()
        t.join(_timeout)
        if t.is_alive():
            stop_thread(t)
            status_code = 302  # worker killed on timeout
            data_res = json.dumps({"success": False, "msg": "timeout"})
        else:
            # NOTE(review): ``%`` folds the elapsed time instead of bucketing
            # it into the self.status_types ranges (``//`` would do that);
            # kept as-is since downstream may rely on these exact codes.
            status_code += int((time.time() - start_time) % self.timeOfType + 1)
            if list_result:
                data_res = list_result[0]
            else:
                # Worker finished without appending (e.g. killed mid-run
                # by an earlier injection) — report instead of IndexError.
                data_res = json.dumps({"success": False, "msg": "no result produced"})
        _resp = data_res
        # _resp = predict(doc_id=_doc_id,text=_content,title=_title,page_time=_page_time)
        return self.post_process(_resp), status_code


def getPort(argv):
    """Extract the port from argv items shaped like ``port=NNNN``;
    defaults to 15030."""
    port = 15030
    for item in argv:
        _l = str(item).split("port=")
        if len(_l) > 1:
            port = int(_l[-1])
            break
    return port


if __name__ == '__main__':
    # parameter worker_threads indicates concurrency of processing
    # Local run
    port = getPort(argv=sys.argv)
    allspark.default_properties().put("rpc.keepalive", 250000)
    allspark.default_properties().put("rpc.max_queue_size", 100)
    log("port==%d" % (port))
    runner = MyProcessor(worker_threads=5, worker_processes=1, endpoint="0.0.0.0:%d" % (port))
    # Run on the PAI platform
    # runner = MyProcessor()
    runner.run()