Browse Source

增加重复入库提取监控,填充近期提取结果 (Add monitoring of duplicate-ingestion extraction hits; backfill recent extraction results)

luojiehua 2 years ago
parent
commit
d71ee03a8a

+ 5 - 1
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -248,6 +248,10 @@ class BaseDataMonitor():
                 log(_cmd)
                 success_count = self.cmd_execute(_cmd)
 
+                _cmd = 'cat %s | grep "%s" | grep -c "fingerprint.*exists docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
+                log(_cmd)
+                exists_count = self.cmd_execute(_cmd)
+
                 _cmd = 'cat %s | grep -c "%s.*delete sql"'%(flow_init_path,self.get_last_tenmin_time())
                 log(_cmd)
                 init_count = self.cmd_execute(_cmd)
@@ -270,7 +274,7 @@ class BaseDataMonitor():
                 #                                                                              SearchQuery(query,None,True),
                 #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count))
+                _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -824,7 +824,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                     ackMsg(conn,message_id,subscription)
 
 
-    def delete_document_extract(self,save_count=50*10000):
+    def delete_document_extract(self,save_count=70*10000):
         conn = self.pool_postgres.getConnector()
         try:
             cursor = conn.cursor()

+ 43 - 3
BaseDataMaintenance/model/postgres/document_extract.py

@@ -37,9 +37,7 @@ class Document_extract_postgres(BaseModel):
     # def delete_row(self,ots_client):
     #     raise NotImplementedError()
 
-
-
-if __name__=="__main__":
+def test():
     from BaseDataMaintenance.dataSource.pool import ConnectorPool
     from BaseDataMaintenance.dataSource.source import getConnection_postgres
 
@@ -55,3 +53,45 @@ if __name__=="__main__":
     conn = pool.getConnector()
     list_extract = Document_extract_postgres.select_rows(conn,Document_extract_postgres,"document_extract",[" fingerprint='%s'"%"md5=354bbc7cdbab7f63f53fb31331a78f25"])
     print("=",list_extract[0].getProperties().get(document_extract_extract_json),"=")
+
+from tablestore import *
+def fix_document_extract():
+
+    def _handle(item,result_queue):
+        de = Document_extract_postgres(item)
+        de.insert_row(pool_postgres)
+    from BaseDataMaintenance.dataSource.pool import ConnectorPool
+    from BaseDataMaintenance.dataSource.source import getConnection_postgres,getConnect_ots
+    from queue import Queue
+    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+    pool_postgres = ConnectorPool(10,20,getConnection_postgres)
+    task_queue = Queue()
+
+    ots_client = getConnect_ots()
+    bool_query = BoolQuery(must_queries=[RangeQuery("crtime","2022-08-22"),
+                                         ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.ASC)]),limit=100,get_total_count=True),
+                                                                   ColumnsToGet(column_names=["fingerprint","extract_json"],return_type=ColumnReturnType.SPECIFIED))
+    print(total_count)
+    list_data = getRow_ots(rows)
+    print(list_data[0])
+    for _data in list_data:
+        task_queue.put(_data)
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       ColumnsToGet(column_names=["fingerprint","extract_json"],return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        print("%d/%d"%(task_queue.qsize(),total_count))
+        for _data in list_data:
+            task_queue.put(_data)
+
+    mt = MultiThreadHandler(task_queue,_handle,None,20)
+    mt.run()
+
+if __name__=="__main__":
+    pass
+    fix_document_extract()
+