'''
Created on 2019-08-02

@author: User

Debug helpers for the article-extraction pipeline.

Importing this module instantiates the predictors and opens a Redis
connection.  Run as a script, it blocks until an entry appears on
``settings.ERROR_QUEUE`` and saves that entry to ``err.pk``.
'''
import codecs
import json
import logging
import os
import pickle
import sys
import time
from threading import Thread

# Path and environment setup runs before any project import so that it is in
# effect when those modules are loaded.  "../.." is resolved against the
# current working directory.
sys.path.append(os.path.abspath("../.."))

# Disable GPU acceleration: hide every CUDA device from this process.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import numpy as np
import redis
import requests

from BiddingKG.dl.common.Utils import save,load
import BiddingKG.dl.interface.settings as settings
import BiddingKG.dl.interface.predictor as predictor
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import BiddingKG.dl.interface.getAttributes as getAttributes
import run_utils

logger = logging.getLogger(__name__)

# Predictors are built once at import time and shared by the worker functions.
codeNamePredict = predictor.CodeNamePredict()
premPredict = predictor.PREMPredict()
epcPredict = predictor.EPCPredict()
roleRulePredict = predictor.RoleRulePredictor()

# Connect to the Redis database (logical database 6).
db = redis.StrictRedis(host=settings.REDIS_HOST,
                       port=settings.REDIS_PORT,
                       db=6,
                       password=settings.REDIS_PASS)


def thread_run_model(_id,_jointime,list_articles,list_sentences,list_entitys,_data=None):
    '''
    @summary: worker body for the model stage.

    Runs the code/name, PREM, role-rule and EPC predictors, extracts the
    attributes, and - unless ``run_utils.run_timeout`` reports a timeout for
    the "model_after" stage - writes each merged result item to Redis.

    @param _id: identifier forwarded to ``run_utils.run_timeout``
    @param _jointime: join timestamp forwarded to ``run_utils.run_timeout``
    @param list_articles: articles, as returned by ``Preprocessing.get_preprocessed``
    @param list_sentences: sentences, as returned by ``Preprocessing.get_preprocessed``
    @param list_entitys: entities, as returned by ``Preprocessing.get_preprocessed``
    @param _data: payload forwarded to ``run_utils.run_timeout``.  The function
        used to read an undefined name ``_data`` here and raised NameError;
        it is now an explicit optional argument.
        NOTE(review): the other call sites pass the raw document content in
        this position - confirm how run_timeout treats None.
    '''
    codeName = codeNamePredict.predict(list_articles)
    # The return values of the next three predictors are not used; presumably
    # they annotate list_entitys in place - confirm in predictor.
    premPredict.predict(list_sentences,list_entitys)
    roleRulePredict.predict(list_articles,list_sentences,list_entitys,codeName)
    epcPredict.predict(list_sentences,list_entitys)
    prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)

    # Results are only stored when run_timeout returns a falsy value.
    if run_utils.run_timeout(db,_id,"model_after",_jointime,_data,"123"):
        return

    union = Preprocessing.union_result(codeName,prem)
    for item in union:
        # item[0] is used as the Redis key, item[1] is stored as JSON.
        db.set(item[0],json.dumps(item[1],cls=run_utils.MyEncoder))
    #Preprocessing.persistenceData1(list_entitys, list_sentences)
    #Preprocessing.persistenceData(union)
    #Preprocessing.dropTempDatas(ContentIDs)


def thread_run_preprocess(_id,_jointime,ContentIDs):
    '''
    @summary: worker body for the preprocessing stage.

    Preprocesses the whole batch, then calls ``run_utils.run_timeout`` for the
    "preprocess_after" stage once per entry.  Any exception is logged with its
    traceback and swallowed, so the function always returns None.

    @param _id: accepted for symmetry with the other worker; the timeout call
        uses each entry's own id instead
    @param _jointime: join timestamp forwarded to ``run_utils.run_timeout``
    @param ContentIDs: sequence of entries laid out as
        [id, content, join_time, doc_id, title]
    '''
    try:
        list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed(ContentIDs,useselffool=True)
        for ContentID in ContentIDs:
            # Only the id, content and doc_id are used.
            # NOTE(review): the entry's own join time is ignored in favour of
            # the _jointime argument, as in the original code - confirm intended.
            entry_id, content, _entry_jointime, doc_id, _title = ContentID[:5]
            # The result is ignored here because the follow-up step is disabled.
            run_utils.run_timeout(db,entry_id,"preprocess_after",_jointime,content,doc_id)
            # Disabled: push the preprocessed entry on to the preprocess queue
            # (requires the entry's index into the three result lists).
            # preprocess_data = dict()
            # preprocess_data["id"] = entry_id
            # preprocess_data["jointime"] = _jointime
            # preprocess_data["data"] = [[article],[sentences],[entitys]]
            # db.rpush(settings.PREPROCESS_QUEUE,pickle.dumps(preprocess_data))
    except Exception:
        # Best effort: log the failure with its traceback and carry on.
        logger.exception("thread_run_preprocess failed")
    return


def test(name,content,base_url="http://192.168.2.101:15015",timeout=None):
    '''
    @summary: POST one document to the ``/article_extract`` endpoint, print
        the response body and return it.

    @param name: value sent as the "id" field
    @param content: value sent as the "content" field
    @param base_url: scheme, host and port of the extraction service
    @param timeout: forwarded to ``requests.post``; None (the default) waits
        indefinitely, as before
    @return: the response body decoded as UTF-8 text
    '''
    user = {
        "content": content,
        "id": name
    }
    myheaders = {'Content-Type': 'application/json'}
    _resp = requests.post(base_url + '/article_extract',
                          json=user,
                          headers=myheaders,
                          verify=True,
                          timeout=timeout)
    resp_json = _resp.content.decode("utf-8")
    print(resp_json)
    return resp_json


if __name__=="__main__":
    # Disabled debug harness: replay a saved error-queue entry through the
    # worker functions under a timeout.
    #
    # queue = load("err.pk")
    # print(queue)
    # data = json.loads(queue[1].decode())
    # _data = data["content"]
    # test("12",_data)
    # list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed([["123",_data,time.time(),"123",""]], useselffool=True)
    # print(data["error"])
    # print(1)
    # while(True):
    #     _id = "123"
    #     _jointime = time.time()
    #     if not run_utils.run_timeout(db,_id,"model_before",_jointime,_data,"213"):
    #         _timeout = run_utils.getTimeOut(_jointime)
    #         if _timeout>0:
    #             #t = Thread(target = thread_run_model, args=(_id,_jointime,list_articles,list_sentences,list_entitys,_data))
    #             t = Thread(target=thread_run_preprocess,args=(_id,_jointime,[["123",_data,time.time(),"123",""]]))
    #             t.start()
    #             t.join(timeout=_timeout)
    #             print(time.time()-_jointime,t.is_alive())
    #             if t.is_alive():
    #                 run_utils.stop_thread(t)
    #                 #db.rpush(settings.TIMEOUT_QUEUE,pickle.dumps())

    print(2)
    # Wait for one entry on the error queue, then save it to err.pk.
    while True:
        queue = db.blpop(settings.ERROR_QUEUE)
        if queue:
            print(3)
            break
    save(queue,"err.pk")