"""Redis-backed list-page rule-extraction worker.

Created on 2019-07-29
@author: User

Pops tasks from the Redis list ``settings.CONTENT_QUEUE``, runs
``extractFlow.ruleExtract`` on each task's list-page URL, and stores the
JSON-encoded result in Redis under the task's id.
"""
import sys
import os
import json

# Project modules (settings, extractFlow, module.Utils) are resolved relative
# to the parent directory, so this must run before those imports.
sys.path.append(os.path.abspath(".."))

import logging
import time

import redis
import settings
import numpy as np
import extractFlow
from module.Utils import log


class MyEncoder(json.JSONEncoder):
    """JSON encoder that also serialises numpy arrays/scalars and UTF-8 bytes."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, bytes):
            return str(obj, encoding="utf-8")
        # np.floating covers float16/32/64; the old alias np.float_ was
        # removed in NumPy 2.0 and would raise AttributeError here.
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        return json.JSONEncoder.default(self, obj)


# Module-level Redis client shared by all functions below.
db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
                       db=settings.REDIS_DB, password=settings.REDIS_PASS)

# Payload stored for a task whose processing failed.
DATA_ERROR = json.dumps({"flag": False, "success": False}, cls=MyEncoder)


def _pop_task():
    """Block until one task is popped from ``settings.CONTENT_QUEUE``.

    Returns:
        ``(key, listpage_url)`` taken from the queued JSON object's ``"id"``
        and ``"listpage_url"`` fields.
    """
    while True:
        try:
            raw = db.lpop(settings.CONTENT_QUEUE)
            if raw:
                task = json.loads(raw.decode("utf-8"))
                return task["id"], task["listpage_url"]
        except Exception as e:
            # A malformed entry has already been popped and is dropped; a
            # Redis error is retried after the sleep below.
            log("pop_error-" + str(e))
        # Sleep on an empty queue AND on errors, so a Redis outage does not
        # turn this into a busy loop.
        time.sleep(settings.SERVER_SLEEP)


def _process_task(key, listpage_url):
    """Extract rules for one list page and store the JSON result under *key*.

    On any failure, ``DATA_ERROR`` is stored under *key* instead, so the
    waiting client receives an explicit failure for its own request.
    """
    try:
        log("begin to getting rule of listpage:" + str(listpage_url))
        data = extractFlow.ruleExtract(listpage_url)
        db.set(key, json.dumps(data, cls=MyEncoder))
        log("done for setting result of listpage:" + str(listpage_url))
    except Exception as e:
        log("error" + str(e))
        try:
            db.set(key, DATA_ERROR)
        except Exception as e2:
            # Never let a failed error-report kill the server loop.
            log("error" + str(e2))


def redis_process():
    """Serve forever: pop a task, process it, then pause briefly."""
    while True:
        key, listpage_url = _pop_task()
        _process_task(key, listpage_url)
        time.sleep(settings.SERVER_SLEEP)


if __name__ == "__main__":
    # Passing "yes" on the command line hides all CUDA devices.
    # NOTE(review): extractFlow is imported above, before these variables are
    # set; confirm it does not initialise CUDA at import time, otherwise this
    # setting has no effect.
    if "yes" in sys.argv[1:]:
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        os.environ["CUDA_VISIBLE_DEVICES"] = ""
    log("BiddingKG preprocessing Server runing ")
    redis_process()