# run_preprocess_server.py
  1. import sys
  2. import os
  3. from threading import Thread
  4. sys.path.append(os.path.abspath("../.."))
  5. #不使用gpu加速
  6. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  7. os.environ["CUDA_VISIBLE_DEVICES"] = ""
  8. import redis
  9. import time
  10. import json
  11. import numpy as np
  12. import BiddingKG.dl.interface.settings as settings
  13. import BiddingKG.dl.interface.Preprocessing as Preprocessing
  14. import logging
  15. import pickle
  16. import run_utils
  17. logging.basicConfig(filename=settings.LOG_FILE, level=settings.LOG_LEVEL,
  18. format=settings.LOG_FORMAT, datefmt=settings.DATE_FORMAT)
  19. logger = logging.getLogger(__name__)
  20. # 连接redis数据库
  21. db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
  22. db=settings.REDIS_DB,password=settings.REDIS_PASS)
  23. def thread_run_preprocess(_id,_jointime,ContentIDs):
  24. '''
  25. @summary: 线程处理
  26. '''
  27. try:
  28. start_time = time.time()
  29. list_articles,list_sentences,list_entitys = Preprocessing.get_preprocessed(ContentIDs,useselffool=True)
  30. _index = 0
  31. for ContentID in ContentIDs:
  32. _id = ContentID[0]
  33. _content = ContentID[1]
  34. _join_time = ContentID[2]
  35. _doc_id = ContentID[3]
  36. _title = ContentID[4]
  37. if not run_utils.run_timeout(db,_id,"preprocess_after",_jointime,_content,_doc_id):
  38. article = list_articles[_index]
  39. sentences = list_sentences[_index]
  40. entitys = list_entitys[_index]
  41. preprocess_data = dict()
  42. preprocess_data["id"] = _id
  43. preprocess_data["jointime"] = _jointime
  44. preprocess_data["data"] = [[article],[sentences],[entitys]]
  45. db.rpush(settings.PREPROCESS_QUEUE,pickle.dumps(preprocess_data))
  46. logger.info(msg="preprocess cost "+str(time.time()-start_time)+" with length: "+str(len(article.content)))
  47. _index += 1
  48. process_time = time.time()-start_time
  49. except Exception as e:
  50. logger.info("error"+str(e))
  51. db.set(_id,run_utils.TIMEOUT_RES)
  52. return
  53. #循环从redis队列中取数据处理,处理结果保存到redis
  54. def redis_process():
  55. len_processed = 0
  56. MAX_LEN = 0
  57. step = 0
  58. while(True):
  59. try:
  60. #拿到队列,若需要多进程处理,需要改成带锁机制的弹出
  61. #queue = db.lrange(settings.CONTENT_QUEUE,0,settings.BATCH_SIZE-1)
  62. step = 0
  63. all_len = 0
  64. ContentIDs = []
  65. start_time = time.time()
  66. _id = "None";
  67. while(True):
  68. try:
  69. queue = db.lpop(settings.CONTENT_QUEUE)
  70. if queue:
  71. q = json.loads(queue.decode("utf-8"))
  72. _id = q['id']
  73. _content = q['content']
  74. _doc_id = q['doc_id']
  75. _title = q['title']
  76. _jointime = q['jointime']
  77. if not run_utils.run_timeout(db,_id,"preprocess_before",_jointime,_content,_doc_id):
  78. ContentIDs.append([_id,_content,_jointime,_doc_id,_title])
  79. all_len += len(q['content'])
  80. if all_len>MAX_LEN:
  81. break
  82. else:
  83. break
  84. except Exception as e:
  85. logger.info("error"+str(e))
  86. if ContentIDs:
  87. _timeout = run_utils.getTimeOut(_jointime)
  88. if _timeout>0:
  89. t = Thread(target=thread_run_preprocess,args=(_id,_jointime,ContentIDs))
  90. t.daemon = True
  91. t.start()
  92. t.join(_timeout)
  93. if t.isAlive():
  94. run_utils.stop_thread(t)
  95. run_utils.run_timeout(db,_id,"preprocess_ing",_jointime,_content,_doc_id)
  96. t._stop()
  97. t._delete()
  98. except Exception as e:
  99. logger.info("error preprocessing"+str(e))
  100. q["error"] = str(e)
  101. q["step"] = "step:"+str(step)
  102. q["content"] = _content
  103. db.rpush(settings.ERROR_QUEUE, json.dumps(q,cls=run_utils.MyEncoder))
  104. db.set(_id,run_utils.TIMEOUT_RES)
  105. # 短暂等待
  106. time.sleep(settings.SERVER_SLEEP)
  107. # if this is the main thread of execution start the model server process
  108. if __name__ == "__main__":
  109. if len(sys.argv)>1:
  110. if "yes" in sys.argv:
  111. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  112. os.environ["CUDA_VISIBLE_DEVICES"] = ""
  113. logging.info("BiddingKG preprocessing Server runing ")
  114. redis_process()