123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- #encoding:UTF8
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- from BaseDataMaintenance.dataSource.source import *
- from BaseDataMaintenance.common.Utils import *
- import queue
- from tablestore import *
- from multiprocessing import RLock
- from threading import Thread
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.model.ots.proposedBuilding_tmp import proposedBuilding_tmp
- from BaseDataMaintenance.model.ots.designed_project import designed_project
- import traceback
class DataProduction:
    """Move rows from the ``proposedBuilding_tmp`` staging table into ``designed_project``.

    ``producer`` pages through the staging table's search index and enqueues one
    ``proposedBuilding_tmp`` object per row. ``comsumer`` converts each object with
    ``toDesigned_project``, writes the result via ``designed_project.update_project``
    and then deletes the staging row. ``scheduler`` repeats that pass every minute.
    """

    def __init__(self):
        # NOTE(review): done_lock / isDone are not read anywhere in this file; kept
        # in case other modules rely on them.
        self.done_lock = RLock()
        self.isDone = False
        self.proposedBuilding_table = "proposedBuilding_tmp"
        self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
        # Client shared by all consumer worker threads (passed to MultiThreadHandler).
        self.ots_client = getConnect_ots()

    def producer(self, task_queue, max_count=3000, page_size=100):
        """Fill ``task_queue`` with rows from the staging table's search index.

        Only rows that have a ``crtime`` column are selected. The first request
        sorts by ``crtime`` descending. Paging stops when the index returns no
        ``next_token``, or once more than ``max_count`` rows have been enqueued.

        :param task_queue: queue receiving one ``proposedBuilding_tmp`` per row.
        :param max_count: stop paging after more than this many rows were enqueued.
        :param page_size: number of rows requested per search call.
        """
        ots_client = getConnect_ots()
        bool_query = BoolQuery(must_queries=[ExistsQuery("crtime")])
        columns = ["uuid", "crtime", "json_list_group"]
        # Only the first request carries the sort; follow-up pages are requested
        # with next_token alone (same request sequence as before the refactor).
        search_query = SearchQuery(bool_query,
                                   sort=Sort(sorters=[FieldSort("crtime", SortOrder.DESC)]),
                                   limit=page_size, get_total_count=True)
        _count = 0
        while True:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                self.proposedBuilding_table, self.proposedBuilding_table_index,
                search_query,
                ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
            if not is_all_succeed:
                # Previously ignored: this page may be missing rows from some partitions.
                log("producer: search on %s did not succeed on all partitions" % (self.proposedBuilding_table_index))
            list_data = getRow_ots(rows)
            for _data in list_data:
                task_queue.put(proposedBuilding_tmp(_data), True)
            _count += len(list_data)
            if not next_token or _count > max_count:
                break
            search_query = SearchQuery(bool_query, next_token=next_token,
                                       limit=page_size, get_total_count=True)

    def comsumer(self, task_queue, thread_count=30):
        """Process every queued ``proposedBuilding_tmp`` with a pool of worker threads.

        :param task_queue: queue filled by ``producer``.
        :param thread_count: number of worker threads given to ``MultiThreadHandler``.
        """
        def _handle(_proposed, result_queue, ots_client):
            print(_proposed)
            try:
                # Build the designed_project payload from the staging row. This is
                # inside the try so a conversion failure is logged like any other
                # per-item failure instead of escaping into MultiThreadHandler.
                _time = time.time()
                _project_dict = _proposed.toDesigned_project(ots_client)
                log("toDesigned_project takes %.2fs" % (time.time() - _time))

                _time = time.time()
                # NOTE(review): when _project_dict is None the staging row is not
                # deleted, so a later producer run may select it again — confirm intended.
                if _project_dict is not None:
                    # Write the project data, then delete the staging (tmp) row.
                    _designed_project = designed_project(_project_dict)
                    _designed_project.update_project(ots_client)
                    _proposed.delete_row(ots_client)
                log("update designed takes %.2fs" % (time.time() - _time))
            except Exception as e:
                log("comsumer failed cause of %s" % (str(e)))
                log(traceback.format_exc())

        result_queue = queue.Queue()
        mt = MultiThreadHandler(task_queue, _handle, result_queue,
                                thread_count=thread_count, ots_client=self.ots_client)
        mt.run()

    def waitTask(self, task_queue):
        """Poll ``task_queue`` once per second for up to 60 seconds.

        :return: True as soon as the queue is non-empty, False after 60 empty polls.
        """
        for _ in range(60):
            if task_queue.qsize() > 0:
                return True
            time.sleep(1)
        return False

    def maxcompute2ots(self):
        """Run one full pass: fill a fresh queue with ``producer``, then drain it with ``comsumer``.

        The producer runs to completion before any consumer thread starts, so every
        fetched row is held in the queue at once.
        """
        task_queue = queue.Queue()
        self.producer(task_queue)
        self.comsumer(task_queue)

    def scheduler(self):
        """Run ``maxcompute2ots`` on a cron trigger every minute; blocks the calling thread."""
        _scheduler = BlockingScheduler()
        _scheduler.add_job(self.maxcompute2ots, "cron", minute="*/1")
        _scheduler.start()
def startSychro():
    """Build a DataProduction and hand control to its every-minute scheduler loop."""
    production = DataProduction()
    production.scheduler()
if __name__ == "__main__":
    # Manual one-off pass; call startSychro() instead for the scheduled loop.
    DataProduction().maxcompute2ots()
|