# pb_project_production.py (5.7 KB)
  1. #encoding:UTF8
  2. from BaseDataMaintenance.dataSource.pool import ConnectorPool
  3. from BaseDataMaintenance.dataSource.source import *
  4. from BaseDataMaintenance.common.Utils import *
  5. import queue
  6. from tablestore import *
  7. from multiprocessing import RLock
  8. from threading import Thread
  9. from apscheduler.schedulers.blocking import BlockingScheduler
  10. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  11. from BaseDataMaintenance.model.ots.proposedBuilding_tmp import proposedBuilding_tmp
  12. from BaseDataMaintenance.model.ots.designed_project import designed_project
  13. import traceback
  14. class DataProduction:
  15. def __init__(self):
  16. self.done_lock = RLock()
  17. self.isDone = False
  18. self.proposedBuilding_table = "proposedBuilding_tmp"
  19. self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
  20. self.ots_client = getConnect_ots()
  21. def producer(self,task_queue):
  22. """
  23. :return:生产数据
  24. """
  25. ots_client = getConnect_ots()
  26. bool_query = BoolQuery(must_queries=[ExistsQuery("crtime")])
  27. columns = ["uuid", "crtime", "json_list_group"]
  28. rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
  29. SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("crtime", SortOrder.DESC)]), limit=100, get_total_count=True),
  30. ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
  31. list_data = getRow_ots(rows)
  32. for _data in list_data:
  33. _proposed = proposedBuilding_tmp(_data)
  34. task_queue.put(_proposed,True)
  35. _count = len(list_data)
  36. while next_token:
  37. rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
  38. SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
  39. ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  40. list_data = getRow_ots(rows)
  41. for _data in list_data:
  42. _proposed = proposedBuilding_tmp(_data)
  43. task_queue.put(_proposed,True)
  44. _count += len(list_data)
  45. if _count>3000:
  46. break
  47. def comsumer(self,task_queue):
  48. def _handle(_proposed,result_queue,ots_client):
  49. print(_proposed)
  50. #修改designed_project
  51. _time = time.time()
  52. _project_dict = _proposed.toDesigned_project(ots_client)
  53. log("toDesigned_project takes %.2fs"%(time.time()-_time))
  54. try:
  55. _time = time.time()
  56. if _project_dict is not None:
  57. #更新数据
  58. _designed_project = designed_project(_project_dict)
  59. _designed_project.update_project(ots_client)
  60. #删除tmp
  61. _proposed.delete_row(ots_client)
  62. log("update designed takes %.2fs"%(time.time()-_time))
  63. except Exception as e:
  64. log("comsumer failed cause of %s"%(str(e)))
  65. log(traceback.format_exc())
  66. result_queue = queue.Queue()
  67. mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,ots_client=self.ots_client)
  68. mt.run()
  69. def waitTask(self,task_queue):
  70. for i in range(60):
  71. if task_queue.qsize()>0:
  72. return True
  73. else:
  74. time.sleep(1)
  75. return False
  76. def maxcompute2ots(self):
  77. task_queue = queue.Queue()
  78. self.producer(task_queue)
  79. # _dict = {"uuid":"12313","crtime":123,
  80. # "json_list_group":'''
  81. # [{"docid": 218170163, "shenpi_id": null, "type_id": "218170163", "page_time": "2022-01-26", "province": "广东", "city": "深圳", "district": "罗湖", "tenderee": "深圳市城市建设开发(集团)有限公司", "tenderee_contact": "孙工", "tenderee_phone": "18998998087", "agency": "", "project_code": null, "project_name": "成都市白鹭湾(住宅用地)项目可行性研究", "doctitle": "成都市白鹭湾(住宅用地)项目可行性研究服务招标公告", "docchannel": "52", "stage": "可研阶段", "proportion": "建筑面积为32388.83㎡", "projectDigest": "招标信息),六、开标地点:深圳市罗湖区桂园街道滨河东路1011号鹿丹大厦12层会议室;(鉴于疫情原因,投标人自行决定是否到开标现场开标;如投标人需要到开标现场,请投标人关注、执行深圳市关于疫情的相关规定,并提前2天与招标人进行沟通协商。),七、投标人资格标准等详细内容详见招标文件。招标联系人:孙工;联系电话:18998998087;邮箱:sundh@szcjjt.com;张工;联系电话:13928429353;邮箱:zli@szcjjt.com", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "成都市白鹭湾(住宅用地)项目可行性研究", "industry": "办公楼", "location": null, "section_num": "-1", "new_enough": 1, "follow_enough": 1}]
  82. # '''}
  83. # task_queue.put(proposedBuilding_tmp(_dict))
  84. self.comsumer(task_queue)
  85. def scheduler(self):
  86. _scheduler = BlockingScheduler()
  87. _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/1")
  88. _scheduler.start()
  89. def startSychro():
  90. ds = DataProduction()
  91. ds.scheduler()
  92. if __name__=="__main__":
  93. ds = DataProduction()
  94. # ds.scheduler()
  95. ds.maxcompute2ots()