# Source file: 1.py
  1. '''
  2. Created on 2019年8月2日
  3. @author: User
  4. '''
  5. import sys
  6. import os
  7. from BiddingKG.dl.common.Utils import save,load
  8. import codecs
  9. from threading import Thread
  10. import requests
  11. sys.path.append(os.path.abspath("../.."))
  12. #不使用gpu加速
  13. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  14. os.environ["CUDA_VISIBLE_DEVICES"] = ""
  15. import redis
  16. import BiddingKG.dl.interface.settings as settings
  17. import time
  18. import json
  19. import numpy as np
  20. import logging
  21. import BiddingKG.dl.interface.predictor as predictor
  22. import BiddingKG.dl.interface.Preprocessing as Preprocessing
  23. import BiddingKG.dl.interface.getAttributes as getAttributes
  24. import pickle
  25. import run_utils
  26. codeNamePredict = predictor.CodeNamePredict()
  27. premPredict = predictor.PREMPredict()
  28. epcPredict = predictor.EPCPredict()
  29. roleRulePredict = predictor.RoleRulePredictor()
  30. # 连接redis数据库
  31. db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
  32. db=6,password=settings.REDIS_PASS)
  33. ''''''
  34. def thread_run_model(_id,_jointime,list_articles,list_sentences,list_entitys):
  35. '''
  36. @summary: run_model处理线程
  37. '''
  38. start_time = time.time()
  39. #list_articles,list_sentences,list_entitys = _data
  40. codeName = codeNamePredict.predict(list_articles)
  41. codename_time = time.time()
  42. premPredict.predict(list_sentences,list_entitys)
  43. prem_time = time.time()
  44. roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
  45. epcPredict.predict(list_sentences,list_entitys)
  46. epc_time = time.time()
  47. prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)
  48. att_time = time.time()
  49. step = 4
  50. '''
  51. for entitys in list_entitys:
  52. for entity in entitys:
  53. print(entity.entity_text,entity.entity_type,entity.values)
  54. '''
  55. if not run_utils.run_timeout(db,_id,"model_after",_jointime,_data,"123"):
  56. union = Preprocessing.union_result(codeName,prem)
  57. step = 5
  58. for item in union:
  59. db.set(item[0],json.dumps(item[1],cls=run_utils.MyEncoder))
  60. step = 6
  61. #Preprocessing.persistenceData1(list_entitys, list_sentences)
  62. #Preprocessing.persistenceData(union)
  63. #Preprocessing.dropTempDatas(ContentIDs)
  64. #print(step)
  65. '''
  66. print("codename:",codename_time-process_time)
  67. print("prem",prem_time-codename_time)
  68. print("epc",epc_time-prem_time)
  69. print("getatt",time.time()-epc_time)
  70. '''
  71. #print("process cost"+str(process_time)+" with length:"+str(len(q['content'])))
  72. def thread_run_preprocess(_id,_jointime,ContentIDs):
  73. '''
  74. @summary: 线程处理
  75. '''
  76. try:
  77. list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed(ContentIDs,useselffool=True)
  78. _index = 0
  79. for ContentID in ContentIDs:
  80. _id = ContentID[0]
  81. _content = ContentID[1]
  82. _join_time = ContentID[2]
  83. _doc_id = ContentID[3]
  84. _title = ContentID[4]
  85. if not run_utils.run_timeout(db,_id,"preprocess_after",_jointime,_content,_doc_id):
  86. pass
  87. '''
  88. article = list_articles[_index]
  89. sentences = list_sentences[_index]
  90. entitys = list_entitys[_index]
  91. preprocess_data = dict()
  92. preprocess_data["id"] = _id
  93. preprocess_data["jointime"] = _jointime
  94. preprocess_data["data"] = [[article],[sentences],[entitys]]
  95. db.rpush(settings.PREPROCESS_QUEUE,pickle.dumps(preprocess_data))
  96. '''
  97. _index += 1
  98. except Exception as e:
  99. print(str(e))
  100. return
  101. def test(name,content):
  102. user = {
  103. "content": content,
  104. "id":name
  105. }
  106. myheaders = {'Content-Type': 'application/json'}
  107. _resp = requests.post("http://192.168.2.101:15015" + '/article_extract', json=user, headers=myheaders, verify=True)
  108. resp_json = _resp.content.decode("utf-8")
  109. print(resp_json)
  110. return resp_json
if __name__=="__main__":
    # NOTE(review): the triple-quoted string below is a disabled debugging
    # harness (it replays a saved error payload through preprocessing inside
    # a timeout-guarded thread). It is a string literal, not a comment, and
    # is kept verbatim as dead code.
    '''
    queue = load("err.pk")
    print(queue)
    data = json.loads(queue[1].decode())
    _data = data["content"]
    test("12",_data)
    list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed([["123",_data,time.time(),"123",""]], useselffool=True)
    print(data["error"])
    print(1)
    while(True):
        _id = "123"
        _jointime = time.time()
        if not run_utils.run_timeout(db,_id,"model_before",_jointime,_data,"213"):
            _timeout = run_utils.getTimeOut(_jointime)
            if _timeout>0:
                #t = Thread(target = thread_run_model, args=(_id,_jointime,list_articles,list_sentences,list_entitys))
                t = Thread(target=thread_run_preprocess,args=(_id,_jointime,[["123",_data,time.time(),"123",""]]))
                t.start()
                t.join(timeout=_timeout)
                print(time.time()-_jointime,t.isAlive())
                if t.isAlive():
                    run_utils.stop_thread(t)
                    #db.rpush(settings.TIMEOUT_QUEUE,pickle.dumps())
    '''
    print(2)
    # Block until one entry appears on the error queue, then persist it to
    # disk ("err.pk") so it can be replayed by the disabled harness above.
    while(True):
        queue = db.blpop(settings.ERROR_QUEUE)
        if queue:
            print(3)
            break
    # Assumes save runs once, after the loop breaks — the pasted source lost
    # its indentation, so this placement should be confirmed against the
    # original file.
    save(queue,"err.pk")