Add consumer monitoring to the sync, extract, and attachment-recognition flows

luojiehua · 2 years ago · commit ffa30ec45d
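
The change shared by all three flows below: each class now keeps a list of its ActiveMQ listeners, and a scheduled job probes every listener's connection and rebuilds any that have dropped, so a silently dead consumer no longer stalls its queue. A minimal sketch of the pattern, assuming a stomp.py-style listener whose conn.is_connected() reports liveness (make_listener is a hypothetical factory standing in for the ActiveMQListener/createComsumer pairs in the diff):

    def monitor(listeners, make_listener):
        # Replace any consumer with a dead connection; reuse the same slot
        # so the pool size stays constant.
        for i, listener in enumerate(listeners):
            if not listener.conn.is_connected():
                listeners[i] = make_listener()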

+ 2 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -133,6 +133,7 @@ class Dataflow():
         self.current_path = os.path.dirname(__file__)
 
 
+
     def flow_init(self):
         def producer():
             bool_query = BoolQuery(must_queries=[RangeQuery("crtime",'2022-04-20')])
@@ -2843,4 +2844,4 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate()
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(258925781)
+    df_dump.test_dumplicate(263649800)

+ 40 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -46,13 +46,24 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         self.comsumer_count = 120
         self.retry_comsumer_count = 10
         self.retry_times = 5
+        self.list_attachment_comsumer = []
         for _i in range(self.comsumer_count):
             listener_attachment = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
             createComsumer(listener_attachment,self.mq_attachment)
+            self.list_attachment_comsumer.append(listener_attachment)
         self.attach_pool = ConnectorPool(10,30,getConnection_postgres)
         self.conn_mq = getConnect_activateMQ()
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
+    def monitor_listener(self):
+        # probe each attachment consumer; rebuild any whose MQ connection dropped
+        for i in range(len(self.list_attachment_comsumer)):
+            if not self.list_attachment_comsumer[i].conn.is_connected():
+                listener = ActiveMQListener(getConnect_activateMQ(),self.queue_attachment)
+                createComsumer(listener,self.mq_attachment)
+                self.list_attachment_comsumer[i] = listener
+
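
Review note: the replaced listener's old connection is dropped without an explicit disconnect; if the socket is half-alive it leaks until garbage collection. A hedged cleanup sketch, assuming a stomp.py-style connection object:

    old = self.list_attachment_comsumer[i]
    try:
        old.conn.disconnect()   # best effort; the connection may already be gone
    except Exception:
        pass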
 
     def process_failed_attachment(self):
 
@@ -497,6 +508,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         schedule.add_job(self.flow_attachment,"cron",second="*/10")
         schedule.add_job(self.remove_attachment_postgres,"cron",hour="6")
         schedule.add_job(self.process_failed_attachment,"cron",minute="*/10")
+        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
         schedule.start()
 
 class Dataflow_ActivteMQ_extract(Dataflow_extract):
@@ -549,14 +561,26 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         self.url_count = 0
 
         self.comsumer_count = 30
+        self.list_extract_comsumer = []
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
             createComsumer(listener_extract,self.mq_extract)
+            self.list_extract_comsumer.append(listener_extract)
+
         self.pool_postgres = ConnectorPool(10,self.comsumer_count,getConnection_postgres)
         self.conn_mq = getConnect_activateMQ()
         self.pool_mq = ConnectorPool(10,30,getConnect_activateMQ)
 
 
+    def monitor_listener(self):
+        # same health check for the extract consumers, run every five minutes
+        for i in range(len(self.list_extract_comsumer)):
+            if not self.list_extract_comsumer[i].conn.is_connected():
+                listener = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle)
+                createComsumer(listener,self.mq_extract)
+                self.list_extract_comsumer[i] = listener
+
     def getExtract_url(self):
         _url_num = 0
         with self.block_url:
@@ -852,6 +876,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         schedule.add_job(self.flow_extract_producer,"cron",second="*/20")
         schedule.add_job(self.process_extract_failed,"cron",minute="*/5")
         schedule.add_job(self.delete_document_extract,"cron",hour="*/5")
+        schedule.add_job(self.monitor_listener,"cron",minute="*/5")
         schedule.start()
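
For context, a standalone sketch of the cron wiring used above. BackgroundScheduler and health_check are assumptions for the demo; the real classes register monitor_listener on whatever scheduler `schedule` already is:

    import time
    from apscheduler.schedulers.background import BackgroundScheduler

    def health_check():
        print("probing consumer connections...")

    sched = BackgroundScheduler()
    sched.add_job(health_check, "cron", minute="*/5")   # extract cadence; attachment/init use */1
    sched.start()
    time.sleep(2)   # demo only; a real service blocks in its main loop
    sched.shutdown()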
 
 from multiprocessing import RLock
@@ -892,6 +917,7 @@ class MyEncoder(json.JSONEncoder):
             return obj
         return json.JSONEncoder.default(self, obj)
 
+
 class Dataflow_init(Dataflow):
 
     class InitListener():
@@ -962,9 +988,21 @@ class Dataflow_init(Dataflow):
         self.ots_capacity = getConnect_ots_capacity()
 
         self.init_comsumer_counts = 2
+        self.list_init_comsumer = []
         for i in range(self.init_comsumer_counts):
             listener = self.InitListener(getConnect_activateMQ())
             createComsumer(listener,self.mq_init)
+            self.list_init_comsumer.append(listener)
+
+
+    def monitor_listener(self):
+        # health check for the two init consumers, run every minute
+        for i in range(len(self.list_init_comsumer)):
+            if not self.list_init_comsumer[i].conn.is_connected():
+                listener = self.InitListener(getConnect_activateMQ())
+                createComsumer(listener,self.mq_init)
+                self.list_init_comsumer[i] = listener
 
 
 
@@ -1111,6 +1149,7 @@ class Dataflow_init(Dataflow):
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoDaYiTemp({}),),second="*/10")
         schedule.add_job(self.temp2mq,"cron",args=(ZhaoBiaoWenJianTemp({}),),second="*/10")
         schedule.add_job(self.ots2mq,"cron",second="*/10")
+        schedule.add_job(self.monitor_listener,"cron",minute="*/1")
         schedule.start()
 
 
@@ -1330,5 +1369,5 @@ if __name__ == '__main__':
     # de = Dataflow_ActivteMQ_extract()
     # de.start_flow_extract()
     # fixDoc_to_queue_extract()
-    # check_data_synchronization()
+    check_data_synchronization()
     fixDoc_to_queue_init()
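
Review note: the same monitor_listener body now appears in Dataflow_ActivteMQ_attachment, Dataflow_ActivteMQ_extract, and Dataflow_init, differing only in how a replacement listener is built; the loop could be factored into a shared helper along the lines of the sketch after the commit header.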

+ 36 - 29
BaseDataMaintenance/maintenance/preproject/fillColumns.py

@@ -81,42 +81,49 @@ def delete_wrong_data():
     list_data = []
     task_queue = Queue()
     ots_client = getConnect_ots()
-    # q1 = BoolQuery(must_queries=[
-    #     TermQuery("type",2),
-    #     RangeQuery("crtime",'2022-05-24')
-    # ])
-    # rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
-    #                                                                     SearchQuery(q1,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
-    #                                                                     ColumnsToGet(return_type=ColumnReturnType.NONE))
-    # dict_rows = getRow_ots(rows)
-    # list_data.extend(dict_rows)
-    # _count = len(dict_rows)
-    # while next_token:
-    #     rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
-    #                                                                    SearchQuery(q1,next_token=next_token,get_total_count=True,limit=100),
-    #                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
-    #     dict_rows = getRow_ots(rows)
-    #     list_data.extend(dict_rows)
-    #     _count += len(dict_rows)
-    #     print("%d/%d"%(_count,total_count))
-    #     # if _count>10000:
-    #     #     break
-    df = pd.read_csv("fa45de36-6e47-4817-b2d6-e79ea8c154601.csv")
-    for tenderee,product,may_begin,may_end in zip(df["tenderee"],df["product"],df["may_begin"],df["may_end"]):
-        _data = {"tenderee":tenderee,
-                 "product":product,
-                 "may_begin":str(may_begin),
-                 "may_end":str(may_end)}
-        list_data.append(_data)
+    q1 = BoolQuery(must_queries=[
+        WildcardQuery("may_begin","000*"),
+        # RangeQuery("crtime",'2022-05-24')
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
+                                                                        SearchQuery(q1,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
+                                                                        ColumnsToGet(return_type=ColumnReturnType.ALL))
+    dict_rows = getRow_ots(rows)
+    list_data.extend(dict_rows)
+    _count = len(dict_rows)
+    print("total_count",total_count)
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("preproject","preproject_index",
+                                                                       SearchQuery(q1,next_token=next_token,get_total_count=True,limit=100),
+                                                                       ColumnsToGet(return_type=ColumnReturnType.ALL))
+        dict_rows = getRow_ots(rows)
+        list_data.extend(dict_rows)
+        _count += len(dict_rows)
+        print("%d/%d"%(_count,total_count))
+        # if _count>10000:
+        #     break
+    # df = pd.read_csv("fa45de36-6e47-4817-b2d6-e79ea8c154601.csv")
+    # for tenderee,product,may_begin,may_end in zip(df["tenderee"],df["product"],df["may_begin"],df["may_end"]):
+    #     _data = {"tenderee":tenderee,
+    #              "product":product,
+    #              "may_begin":str(may_begin),
+    #              "may_end":str(may_end)}
+    #     list_data.append(_data)
     for _data in list_data:
-        may_begin = _data.get("may_begin")
-        may_end = _data.get("may_end")
+        # may_begin = _data.get("may_begin")
+        # may_end = _data.get("may_end")
         # if len(may_begin)!=10 or len(may_end)!=10:
         #     task_queue.put(_data)
         task_queue.put(_data)
     def _handle(item,result_queue):
         _preproject = Preproject(item)
         _preproject.delete_row(ots_client)
+        print(item)
+        # the bad row was deleted above; rewrite it with the year repaired
+        item["may_begin"] = item["may_begin"].replace("0001","2022")
+        item["may_end"] = item["may_end"].replace("0001","2022")
+        _preproject = Preproject(item)
+        _preproject.update_row(ots_client)
+        print(item)
     log("====%d"%task_queue.qsize())
     mt = MultiThreadHandler(task_queue,_handle,None,30)
     mt.run()
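
The repair in _handle is a plain string substitution: rows matched by WildcardQuery("may_begin","000*") carry year 0001 from a failed date parse, and the script deletes each row and rewrites it with the year patched. A quick check of the transform (the 2022 target is the commit's choice, not derivable from the data):

    for raw in ["0001-05-24", "0001-12-01"]:
        print(raw, "->", raw.replace("0001", "2022"))
    # 0001-05-24 -> 2022-05-24
    # 0001-12-01 -> 2022-12-01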

+ 2 - 2
BaseDataMaintenance/maintenance/preproject/remove_dump.py

@@ -46,5 +46,5 @@ def start_drop_preproject_dump():
     scheduler.start()
 
 if __name__ == '__main__':
-    # drop_dump_data()
-    start_drop_preproject_dump()
+    drop_dump_data()
+    # start_drop_preproject_dump()

+ 67 - 2
BaseDataMaintenance/model/ots/document.py

@@ -437,7 +437,72 @@ def drop_extract2():
     mt.run()
 
 
+def fixDocumentHtml():
+    from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity
+    from queue import Queue
+    ots_client = getConnect_ots()
+    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+    from BaseDataMaintenance.model.ots.document_html import Document_html
+    capacity_client = getConnect_ots_capacity()
+
+    list_data = []
+    bool_query = BoolQuery(must_queries=[
+        MatchPhraseQuery("doctextcon","信友-城市之光"),
+        MatchPhraseQuery("doctextcon","Copyright"),
+        # TermQuery("docid",254249505)
+    ])
+
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),get_total_count=True,limit=100),
+                                                                   columns_to_get=ColumnsToGet(["doctextcon"],return_type=ColumnReturnType.SPECIFIED))
+
+    print("total_count",total_count)
+    list_data.extend(getRow_ots(rows))
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                       columns_to_get=ColumnsToGet(["doctextcon"],return_type=ColumnReturnType.SPECIFIED))
+
+        list_data.extend(getRow_ots(rows))
+    task_queue = Queue()
+    for _data in list_data:
+        task_queue.put(_data)
+
+    _pattern = "(?P<_find>城市之光.*Ltd.)"
+    _pattern1 = "(?P<_find>Evaluation.*Ltd.)"
+    def _handle(item,result_queue):
+        _doctextcon = item.get("doctextcon")
+
+        _search = re.search(_pattern,_doctextcon)
+        # guard: not every row the phrase query returns matches the pattern
+        if _search is not None:
+            print(_search.groupdict().get("_find"))
+        item["doctextcon"] = re.sub(_pattern,"",_doctextcon)
+
+        _d = Document(item)
+        _d.update_row(ots_client)
+        _d1 = {"partitionkey":item.get("partitionkey"),
+               "docid":item.get("docid")}
+        _dh = Document(_d1)
+        _dh.fix_columns(capacity_client,["dochtmlcon"],True)
+        _dochtmlcon = _dh.getProperties().get("dochtmlcon")
+        _dochtmlcon = re.sub("\n","",_dochtmlcon)
+        _search = re.search(_pattern1,_dochtmlcon)
+        _dochtmlcon = re.sub(_pattern1,"",_dochtmlcon)
+        _d1["dochtmlcon"] = _dochtmlcon
+        _dh = Document(_d1)
+        _dh.update_row(capacity_client)
+        # print(re.sub(_pattern,"</div><p><span>",_dochtmlcon))
+
+
+
+
+    mt = MultiThreadHandler(task_queue,_handle,None,2)
+    mt.run()
+
+
+
+
 if __name__=="__main__":
     # turn_extract_status()
-    turn_document_status()
-    # drop_extract2()
+    # turn_document_status()
+    # drop_extract2()
+    fixDocumentHtml()
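
Review note on fixDocumentHtml: both patterns use a greedy .* and an unescaped dot, so "城市之光.*Ltd." can swallow everything between the first 城市之光 and the last "Ltd." on a long page. A hedged safer variant (strip_watermark is a hypothetical helper, not part of the module):

    import re

    # non-greedy span plus an escaped literal dot; DOTALL lets the pattern
    # cross newlines in dochtmlcon without stripping them first
    _pattern = re.compile(r"(?P<_find>城市之光.*?Ltd\.)", re.DOTALL)

    def strip_watermark(text):
        match = _pattern.search(text)
        if match is None:           # query hit, but pattern absent: leave row alone
            return text
        return _pattern.sub("", text)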