'''
Created on 2019-07-29

@author: User
'''
- import sys
- import os
- import json
- sys.path.append(os.path.abspath(".."))
- import logging
- import time
- import redis
- import settings
- import numpy as np
- import extractFlow
- from module.Utils import log
# Custom JSON encoder
class MyEncoder(json.JSONEncoder):
    """JSON encoder that additionally serializes NumPy values and bytes.

    Conversions applied by :meth:`default`:

    * ``np.ndarray``  -> nested Python list (``ndarray.tolist()``)
    * ``bytes``       -> ``str`` decoded as UTF-8
    * NumPy floating scalars -> ``float``
    * NumPy integer scalars  -> ``int``
    """

    def default(self, obj):
        """Return a JSON-serializable equivalent of ``obj``.

        Called by the ``json`` machinery only for objects it cannot
        serialize natively.

        Raises:
            TypeError: if ``obj`` is none of the supported extra types
                (raised by ``json.JSONEncoder.default``).
            UnicodeDecodeError: if ``obj`` is ``bytes`` that is not valid UTF-8.
        """
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        # Use the abstract scalar base classes rather than an explicit list of
        # concrete types: the ``np.float_`` alias no longer exists in
        # NumPy >= 2.0, and merely referencing it raises AttributeError.
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        return super().default(obj)
# Connect to the Redis database described by the settings module.
db = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DB,
    password=settings.REDIS_PASS,
)

# Pre-serialized JSON payload whose "flag" and "success" fields are both False.
DATA_ERROR = json.dumps({"flag": False, "success": False}, cls=MyEncoder)
def _pop_task():
    """Wait for one task on ``settings.CONTENT_QUEUE`` and return ``(id, listpage_url)``.

    Each queue entry is expected to be UTF-8 encoded JSON containing the keys
    ``id`` and ``listpage_url``. An entry that cannot be decoded is logged and
    dropped (it has already been popped).
    """
    while True:
        try:
            # NOTE: the original author remarks that multi-process consumers
            # would need a lock-protected pop here.
            raw = db.lpop(settings.CONTENT_QUEUE)
            if raw:
                task = json.loads(raw.decode("utf-8"))
                return task['id'], task['listpage_url']
        except Exception as e:
            log("pop_error-"+str(e))
        # Sleep both when the queue is empty and after a failure, so that an
        # unreachable Redis does not turn this into a tight error-logging loop.
        time.sleep(settings.SERVER_SLEEP)


# Loop forever: take tasks from the Redis queue, process them, and save the
# results back to Redis.
def redis_process():
    """Run the worker loop; this function never returns.

    For every task popped from the queue, ``extractFlow.ruleExtract`` is
    called with the task's ``listpage_url`` and the JSON-encoded result is
    stored in Redis under the task's ``id``. If processing fails, the error is
    logged and ``DATA_ERROR`` is stored under that same key instead, so a
    failure never writes a previous task's result and never references
    variables that were not assigned yet.
    """
    while True:
        key, listpage_url = _pop_task()
        try:
            log("begin to getting rule of listpage:"+str(listpage_url))
            data = extractFlow.ruleExtract(listpage_url)
            db.set(key, json.dumps(data, cls=MyEncoder))
            log("done for setting result of listpage:"+str(listpage_url))
        except Exception as e:
            log("error"+str(e))
            try:
                db.set(key, DATA_ERROR)
            except Exception as set_error:
                # Redis itself may be the component that failed; keep the
                # worker alive rather than raising out of the handler.
                log("error"+str(set_error))
        # Short pause before the next task.
        time.sleep(settings.SERVER_SLEEP)
# Script entry point: when run directly, start the queue-processing loop.
if __name__ == "__main__":
    cli_args = sys.argv
    # Passing "yes" on the command line sets CUDA_VISIBLE_DEVICES to an
    # empty string (and fixes the device ordering to PCI bus id).
    if len(cli_args) > 1 and "yes" in cli_args:
        os.environ.update({
            "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
            "CUDA_VISIBLE_DEVICES": "",
        })
    log("BiddingKG preprocessing Server runing ")
    redis_process()
|