# run_model_server.py (~5.8 KB) — model-inference server: consumes preprocessed
# documents from a Redis queue, runs the BiddingKG predictors, writes results back.
import sys
import os
from threading import Thread
# Make the project root importable before the BiddingKG imports below.
sys.path.append(os.path.abspath("../.."))
# Do not use GPU acceleration: hide all CUDA devices so inference runs on CPU.
# NOTE: must be set before any CUDA-aware framework is pulled in by the
# predictor import below.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import redis
import time
import json
import numpy as np
import BiddingKG.dl.interface.settings as settings
import logging
import BiddingKG.dl.interface.predictor as predictor
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import BiddingKG.dl.interface.getAttributes as getAttributes
import pickle
import run_utils
from BiddingKG.dl.common.Utils import *
# File logging configured from project settings.
logging.basicConfig(filename=settings.LOG_FILE, level=settings.LOG_LEVEL,
                    format=settings.LOG_FORMAT, datefmt=settings.DATE_FORMAT)
logger = logging.getLogger(__name__)
# Preload the embedding models (presumably provided by the wildcard
# Utils import above — TODO confirm their origin).
getModel_w2v()
getModel_word()
# Model instances: created once at import time and shared by the worker thread.
codeNamePredict = predictor.CodeNamePredict()
premPredict = predictor.PREMPredict()
epcPredict = predictor.EPCPredict()
roleRulePredict = predictor.RoleRulePredictor()
# Connect to the Redis database.
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=settings.REDIS_DB, password=settings.REDIS_PASS)
  33. def thread_run_model(_id,_jointime,list_articles,list_sentences,list_entitys):
  34. '''
  35. @summary: run_model处理线程
  36. '''
  37. try:
  38. start_time = time.time()
  39. #list_articles,list_sentences,list_entitys = _data
  40. codeName = codeNamePredict.predict(list_articles)
  41. codename_time = time.time()
  42. premPredict.predict(list_sentences,list_entitys)
  43. prem_time = time.time()
  44. roleRulePredict.predict(list_articles,list_sentences, list_entitys,codeName)
  45. epcPredict.predict(list_sentences,list_entitys)
  46. epc_time = time.time()
  47. prem = getAttributes.getPREMs(list_sentences,list_entitys,list_articles)
  48. att_time = time.time()
  49. step = 4
  50. '''
  51. for entitys in list_entitys:
  52. for entity in entitys:
  53. print(entity.entity_text,entity.entity_type,entity.values)
  54. '''
  55. if not run_utils.run_timeout(db,_id,"model_after",_jointime,list_articles[0].sourceContent,list_articles[0].doc_id):
  56. union = Preprocessing.union_result(codeName,prem)
  57. step = 5
  58. for item in union:
  59. db.set(item[0],json.dumps(item[1],cls=run_utils.MyEncoder))
  60. step = 6
  61. #Preprocessing.persistenceData1(list_entitys, list_sentences)
  62. #Preprocessing.persistenceData(union)
  63. #Preprocessing.dropTempDatas(ContentIDs)
  64. #print(step)
  65. '''
  66. print("codename:",codename_time-process_time)
  67. print("prem",prem_time-codename_time)
  68. print("epc",epc_time-prem_time)
  69. print("getatt",time.time()-epc_time)
  70. '''
  71. #print("process cost"+str(process_time)+" with length:"+str(len(q['content'])))
  72. logger.info(msg="process cost:%s,includes codename:%s,prem:%s,epc:%s,getatt:%s"%
  73. (str(att_time-start_time),
  74. str(codename_time-start_time),
  75. str(prem_time-codename_time),
  76. str(epc_time-prem_time),
  77. str(att_time-epc_time)))
  78. except Exception as e:
  79. print(str(e))
  80. db.set(_id,run_utils.TIMEOUT_RES)
  81. return
  82. #os._exit(0)
  83. #循环从redis队列中取数据处理,处理结果保存到redis
  84. def redis_process():
  85. len_processed = 0
  86. predicts = []
  87. predicts_entitys = []
  88. count = 0
  89. step = 0
  90. while(True):
  91. try:
  92. #拿到队列,若需要多进程处理,需要改成带锁机制的弹出
  93. #queue = db.lrange(settings.CONTENT_QUEUE,0,settings.BATCH_SIZE-1)
  94. step = 0
  95. queue = db.blpop(settings.PREPROCESS_QUEUE)
  96. step = 1
  97. _id = "None"
  98. if queue:
  99. #q = json.loads(queue[1].decode("utf-8"))
  100. preprocess_data = pickle.loads(queue[1])
  101. _id = preprocess_data["id"]
  102. _jointime = preprocess_data["jointime"]
  103. _data = preprocess_data["data"]
  104. list_articles,list_sentences,list_entitys = _data
  105. if not run_utils.run_timeout(db,_id,"model_before",_jointime,list_articles[0].sourceContent,list_articles[0].doc_id):
  106. _timeout = run_utils.getTimeOut(_jointime)
  107. if _timeout>0:
  108. t = Thread(target = thread_run_model, args=(_id,_jointime,list_articles,list_sentences,list_entitys))
  109. t.daemon = True
  110. t.start()
  111. print("timeout",_timeout,time.time()-_jointime)
  112. t.join(timeout=_timeout)
  113. if t.isAlive():
  114. run_utils.stop_thread(t)
  115. run_utils.run_timeout(db,_id,"model_ing",_jointime,list_articles[0].sourceContent,list_articles[0].doc_id)
  116. t._stop()
  117. t._delete()
  118. except Exception as e:
  119. q = dict()
  120. q["error"] = str(e)
  121. q["step"] = "step:"+str(step)
  122. q["content"] = list_articles[0].sourceContent
  123. db.rpush(settings.ERROR_QUEUE, json.dumps(q,cls=run_utils.MyEncoder))
  124. db.set(_id,run_utils.TIMEOUT_RES)
  125. logger.info("error"+str(e))
  126. '''
  127. finally:
  128. #若是没有执行完,就把数据放回去
  129. if step<=5:
  130. db.rpush(settings.CONTENT_QUEUE, json.dumps(q))
  131. '''
  132. # 短暂等待
  133. time.sleep(settings.SERVER_SLEEP)
  134. # if this is the main thread of execution start the model server process
  135. if __name__ == "__main__":
  136. logging.info("BiddingKG processing Server runing ")
  137. redis_process()