# run_process_server.py
'''
Created on 2019-07-29
@author: User
'''
  5. import sys
  6. import os
  7. import json
  8. sys.path.append(os.path.abspath(".."))
  9. import logging
  10. import time
  11. import redis
  12. import settings
  13. import numpy as np
  14. import extractFlow
  15. from module.Utils import log
  16. #自定义jsonEncoder
  17. class MyEncoder(json.JSONEncoder):
  18. def default(self, obj):
  19. if isinstance(obj, np.ndarray):
  20. return obj.tolist()
  21. elif isinstance(obj, bytes):
  22. return str(obj, encoding='utf-8')
  23. elif isinstance(obj, (np.float_, np.float16, np.float32,
  24. np.float64)):
  25. return float(obj)
  26. return json.JSONEncoder.default(self, obj)
  27. # 连接redis数据库
  28. db = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT,
  29. db=settings.REDIS_DB,password=settings.REDIS_PASS)
  30. DATA_ERROR = json.dumps({"flag":False,"success":False},cls=MyEncoder)
  31. #循环从redis队列中取数据处理,处理结果保存到redis
  32. def redis_process():
  33. MAX_LEN = 0
  34. MAX_TRY_TIMES = 2
  35. while(True):
  36. try:
  37. #拿到队列,若需要多进程处理,需要改成带锁机制的弹出
  38. #queue = db.lrange(settings.CONTENT_QUEUE,0,settings.BATCH_SIZE-1)
  39. step = 0
  40. ContentIDs = []
  41. while(True):
  42. try:
  43. queue = db.lpop(settings.CONTENT_QUEUE)
  44. if queue:
  45. q = json.loads(queue.decode("utf-8"))
  46. ContentIDs.append([q['id'],q['listpage_url']])
  47. break
  48. else:
  49. time.sleep(settings.SERVER_SLEEP)
  50. except Exception as e:
  51. log("pop_error-"+str(e))
  52. if len(ContentIDs)>0:
  53. for ContentID in ContentIDs:
  54. _try_times = 0
  55. _process_flag = False
  56. k = ContentID[0]
  57. listpage_url = ContentID[1]
  58. log("begin to getting rule of listpage:"+str(listpage_url))
  59. data = extractFlow.ruleExtract(listpage_url)
  60. db.set(k,json.dumps(data,cls=MyEncoder))
  61. log("done for setting result of listpage:"+str(listpage_url))
  62. except Exception as e:
  63. log("error"+str(e))
  64. db.set(k,json.dumps(data,cls=MyEncoder))
  65. # 短暂等待
  66. time.sleep(settings.SERVER_SLEEP)
  67. # if this is the main thread of execution start the model server process
  68. if __name__ == "__main__":
  69. if len(sys.argv)>1:
  70. if "yes" in sys.argv:
  71. os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
  72. os.environ["CUDA_VISIBLE_DEVICES"] = ""
  73. log("BiddingKG preprocessing Server runing ")
  74. redis_process()