import sys
import os
from threading import Thread
sys.path.append(os.path.abspath("../.."))
# disable GPU acceleration
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import redis
import time
import json
import numpy as np
import BiddingKG.dl.interface.settings as settings
import logging
import BiddingKG.dl.interface.predictor as predictor
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import BiddingKG.dl.interface.getAttributes as getAttributes
import pickle
import run_utils
from BiddingKG.dl.common.Utils import *

logging.basicConfig(filename=settings.LOG_FILE, level=settings.LOG_LEVEL,
                    format=settings.LOG_FORMAT, datefmt=settings.DATE_FORMAT)
logger = logging.getLogger(__name__)

getModel_w2v()
getModel_word()

# model instances
codeNamePredict = predictor.CodeNamePredict()
premPredict = predictor.PREMPredict()
epcPredict = predictor.EPCPredict()
roleRulePredict = predictor.RoleRulePredictor()

# connect to the redis database
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=settings.REDIS_DB, password=settings.REDIS_PASS)


def thread_run_model(_id, _jointime, list_articles, list_sentences, list_entitys):
    '''
    @summary: worker thread that runs the extraction models on one preprocessed document
    '''
    try:
        start_time = time.time()
        #list_articles,list_sentences,list_entitys = _data

        codeName = codeNamePredict.predict(list_articles)
        codename_time = time.time()

        premPredict.predict(list_sentences, list_entitys)
        prem_time = time.time()
        roleRulePredict.predict(list_articles, list_sentences, list_entitys, codeName)
        epcPredict.predict(list_sentences, list_entitys)
        epc_time = time.time()

        prem = getAttributes.getPREMs(list_sentences, list_entitys, list_articles)

        att_time = time.time()
        step = 4
        '''
        for entitys in list_entitys:
            for entity in entitys:
                print(entity.entity_text,entity.entity_type,entity.values)
        '''
        if not run_utils.run_timeout(db, _id, "model_after", _jointime, list_articles[0].sourceContent, list_articles[0].doc_id):
            union = Preprocessing.union_result(codeName, prem)
            step = 5
            for item in union:
                db.set(item[0], json.dumps(item[1], cls=run_utils.MyEncoder))
            step = 6
            #Preprocessing.persistenceData1(list_entitys, list_sentences)
            #Preprocessing.persistenceData(union)
            #Preprocessing.dropTempDatas(ContentIDs)
        #print(step)
        '''
        print("codename:",codename_time-process_time)
        print("prem",prem_time-codename_time)
        print("epc",epc_time-prem_time)
        print("getatt",time.time()-epc_time)
        '''
        #print("process cost"+str(process_time)+" with length:"+str(len(q['content'])))
        logger.info(msg="process cost:%s,includes codename:%s,prem:%s,epc:%s,getatt:%s" %
                        (str(att_time-start_time),
                         str(codename_time-start_time),
                         str(prem_time-codename_time),
                         str(epc_time-prem_time),
                         str(att_time-epc_time)))
    except Exception as e:
        print(str(e))
        db.set(_id, run_utils.TIMEOUT_RES)
    return
    #os._exit(0)

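
# --- Illustrative sketch (not part of the original pipeline) ---
# thread_run_model() above stores each result with db.set(item[0], json.dumps(item[1], ...)),
# so a downstream reader could fetch a result back as below, assuming item[0] is the document id.
# get_result and doc_id are hypothetical names used only for this example.
def get_result(doc_id):
    raw = db.get(doc_id)
    if raw is None:
        # not processed yet (or the key has expired)
        return None
    # the worker writes JSON via run_utils.MyEncoder; on timeout/error the key
    # holds run_utils.TIMEOUT_RES instead, whose format is defined in run_utils
    return json.loads(raw)
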

# loop: take preprocessed data from the redis queue, run the models, and save the results back to redis
def redis_process():
    len_processed = 0
    predicts = []
    predicts_entitys = []
    count = 0
    step = 0
    while True:
        try:
            # take an item from the queue; for multi-process handling this needs to become a pop with a locking mechanism
            #queue = db.lrange(settings.CONTENT_QUEUE,0,settings.BATCH_SIZE-1)
            step = 0
            queue = db.blpop(settings.PREPROCESS_QUEUE)
            step = 1
            _id = "None"
            if queue:

                #q = json.loads(queue[1].decode("utf-8"))
                preprocess_data = pickle.loads(queue[1])
                _id = preprocess_data["id"]
                _jointime = preprocess_data["jointime"]
                _data = preprocess_data["data"]
                list_articles, list_sentences, list_entitys = _data
                if not run_utils.run_timeout(db, _id, "model_before", _jointime, list_articles[0].sourceContent, list_articles[0].doc_id):
                    _timeout = run_utils.getTimeOut(_jointime)
                    if _timeout > 0:
                        t = Thread(target=thread_run_model, args=(_id, _jointime, list_articles, list_sentences, list_entitys))
                        t.daemon = True
                        t.start()
                        print("timeout", _timeout, time.time()-_jointime)
                        t.join(timeout=_timeout)
                        if t.is_alive():
                            run_utils.stop_thread(t)
                            run_utils.run_timeout(db, _id, "model_ing", _jointime, list_articles[0].sourceContent, list_articles[0].doc_id)

        except Exception as e:
            q = dict()
            q["error"] = str(e)
            q["step"] = "step:" + str(step)
            q["content"] = list_articles[0].sourceContent
            db.rpush(settings.ERROR_QUEUE, json.dumps(q, cls=run_utils.MyEncoder))
            db.set(_id, run_utils.TIMEOUT_RES)
            logger.info("error" + str(e))

        '''
        finally:
            # if processing did not finish, push the data back onto the queue
            if step<=5:
                db.rpush(settings.CONTENT_QUEUE, json.dumps(q))
        '''

        # brief pause between iterations
        time.sleep(settings.SERVER_SLEEP)
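

# --- Illustrative sketch (not part of the original pipeline) ---
# redis_process() above expects each queue item to be a pickled dict with the keys
# "id", "jointime" and "data", where "data" is the
# (list_articles, list_sentences, list_entitys) triple produced by Preprocessing.
# A producer could enqueue work like this; enqueue_document and its arguments are
# hypothetical names, the real upstream producer may look different.
def enqueue_document(_id, list_articles, list_sentences, list_entitys):
    payload = {
        "id": _id,
        "jointime": time.time(),
        "data": (list_articles, list_sentences, list_entitys),
    }
    # blpop() in redis_process() pops from the left, so push to the right for FIFO order
    db.rpush(settings.PREPROCESS_QUEUE, pickle.dumps(payload))
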

# if this is the main thread of execution, start the model server process
if __name__ == "__main__":
    logger.info("BiddingKG processing server running")
    redis_process()