'''
Created on 2019-08-02
@author: User
'''
import sys
import os
sys.path.append(os.path.abspath("../.."))
# Disable GPU acceleration: hide all CUDA devices so the models run on the CPU
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import codecs
import time
import json
import pickle
import logging
import traceback
from threading import Thread
import numpy as np
import requests
import redis
from BiddingKG.dl.common.Utils import save, load
import BiddingKG.dl.interface.settings as settings
import BiddingKG.dl.interface.predictor as predictor
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import BiddingKG.dl.interface.getAttributes as getAttributes
import run_utils
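
# The four predictors are instantiated once at import time; each presumably
# loads its trained model into memory, so the expensive initialization is
# shared by every worker thread instead of being repeated per request.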
codeNamePredict = predictor.CodeNamePredict()
premPredict = predictor.PREMPredict()
epcPredict = predictor.EPCPredict()
roleRulePredict = predictor.RoleRulePredictor()
# Connect to the Redis database
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=6, password=settings.REDIS_PASS)
def thread_run_model(_id,_jointime,list_articles,list_sentences,list_entitys):
    '''
    @summary: worker thread that runs the extraction models on preprocessed data
    '''
    start_time = time.time()
    # _data was originally passed in packed; rebuild it here because the
    # run_timeout check below still expects it
    _data = [list_articles,list_sentences,list_entitys]

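    # Model pipeline: project code/name extraction runs on whole articles,
    # then the PREM, role-rule and EPC predictors annotate the sentence and
    # entity lists in place, with the role rules reusing the codeName result.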
    codeName = codeNamePredict.predict(list_articles)
    codename_time = time.time()

    premPredict.predict(list_sentences,list_entitys)
    prem_time = time.time()
    roleRulePredict.predict(list_articles,list_sentences,list_entitys,codeName)
    epcPredict.predict(list_sentences,list_entitys)
    epc_time = time.time()

    prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)

    att_time = time.time()
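    # The *_time stamps above feed the per-stage timing report kept commented
    # out at the end of this function.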
    step = 4
    '''
    for entitys in list_entitys:
        for entity in entitys:
            print(entity.entity_text,entity.entity_type,entity.values)
    '''
    if not run_utils.run_timeout(db,_id,"model_after",_jointime,_data,"123"):
        union = Preprocessing.union_result(codeName,prem)
        step = 5
        # Write each (key, result) pair back to Redis as JSON; the key is
        # presumably the document id
        for item in union:
            db.set(item[0],json.dumps(item[1],cls=run_utils.MyEncoder))
        step = 6
        #Preprocessing.persistenceData1(list_entitys, list_sentences)
        #Preprocessing.persistenceData(union)
        #Preprocessing.dropTempDatas(ContentIDs)
    #print(step)
    '''
    print("codename:",codename_time-start_time)
    print("prem",prem_time-codename_time)
    print("epc",epc_time-prem_time)
    print("getatt",time.time()-epc_time)
    '''

def thread_run_preprocess(_id,_jointime,ContentIDs):
    '''
    @summary: worker thread that preprocesses a batch of documents
    '''
    try:
        list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed(ContentIDs,useselffool=True)

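        # get_preprocessed returns three lists that are parallel to ContentIDs,
        # so _index below walks all of them in step with the input batch.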
        _index = 0
        for ContentID in ContentIDs:
            # Each ContentID is a tuple of (id, content, join_time, doc_id, title);
            # note the loop-local _id overrides the _id parameter from here on
            _id = ContentID[0]
            _content = ContentID[1]
            _join_time = ContentID[2]
            _doc_id = ContentID[3]
            _title = ContentID[4]
            if not run_utils.run_timeout(db,_id,"preprocess_after",_jointime,_content,_doc_id):
                pass
            '''
            article = list_articles[_index]
            sentences = list_sentences[_index]
            entitys = list_entitys[_index]
            preprocess_data = dict()
            preprocess_data["id"] = _id
            preprocess_data["jointime"] = _jointime
            preprocess_data["data"] = [[article],[sentences],[entitys]]
            db.rpush(settings.PREPROCESS_QUEUE,pickle.dumps(preprocess_data))
            '''
            _index += 1

    except Exception:
        # Print the full traceback rather than just the message so that
        # preprocessing failures are easier to diagnose
        traceback.print_exc()

    return
def test(name,content):
    '''
    @summary: send one document to the article_extract HTTP service and print the response
    '''
    user = {
        "content": content,
        "id": name
    }
    myheaders = {'Content-Type': 'application/json'}
    _resp = requests.post("http://192.168.2.101:15015" + '/article_extract', json=user, headers=myheaders, verify=True)
    resp_json = _resp.content.decode("utf-8")
    print(resp_json)
    return resp_json
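
# Example (hypothetical document id and content; assumes the extraction service
# above is reachable):
#   test("doc-001", "...tender announcement text...")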
if __name__=="__main__":
    '''
    queue = load("err.pk")
    print(queue)
    data = json.loads(queue[1].decode())
    _data = data["content"]
    test("12",_data)

    list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed([["123",_data,time.time(),"123",""]], useselffool=True)
    print(data["error"])
    print(1)
    while True:
        _id = "123"
        _jointime = time.time()
        if not run_utils.run_timeout(db,_id,"model_before",_jointime,_data,"213"):
            _timeout = run_utils.getTimeOut(_jointime)
            if _timeout>0:
                #t = Thread(target = thread_run_model, args=(_id,_jointime,list_articles,list_sentences,list_entitys))
                t = Thread(target=thread_run_preprocess,args=(_id,_jointime,[["123",_data,time.time(),"123",""]]))
                t.start()
                t.join(timeout=_timeout)
                print(time.time()-_jointime,t.is_alive())
                if t.is_alive():
                    run_utils.stop_thread(t)
                    #db.rpush(settings.TIMEOUT_QUEUE,pickle.dumps())
    '''
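    # Active branch: block until one item appears on the error queue, then
    # pickle it to err.pk so the disabled replay code above can load it.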
    print(2)
    while True:
        queue = db.blpop(settings.ERROR_QUEUE)
        if queue:
            print(3)
            break
    save(queue,"err.pk")