# encoding: UTF-8
import queue
import time
import traceback
from multiprocessing import RLock
from threading import Thread

from apscheduler.schedulers.blocking import BlockingScheduler
from tablestore import *

from BaseDataMaintenance.dataSource.pool import ConnectorPool
from BaseDataMaintenance.dataSource.source import *
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from BaseDataMaintenance.model.ots.proposedBuilding_tmp import proposedBuilding_tmp
from BaseDataMaintenance.model.ots.designed_project import designed_project


class DataProduction:

    def __init__(self):
        self.done_lock = RLock()
        self.isDone = False
        self.proposedBuilding_table = "proposedBuilding_tmp"
        self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
        self.ots_client = getConnect_ots()

    def producer(self, task_queue):
        """
        Producer: fetch proposedBuilding_tmp rows from OTS and push them onto the task queue.
        """
        ots_client = getConnect_ots()
        bool_query = BoolQuery(must_queries=[ExistsQuery("crtime")])
        columns = ["uuid", "crtime", "json_list_group"]
        # first page, sorted by crtime descending
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            self.proposedBuilding_table, self.proposedBuilding_table_index,
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("crtime", SortOrder.DESC)]),
                        limit=100, get_total_count=True),
            ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        for _data in list_data:
            _proposed = proposedBuilding_tmp(_data)
            task_queue.put(_proposed, True)
        _count = len(list_data)
        # follow the pagination token until it is exhausted or the batch cap is reached
        while next_token:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                self.proposedBuilding_table, self.proposedBuilding_table_index,
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            for _data in list_data:
                _proposed = proposedBuilding_tmp(_data)
                task_queue.put(_proposed, True)
            _count += len(list_data)
            if _count > 3000:
                break

    def comsumer(self, task_queue):

        def _handle(_proposed, result_queue, ots_client):
            print(_proposed)
            # build the designed_project record from the proposed building
            _time = time.time()
            _project_dict = _proposed.toDesigned_project(ots_client)
            log("toDesigned_project takes %.2fs" % (time.time() - _time))
            try:
                _time = time.time()
                if _project_dict is not None:
                    # write/update the designed_project row
                    _designed_project = designed_project(_project_dict)
                    _designed_project.update_project(ots_client)
                    # delete the processed tmp row
                    _proposed.delete_row(ots_client)
                    log("update designed takes %.2fs" % (time.time() - _time))
            except Exception as e:
                log("comsumer failed cause of %s" % (str(e)))
                log(traceback.format_exc())

        result_queue = queue.Queue()
        mt = MultiThreadHandler(task_queue, _handle, result_queue, thread_count=30, ots_client=self.ots_client)
        mt.run()

    def waitTask(self, task_queue):
        # wait up to 60s for the producer to enqueue at least one task
        for i in range(60):
            if task_queue.qsize() > 0:
                return True
            else:
                time.sleep(1)
        return False

    def maxcompute2ots(self):
        task_queue = queue.Queue()

        self.producer(task_queue)

        # _dict = {"uuid":"12313","crtime":123,
        #          "json_list_group":'''
        # [{"docid": 218170163, "shenpi_id": null, "type_id": "218170163", "page_time": "2022-01-26", "province": "广东", "city": "深圳", "district": "罗湖", "tenderee": "深圳市城市建设开发(集团)有限公司", "tenderee_contact": "孙工", "tenderee_phone": "18998998087", "agency": "", "project_code": null, "project_name": "成都市白鹭湾(住宅用地)项目可行性研究", "doctitle": "成都市白鹭湾(住宅用地)项目可行性研究服务招标公告", "docchannel": "52", "stage": "可研阶段", "proportion": "建筑面积为32388.83㎡", "projectDigest": "招标信息),六、开标地点:深圳市罗湖区桂园街道滨河东路1011号鹿丹大厦12层会议室;(鉴于疫情原因,投标人自行决定是否到开标现场开标;如投标人需要到开标现场,请投标人关注、执行深圳市关于疫情的相关规定,并提前2天与招标人进行沟通协商。),七、投标人资格标准等详细内容详见招标文件。招标联系人:孙工;联系电话:18998998087;邮箱:sundh@szcjjt.com;张工;联系电话:13928429353;邮箱:zli@szcjjt.com", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "成都市白鹭湾(住宅用地)项目可行性研究", "industry": "办公楼", "location": null, "section_num": "-1", "new_enough": 1, "follow_enough": 1}]
        # '''}
        # task_queue.put(proposedBuilding_tmp(_dict))

        self.comsumer(task_queue)

    def scheduler(self):
        # run the sync job every minute
        _scheduler = BlockingScheduler()
        _scheduler.add_job(self.maxcompute2ots, "cron", minute="*/1")
        _scheduler.start()


def startSychro():
    ds = DataProduction()
    ds.scheduler()


if __name__ == "__main__":
    ds = DataProduction()
    # ds.scheduler()
    ds.maxcompute2ots()