'''
Created on 2019-12-03

@author: User
'''
import allspark
import sys
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import json
import re
import time
import uuid
from BiddingKG.dl.common.Utils import log
from BiddingKG.dl.interface.extract import predict
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
sys.path.append(os.path.abspath("."))


# Custom JSON encoder: makes numpy arrays, numpy floats and bytes serializable
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
            return float(obj)
        return json.JSONEncoder.default(self, obj)


def _async_raise(tid, exctype):
    """Raise an exception asynchronously inside the thread with id tid."""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread was affected: undo the call and report the error
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)
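
# Illustrative sketch (not wired into the service): how stop_thread() is meant
# to pair with a worker thread to enforce a per-request timeout, mirroring the
# commented-out logic in MyProcessor.process below. The helper name
# run_with_timeout and its signature are hypothetical, not part of the original
# code; note that PyThreadState_SetAsyncExc is best-effort and cannot interrupt
# a thread blocked inside C code.
def run_with_timeout(target, args, timeout):
    list_result = []
    # The worker appends its result to list_result, like run_thread below
    t = Thread(target=target, args=tuple(args) + (list_result,))
    t.start()
    t.join(timeout)
    if t.is_alive():
        # Timed out: raise SystemExit inside the worker and report failure
        stop_thread(t)
        return {"success": False, "msg": "timeout"}
    return list_result[0] if list_result else {"success": False, "msg": "no result"}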

class MyProcessor(allspark.BaseProcessor):
    """ MyProcessor is an example processor; you can send a message like this
        to get a prediction:
        curl -v http://127.0.0.1:8080/api/predict/service_name -d '{"doc_id":"1","title":"","content":"..."}'
    """

    def run_thread(self, data, list_result):
        # data = data.decode("utf8")
        # data = json.loads(data)
        k = str(uuid.uuid4())
        cost_time = dict()
        if "doc_id" in data:
            _doc_id = data['doc_id']
        else:
            _doc_id = ""
        if "title" in data:
            _title = data["title"]
        else:
            _title = ""
        data_res = ""
        try:
            if "content" in data:
                log("get request of doc_id:%s" % (_doc_id))
                content = data['content']
                # The step-by-step pipeline below predates the single predict()
                # entry point imported from BiddingKG.dl.interface.extract. It
                # references modules (Preprocessing, entityLink, getAttributes)
                # and predictors (self.codeNamePredict, self.premPredict,
                # self.roleRulePredict, self.epcPredict) that this file no
                # longer imports or initializes, so it is kept commented out
                # for reference only; note the duplicated premPredict call in
                # the original.
                # start_time = time.time()
                # list_articles, list_sentences, list_entitys, _cost_time = Preprocessing.get_preprocessed(
                #     [[k, content, "", _doc_id, _title]], useselffool=True)
                # log("get preprocessed done of doc_id:%s" % (_doc_id))
                # cost_time["preprocess"] = time.time() - start_time
                # cost_time.update(_cost_time)
                # for articles in list_articles:
                #     print(articles.content)
                # start_time = time.time()
                # codeName = self.codeNamePredict.predict(list_sentences, list_entitys=list_entitys)
                # log("get codename done of doc_id:%s" % (_doc_id))
                # cost_time["codename"] = time.time() - start_time
                # start_time = time.time()
                # self.premPredict.predict(list_sentences, list_entitys)
                # self.premPredict.predict(list_sentences, list_entitys)
                # log("get prem done of doc_id:%s" % (_doc_id))
                # cost_time["prem"] = time.time() - start_time
                # start_time = time.time()
                # self.roleRulePredict.predict(list_articles, list_sentences, list_entitys, codeName)
                # cost_time["rule"] = time.time() - start_time
                # start_time = time.time()
                # self.epcPredict.predict(list_sentences, list_entitys)
                # log("get epc done of doc_id:%s" % (_doc_id))
                # cost_time["person"] = time.time() - start_time
                # start_time = time.time()
                # entityLink.link_entitys(list_entitys)
                # for list_entity in list_entitys:
                #     for _entity in list_entity:
                #         for _ent in _entity.linked_entitys:
                #             print(_entity.entity_text, _ent.entity_text)
                # prem = getAttributes.getPREMs(list_sentences, list_entitys, list_articles)
                # log("get attributes done of doc_id:%s" % (_doc_id))
                # cost_time["attrs"] = time.time() - start_time
                # for entitys in list_entitys:
                #     for entity in entitys:
                #         print(entity.entity_text, entity.entity_type, entity.sentence_index,
                #               entity.begin_index, entity.label, entity.values)
                # print(prem)
                data_res = predict(doc_id=_doc_id, text=content, title=_title)
                if isinstance(data_res, str):
                    # process() below treats predict()'s return value as a JSON
                    # string, so parse it before attaching extra fields
                    data_res = json.loads(data_res)
                data_res["cost_time"] = cost_time
                data_res["success"] = True
                # return json.dumps(Preprocessing.union_result(codeName, prem)[0][1],
                #                   cls=MyEncoder, sort_keys=True, indent=4, ensure_ascii=False)
            else:
                data_res = {"success": False, "msg": "content not passed"}
        except Exception as e:
            traceback.print_exc()
            data_res = {"success": False, "msg": str(e)}
        # Return the result as JSON
        # _resp = json.dumps(data_res, cls=MyEncoder)
        # log(str(data["flag"]) + str(data))
        log("done for doc_id:%s with result:%s" % (_doc_id, str(data_res)))
        list_result.append(data_res)

    def initialize(self):
        """ load module, executed once at the start of the service
            do service initialization and load models in this function
        """
        self.timeout = 60
        self.status_types = 5
        self.timeOfType = self.timeout // self.status_types

    def pre_proccess(self, data):
        """ data format pre-process (example helper; not used by process())
        """
        x, y = data.split(b' ')
        return int(x), int(y)

    def post_process(self, data):
        """ post-process the response before returning it
        """
        return bytes(data, encoding='utf8')

    def process(self, data):
        """ process the request data
        """
        data = data.decode("utf8")
        data = json.loads(data)
        _doc_id = data.get("doc_id", "")
        _title = data.get("title", "")
        _content = data.get("content", "")
        status_code = 200
        # if "timeout" in data:
        #     _timeout = data["timeout"]
        list_result = []
        # t = Thread(target=self.run_thread, args=(data, list_result))
        # start_time = time.time()
        # t.start()
        # t.join(_timeout)
        # if t.is_alive():
        #     stop_thread(t)
        #     status_code = 302  # 302: killed due to timeout
        #     data_res = {"success": False, "msg": "timeout"}
        # else:
        #     status_code += int((time.time() - start_time) // self.timeOfType + 1)
        #     data_res = list_result[0]
        # _resp = json.dumps(data_res, cls=MyEncoder)
        _resp = predict(doc_id=_doc_id, text=_content, title=_title)
        return self.post_process(_resp), status_code


if __name__ == '__main__':
    # the worker_threads parameter indicates the processing concurrency
    # Run locally
    allspark.default_properties().put("rpc.keepalive", 60000)
    runner = MyProcessor(worker_threads=5, worker_processes=1, endpoint="0.0.0.0:15030")
    # Run on the PAI platform
    # runner = MyProcessor()
    runner.run()
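
# --- Illustrative client call (a sketch, not executed by this module) ---
# Assumes the service is running locally on the endpoint configured above; the
# request path may differ depending on how the allspark service is exposed (the
# class docstring shows the /api/predict/service_name form used on PAI). The
# body must be a JSON object with doc_id, title and content, matching what
# process() reads.
#
#   import json
#   import requests  # assumed available in the client environment
#
#   body = {"doc_id": "12345", "title": "", "content": "..."}
#   resp = requests.post("http://127.0.0.1:15030/",
#                        data=json.dumps(body).encode("utf8"))
#   print(resp.status_code, resp.content.decode("utf8"))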