import sys
import os
from threading import Thread
sys.path.append(os.path.abspath("../.."))
# Do not use GPU acceleration
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import redis
import time
import json
import numpy as np
import BiddingKG.dl.interface.settings as settings
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import logging
import pickle
import run_utils

logging.basicConfig(filename=settings.LOG_FILE, level=settings.LOG_LEVEL,
                    format=settings.LOG_FORMAT, datefmt=settings.DATE_FORMAT)
logger = logging.getLogger(__name__)

# Connect to the Redis database
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=settings.REDIS_DB, password=settings.REDIS_PASS)
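
# A minimal sketch of the producer side (an assumption, not part of this module):
# whatever service feeds CONTENT_QUEUE is expected to enqueue JSON objects carrying
# exactly the fields that redis_process() reads back out below.
import uuid

def enqueue_document(doc_id, title, content):
    '''
    @summary: hypothetical helper showing the assumed queue entry schema;
              using a plain time.time() timestamp for jointime is a guess
    '''
    q = {"id": str(uuid.uuid4()), "content": content, "doc_id": doc_id,
         "title": title, "jointime": time.time()}
    db.rpush(settings.CONTENT_QUEUE, json.dumps(q))
    return q["id"]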
def thread_run_preprocess(_id, _jointime, ContentIDs):
    '''
    @summary: worker thread: preprocess a batch of documents and push the
              results onto the Redis preprocess queue
    '''
    try:
        start_time = time.time()
        list_articles, list_sentences, list_entitys = Preprocessing.get_preprocessed(ContentIDs, useselffool=True)

        _index = 0
        for ContentID in ContentIDs:
            _id = ContentID[0]
            _content = ContentID[1]
            _join_time = ContentID[2]
            _doc_id = ContentID[3]
            _title = ContentID[4]
            # Only push documents that have not timed out after preprocessing
            if not run_utils.run_timeout(db, _id, "preprocess_after", _jointime, _content, _doc_id):
                article = list_articles[_index]
                sentences = list_sentences[_index]
                entitys = list_entitys[_index]
                preprocess_data = dict()
                preprocess_data["id"] = _id
                preprocess_data["jointime"] = _jointime
                preprocess_data["data"] = [[article], [sentences], [entitys]]
                db.rpush(settings.PREPROCESS_QUEUE, pickle.dumps(preprocess_data))
                logger.info("preprocess cost " + str(time.time() - start_time) + " with length: " + str(len(article.content)))
            _index += 1
    except Exception as e:
        logger.error("error: " + str(e))
        db.set(_id, run_utils.TIMEOUT_RES)
    return
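
# A minimal sketch of the consuming side (again an assumption about the next
# stage): an extraction worker would pop and unpickle entries in the same
# shape as the preprocess_data dict pushed above.
def pop_preprocessed():
    '''
    @summary: hypothetical helper illustrating how PREPROCESS_QUEUE entries
              would be read back; blocking and locking are omitted for brevity
    '''
    raw = db.lpop(settings.PREPROCESS_QUEUE)
    if raw is None:
        return None
    preprocess_data = pickle.loads(raw)
    # preprocess_data["data"] holds [[article], [sentences], [entitys]]
    return preprocess_data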

# Main loop: pop documents from the Redis content queue, preprocess them,
# and save the results back to Redis
def redis_process():
    MAX_LEN = 0
    step = 0
    while True:
        try:
            # Fetch from the queue; to run several worker processes this needs
            # an atomic (locked) pop, see the atomic_batch_pop sketch after
            # this function
            # queue = db.lrange(settings.CONTENT_QUEUE, 0, settings.BATCH_SIZE - 1)
            step = 0
            all_len = 0
            ContentIDs = []
            start_time = time.time()
            _id = "None"
            _content = ""
            _doc_id = ""
            _jointime = None
            q = dict()
            while True:
                try:
                    queue = db.lpop(settings.CONTENT_QUEUE)
                    if queue:
                        q = json.loads(queue.decode("utf-8"))
                        _id = q['id']
                        _content = q['content']
                        _doc_id = q['doc_id']
                        _title = q['title']
                        _jointime = q['jointime']
                        # Only keep documents that have not timed out before preprocessing
                        if not run_utils.run_timeout(db, _id, "preprocess_before", _jointime, _content, _doc_id):
                            ContentIDs.append([_id, _content, _jointime, _doc_id, _title])
                        all_len += len(q['content'])
                        # Flush the batch once the accumulated content length exceeds MAX_LEN
                        if all_len > MAX_LEN:
                            break
                    else:
                        break
                except Exception as e:
                    logger.error("error: " + str(e))

            if ContentIDs:
                _timeout = run_utils.getTimeOut(_jointime)
                if _timeout > 0:
                    # Run preprocessing in a daemon thread so it can be abandoned on timeout
                    t = Thread(target=thread_run_preprocess, args=(_id, _jointime, ContentIDs))
                    t.daemon = True
                    t.start()
                    t.join(_timeout)
                    if t.is_alive():
                        # Thread.isAlive()/._stop()/._delete() no longer work in
                        # Python 3; run_utils.stop_thread() kills the thread instead
                        run_utils.stop_thread(t)
                        run_utils.run_timeout(db, _id, "preprocess_ing", _jointime, _content, _doc_id)

        except Exception as e:
            logger.error("error preprocessing: " + str(e))
            q["error"] = str(e)
            q["step"] = "step:" + str(step)
            q["content"] = _content
            db.rpush(settings.ERROR_QUEUE, json.dumps(q, cls=run_utils.MyEncoder))
            db.set(_id, run_utils.TIMEOUT_RES)
        # Brief wait before polling the queue again
        time.sleep(settings.SERVER_SLEEP)
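
# A minimal sketch of the locked pop mentioned above, assuming several worker
# processes share CONTENT_QUEUE: a transactional redis-py pipeline (MULTI/EXEC)
# pops a whole batch atomically, so two workers can never receive the same entry.
def atomic_batch_pop(batch_size):
    '''
    @summary: hypothetical alternative to the plain lpop loop in redis_process();
              batch_size is an assumed tuning parameter, not a settings value
    '''
    pipe = db.pipeline(transaction=True)
    for _ in range(batch_size):
        pipe.lpop(settings.CONTENT_QUEUE)
    # execute() returns one result per queued LPOP; drop the Nones that an
    # under-filled queue produces
    return [item for item in pipe.execute() if item is not None]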
# If this is the main thread of execution, start the model server process
if __name__ == "__main__":
    if len(sys.argv) > 1:
        if "yes" in sys.argv:
            os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
            os.environ["CUDA_VISIBLE_DEVICES"] = ""
    logging.info("BiddingKG preprocessing Server running")
    redis_process()
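
# Assumed invocation (the script name is illustrative): run the server directly,
# optionally passing "yes" to re-apply the CPU-only environment settings:
#   python run_preprocess_server.py
#   python run_preprocess_server.py yes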