ソースを参照

Merge remote-tracking branch 'origin/master'

luojiehua 1 年間 前
コミット
65f543b9e5

+ 0 - 11
BaseDataMaintenance/fixDoc_to_queue_extract.py

@@ -1,11 +0,0 @@
-
-import sys,os
-
-sys.path.append(os.path.dirname(__file__)+"/..")
-
-from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,fixDoc_to_queue_init
-
-
-if __name__ == '__main__':
-    # fixDoc_to_queue_extract()
-    fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2023-12-04.xlsx")

+ 6 - 5
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -753,11 +753,11 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
             current_weight += self.extract_interfaces[_i][1]
             self.extract_interfaces[_i][1] = current_weight/self.whole_weight
 
-        self.comsumer_count = 50
+        self.comsumer_count = 5
         # self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
-        self.pool_redis_doc = ConnectorPool(10,self.comsumer_count,getConnect_redis_doc)
+        self.pool_redis_doc = ConnectorPool(1,self.comsumer_count,getConnect_redis_doc)
         self.conn_mq = getConnect_activateMQ()
-        self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
+        self.pool_mq = ConnectorPool(1,30,getConnect_activateMQ)
         self.block_url = RLock()
         self.url_count = 0
         self.session = None
@@ -771,8 +771,9 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         #     self.list_extract_comsumer.append(listener_extract)
 
         if create_listener:
-            listener_p = Process(target=self.start_extract_listener)
-            listener_p.start()
+            for ii in range(10):
+                listener_p = Process(target=self.start_extract_listener)
+                listener_p.start()
 
     def start_extract_listener(self):
 

+ 124 - 0
BaseDataMaintenance/maintenance/proposedBuilding/pb_project_production.py

@@ -0,0 +1,124 @@
+#encoding:UTF8
+from BaseDataMaintenance.dataSource.pool import ConnectorPool
+from BaseDataMaintenance.dataSource.source import *
+from BaseDataMaintenance.common.Utils import *
+import queue
+from tablestore import *
+from multiprocessing import RLock
+from threading import Thread
+from apscheduler.schedulers.blocking import BlockingScheduler
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.model.ots.proposedBuilding_tmp import proposedBuilding_tmp
+from BaseDataMaintenance.model.ots.designed_project import designed_project
+import traceback
+
+
+class DataProduction:
+    def __init__(self):
+        self.done_lock = RLock()
+        self.isDone = False
+        self.proposedBuilding_table = "proposedBuilding_tmp"
+        self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
+        self.ots_client = getConnect_ots()
+
+    def producer(self,task_queue):
+        """
+        :return: None — produces data: scans proposedBuilding_tmp and enqueues each row onto task_queue
+        """
+        ots_client = getConnect_ots()
+
+        bool_query = BoolQuery(must_queries=[ExistsQuery("crtime")])
+
+        columns = ["uuid", "crtime", "json_list_group"]
+
+        rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
+                                                                          SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("crtime", SortOrder.DESC)]), limit=100, get_total_count=True),
+                                                                          ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            _proposed = proposedBuilding_tmp(_data)
+            task_queue.put(_proposed,True)
+        _count = len(list_data)
+        while next_token:
+            rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
+                                                                              SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
+                                                                              ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                _proposed = proposedBuilding_tmp(_data)
+                task_queue.put(_proposed,True)
+            _count += len(list_data)
+            if _count>3000:
+                break
+
+    def comsumer(self,task_queue):
+
+        def _handle(_proposed,result_queue,ots_client):
+
+            print(_proposed)
+
+            # update designed_project: convert the proposal row into a designed_project dict
+            _time = time.time()
+            _project_dict = _proposed.toDesigned_project(ots_client)
+            log("toDesigned_project takes %.2fs"%(time.time()-_time))
+
+            try:
+                _time = time.time()
+                if _project_dict is not None:
+                    # update the designed_project record in OTS
+                    _designed_project = designed_project(_project_dict)
+                    _designed_project.update_project(ots_client)
+
+                # delete the processed row from the tmp table
+                _proposed.delete_row(ots_client)
+                log("update designed takes %.2fs"%(time.time()-_time))
+            except Exception as e:
+                log("comsumer failed cause of %s"%(str(e)))
+                log(traceback.format_exc())
+
+
+        result_queue = queue.Queue()
+
+        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,ots_client=self.ots_client)
+        mt.run()
+
+    def waitTask(self,task_queue):
+        for i in range(60):
+            if task_queue.qsize()>0:
+                return True
+            else:
+                time.sleep(1)
+        return False
+
+    def maxcompute2ots(self):
+
+        task_queue = queue.Queue()
+
+        self.producer(task_queue)
+
+        # _dict = {"uuid":"12313","crtime":123,
+        #                 "json_list_group":'''
+        #                 [{"docid": 218170163, "shenpi_id": null, "type_id": "218170163", "page_time": "2022-01-26", "province": "广东", "city": "深圳", "district": "罗湖", "tenderee": "深圳市城市建设开发(集团)有限公司", "tenderee_contact": "孙工", "tenderee_phone": "18998998087", "agency": "", "project_code": null, "project_name": "成都市白鹭湾(住宅用地)项目可行性研究", "doctitle": "成都市白鹭湾(住宅用地)项目可行性研究服务招标公告", "docchannel": "52", "stage": "可研阶段", "proportion": "建筑面积为32388.83㎡", "projectDigest": "招标信息),六、开标地点:深圳市罗湖区桂园街道滨河东路1011号鹿丹大厦12层会议室;(鉴于疫情原因,投标人自行决定是否到开标现场开标;如投标人需要到开标现场,请投标人关注、执行深圳市关于疫情的相关规定,并提前2天与招标人进行沟通协商。),七、投标人资格标准等详细内容详见招标文件。招标联系人:孙工;联系电话:18998998087;邮箱:sundh@szcjjt.com;张工;联系电话:13928429353;邮箱:zli@szcjjt.com", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "成都市白鹭湾(住宅用地)项目可行性研究", "industry": "办公楼", "location": null, "section_num": "-1", "new_enough": 1, "follow_enough": 1}]
+        #                 '''}
+        # task_queue.put(proposedBuilding_tmp(_dict))
+
+        self.comsumer(task_queue)
+
+    def scheduler(self):
+        _scheduler = BlockingScheduler()
+        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/1")
+        _scheduler.start()
+
+
+def startSychro():
+    ds = DataProduction()
+    ds.scheduler()
+
+
+if __name__=="__main__":
+    ds = DataProduction()
+    # ds.scheduler()
+    ds.maxcompute2ots()
+
+
+