import sys
import os
from threading import Thread

sys.path.append(os.path.abspath("../.."))

# Run CPU-only: hide all CUDA devices before any DL framework initializes.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import redis
import time
import json
import numpy as np
import BiddingKG.dl.interface.settings as settings
import logging
import BiddingKG.dl.interface.predictor as predictor
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import BiddingKG.dl.interface.getAttributes as getAttributes
import pickle
import run_utils
from BiddingKG.dl.common.Utils import *

logging.basicConfig(filename=settings.LOG_FILE, level=settings.LOG_LEVEL,
                    format=settings.LOG_FORMAT, datefmt=settings.DATE_FORMAT)
logger = logging.getLogger(__name__)

# Warm up the shared embedding models once at startup.
getModel_w2v()
getModel_word()

# Model instances, shared by every worker thread.
codeNamePredict = predictor.CodeNamePredict()
premPredict = predictor.PREMPredict()
epcPredict = predictor.EPCPredict()
roleRulePredict = predictor.RoleRulePredictor()

# Connection to the redis database holding the work, result and error queues.
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=settings.REDIS_DB, password=settings.REDIS_PASS)


def thread_run_model(_id, _jointime, list_articles, list_sentences, list_entitys):
    '''
    @summary: worker-thread body: run the full model pipeline
              (codename -> prem -> role rules -> epc -> attributes) for one
              document and store the merged result in redis under key `_id`.
    @param _id: redis key the caller polls for the result
    @param _jointime: timestamp at which the job joined the queue, used for
                      timeout bookkeeping by run_utils.run_timeout
    @param list_articles: preprocessed article objects; list_articles[0] is
                          the document being processed
    @param list_sentences: sentences of the articles
    @param list_entitys: entities extracted from the sentences
    '''
    try:
        start_time = time.time()
        codeName = codeNamePredict.predict(list_articles)
        codename_time = time.time()
        premPredict.predict(list_sentences, list_entitys)
        prem_time = time.time()
        roleRulePredict.predict(list_articles, list_sentences, list_entitys, codeName)
        epcPredict.predict(list_sentences, list_entitys)
        epc_time = time.time()
        prem = getAttributes.getPREMs(list_sentences, list_entitys, list_articles)
        att_time = time.time()
        # Only publish the result if the job has not already timed out.
        if not run_utils.run_timeout(db, _id, "model_after", _jointime,
                                     list_articles[0].sourceContent,
                                     list_articles[0].doc_id):
            union = Preprocessing.union_result(codeName, prem)
            for item in union:
                db.set(item[0], json.dumps(item[1], cls=run_utils.MyEncoder))
        logger.info(msg="process cost:%s,includes codename:%s,prem:%s,epc:%s,getatt:%s" % (
            str(att_time - start_time),
            str(codename_time - start_time),
            str(prem_time - codename_time),
            str(epc_time - prem_time),
            str(att_time - epc_time)))
    except Exception:
        # Report any failure as a timeout result so the caller never blocks
        # forever waiting for a key that will not appear; log the traceback
        # instead of print()-ing only the message to stdout.
        logger.exception("model pipeline failed for id %s", _id)
        db.set(_id, run_utils.TIMEOUT_RES)
    return


# Main loop: pop jobs from the redis queue, process them, store results back.
def redis_process():
    '''
    @summary: server main loop.  Blocks on the preprocess queue, runs the
              model pipeline for each payload inside a watchdog thread with a
              deadline, and reports errors/timeouts back through redis.
              Never returns.
    '''
    step = 0
    while True:
        # Reset per-iteration state so the error handler below can rely on it.
        list_articles = None
        _id = "None"
        try:
            step = 0
            # Blocking pop; atomic on the redis side, so this is safe even if
            # several consumer processes share the queue.
            queue = db.blpop(settings.PREPROCESS_QUEUE)
            step = 1
            if queue:
                # NOTE(review): payloads come from the local preprocessing
                # stage; pickle must never be fed data from untrusted sources.
                preprocess_data = pickle.loads(queue[1])
                _id = preprocess_data["id"]
                _jointime = preprocess_data["jointime"]
                list_articles, list_sentences, list_entitys = preprocess_data["data"]
                if not run_utils.run_timeout(db, _id, "model_before", _jointime,
                                             list_articles[0].sourceContent,
                                             list_articles[0].doc_id):
                    _timeout = run_utils.getTimeOut(_jointime)
                    if _timeout > 0:
                        t = Thread(target=thread_run_model,
                                   args=(_id, _jointime, list_articles,
                                         list_sentences, list_entitys))
                        t.daemon = True
                        t.start()
                        print("timeout", _timeout, time.time() - _jointime)
                        t.join(timeout=_timeout)
                        # is_alive() replaces the isAlive() alias that was
                        # removed in Python 3.9.
                        if t.is_alive():
                            # The worker overran its deadline: force-stop it
                            # and record the timeout.  (The former t._stop() /
                            # t._delete() calls were dropped: they are private
                            # CPython internals and _stop() raises on a
                            # still-running thread in Python 3.)
                            run_utils.stop_thread(t)
                            run_utils.run_timeout(db, _id, "model_ing", _jointime,
                                                  list_articles[0].sourceContent,
                                                  list_articles[0].doc_id)
        except Exception as e:
            q = dict()
            q["error"] = str(e)
            q["step"] = "step:" + str(step)
            # list_articles is None until a payload was unpacked; guard it so
            # the handler itself cannot raise a NameError/TypeError.
            q["content"] = list_articles[0].sourceContent if list_articles else ""
            db.rpush(settings.ERROR_QUEUE, json.dumps(q, cls=run_utils.MyEncoder))
            db.set(_id, run_utils.TIMEOUT_RES)
            logger.exception("error" + str(e))
        # Brief pause between jobs.
        time.sleep(settings.SERVER_SLEEP)


# if this is the main thread of execution start the model server process
if __name__ == "__main__":
    logging.info("BiddingKG processing Server running")
    redis_process()