import sys
import os
from threading import Thread

sys.path.append(os.path.abspath("../.."))

# Disable GPU acceleration for this process.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = ""

import redis
import time
import json
import numpy as np
import BiddingKG.dl.interface.settings as settings
import BiddingKG.dl.interface.Preprocessing as Preprocessing
import logging
import pickle
import run_utils

logging.basicConfig(filename=settings.LOG_FILE, level=settings.LOG_LEVEL,
                    format=settings.LOG_FORMAT, datefmt=settings.DATE_FORMAT)
logger = logging.getLogger(__name__)

# Connect to the Redis database used for all work/result queues.
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=settings.REDIS_DB, password=settings.REDIS_PASS)


def thread_run_preprocess(_id, _jointime, ContentIDs):
    """Worker-thread body: preprocess a batch of documents and enqueue results.

    Runs ``Preprocessing.get_preprocessed`` over the whole batch, then pushes
    one pickled record per document onto ``settings.PREPROCESS_QUEUE``.
    Documents that have already timed out (per ``run_utils.run_timeout``) are
    skipped. On any failure the last seen ``_id`` is marked with
    ``run_utils.TIMEOUT_RES`` in Redis.

    :param _id: id of the last document in the batch (used for error marking)
    :param _jointime: enqueue timestamp of the batch, used for timeout checks
    :param ContentIDs: list of [id, content, jointime, doc_id, title] rows
    """
    try:
        start_time = time.time()
        list_articles, list_sentences, list_entitys = Preprocessing.get_preprocessed(
            ContentIDs, useselffool=True)
        _index = 0
        for ContentID in ContentIDs:
            _id = ContentID[0]
            _content = ContentID[1]
            _join_time = ContentID[2]
            _doc_id = ContentID[3]
            _title = ContentID[4]
            # Skip documents whose deadline already passed while preprocessing ran.
            if not run_utils.run_timeout(db, _id, "preprocess_after", _jointime, _content, _doc_id):
                article = list_articles[_index]
                sentences = list_sentences[_index]
                entitys = list_entitys[_index]
                preprocess_data = dict()
                preprocess_data["id"] = _id
                preprocess_data["jointime"] = _jointime
                preprocess_data["data"] = [[article], [sentences], [entitys]]
                db.rpush(settings.PREPROCESS_QUEUE, pickle.dumps(preprocess_data))
                logger.info(msg="preprocess cost " + str(time.time() - start_time)
                                + " with length: " + str(len(article.content)))
            _index += 1
    except Exception as e:
        # logger.exception keeps the traceback, which logger.info(str(e)) lost.
        logger.exception("error%s", e)
        db.set(_id, run_utils.TIMEOUT_RES)
    return


def redis_process():
    """Main server loop: drain the content queue and preprocess in batches.

    Repeatedly pops JSON messages from ``settings.CONTENT_QUEUE``, collects a
    batch (bounded by ``MAX_LEN`` total content length), and hands it to
    ``thread_run_preprocess`` on a daemon thread with a per-batch timeout
    derived from the batch's join time. Failures push a diagnostic record to
    ``settings.ERROR_QUEUE`` and mark the document as timed out.

    Runs forever; intended to be the process entry point.
    """
    MAX_LEN = 0  # batch size cap on total content length; 0 => one doc per batch
    step = 0
    while True:
        # Re-initialize per iteration so the error handler below never touches
        # unbound names when an exception fires before any item was popped.
        q = None
        _content = ""
        try:
            step = 0
            all_len = 0
            ContentIDs = []
            start_time = time.time()
            _id = "None"
            while True:
                try:
                    # Single-consumer pop; for multi-process consumption this
                    # would need an atomic/locked variant.
                    queue = db.lpop(settings.CONTENT_QUEUE)
                    if queue:
                        q = json.loads(queue.decode("utf-8"))
                        _id = q['id']
                        _content = q['content']
                        _doc_id = q['doc_id']
                        _title = q['title']
                        _jointime = q['jointime']
                        if not run_utils.run_timeout(db, _id, "preprocess_before", _jointime, _content, _doc_id):
                            ContentIDs.append([_id, _content, _jointime, _doc_id, _title])
                            all_len += len(q['content'])
                            if all_len > MAX_LEN:
                                break
                    else:
                        break
                except Exception as e:
                    logger.exception("error%s", e)
            if ContentIDs:
                _timeout = run_utils.getTimeOut(_jointime)
                if _timeout > 0:
                    t = Thread(target=thread_run_preprocess, args=(_id, _jointime, ContentIDs))
                    t.daemon = True
                    t.start()
                    t.join(_timeout)
                    # Thread.isAlive() was removed in Python 3.9; is_alive()
                    # is the supported spelling.
                    if t.is_alive():
                        run_utils.stop_thread(t)
                        run_utils.run_timeout(db, _id, "preprocess_ing", _jointime, _content, _doc_id)
                        # NOTE(review): the original also called t._stop() and
                        # t._delete(); both are CPython-2 internals that raise
                        # on Python 3 and sent every timeout into the error
                        # queue, so they are removed here.
        except Exception as e:
            logger.exception("error preprocessing%s", e)
            if q is not None:
                q["error"] = str(e)
                q["step"] = "step:" + str(step)
                q["content"] = _content
                db.rpush(settings.ERROR_QUEUE, json.dumps(q, cls=run_utils.MyEncoder))
            db.set(_id, run_utils.TIMEOUT_RES)
        # Brief pause between polling rounds.
        time.sleep(settings.SERVER_SLEEP)


# if this is the main thread of execution start the model server process
if __name__ == "__main__":
    if len(sys.argv) > 1:
        if "yes" in sys.argv:
            os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
            os.environ["CUDA_VISIBLE_DEVICES"] = ""
    logging.info("BiddingKG preprocessing Server runing ")
    redis_process()