Optimize the announcement deduplication logic used during project merging; reuse the existing deduplication logic

luojiehua 3 months ago
Parent commit a0c07aad36

+ 4 - 6
BaseDataMaintenance/maintenance/dataflow.py

@@ -4126,11 +4126,9 @@ class Dataflow_dumplicate(Dataflow):
     def exists_normal_fingerprint(self,_fingerprint,docid):
         query = BoolQuery(must_queries=[
             RangeQuery("status",201,301),
-            TermQuery("fingerprint",_fingerprint)
-        ],
-            must_not_queries=[
-                TermQuery("docid",docid)
-            ]
+            TermQuery("fingerprint",_fingerprint),
+            RangeQuery("docid",0,docid-400000),
+        ]
         )
         rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                             SearchQuery(query,get_total_count=True,limit=1))
@@ -4884,7 +4882,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(400072861261
+    df_dump.test_dumplicate(597909937
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061
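
The change above drops the must_not exclusion of the current docid and instead restricts the fingerprint lookup to documents whose docid is at least 400,000 lower than the current one, so only substantially older records count as the pre-existing original. Below is a minimal standalone sketch of the revised check, assuming the tablestore (OTS) SDK classes already used in dataflow.py and an initialised ots_client; the DOCID_GAP name and the boolean return value are illustrative, not taken from the repo:

from tablestore import BoolQuery, TermQuery, RangeQuery, SearchQuery

DOCID_GAP = 400000  # assumed constant; mirrors the hard-coded offset in the diff

def exists_normal_fingerprint(ots_client, _fingerprint, docid):
    # Published documents (status 201-301) that share the fingerprint and whose
    # docid falls in [0, docid - DOCID_GAP], i.e. clearly older records.
    query = BoolQuery(must_queries=[
        RangeQuery("status", 201, 301),
        TermQuery("fingerprint", _fingerprint),
        RangeQuery("docid", 0, docid - DOCID_GAP),
    ])
    rows, next_token, total_count, is_all_succeed = ots_client.search(
        "document", "document_index",
        SearchQuery(query, get_total_count=True, limit=1))
    # Presumably a non-zero total_count means an earlier document with the
    # same fingerprint already exists.
    return total_count > 0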

+ 28 - 7
BaseDataMaintenance/model/ots/document_tmp.py

@@ -267,8 +267,9 @@ def turn_document_tmp_status():
         bool_query = BoolQuery(
             must_queries=[
                 # TermQuery("fingerprint","md5=2cc044b81ec13acddcc970b71b780365")
-                # TermQuery("save",66),
-                RangeQuery("status",1,51),
+                # TermQuery("save",0),
+                RangeQuery("crtime","2025-03-05 09:30:00")
+                # RangeQuery("status",1,51),
                 # BoolQuery(should_queries=[
                 #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                 #                           # MatchPhraseQuery("doctitle","中国电信"),
@@ -290,7 +291,7 @@ def turn_document_tmp_status():
 
         rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+                                                                       columns_to_get=ColumnsToGet(column_names=["best_docid","dup_docid"],return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         print(total_count)
         # print(list_data)
@@ -298,11 +299,10 @@ def turn_document_tmp_status():
         for _data in list_data:
             _document = Document_tmp(_data)
             task_queue.put(_document)
-        print(list_data)
         while next_token:
             rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+                                                                           columns_to_get=ColumnsToGet(column_names=["best_docid","dup_docid"],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             _count += len(list_data)
             print("%d/%d"%(_count,total_count))
@@ -328,6 +328,7 @@ def turn_document_tmp_status():
         log("task_queue size:%d"%(task_queue.qsize()))
 
     def _handle(item,result_queue,ots_client):
+        from BaseDataMaintenance.model.ots.document import Document
         #change attach value
         # list_attachment = json.loads(item.getProperties().get(document_tmp_attachment_path))
         # print("docid",item.getProperties().get(document_tmp_docid))
@@ -344,18 +345,38 @@ def turn_document_tmp_status():
         # item.setValue(document_tmp_attachment_path,json.dumps(list_attachment,ensure_ascii=False),True)
         # item.all_columns.remove(document_tmp_dochtmlcon)
 
+        best_docid = item.getProperties().get(document_tmp_best_docid,"")
+        if best_docid==-1:
+            dup_docid = item.getProperties().get(document_tmp_dup_docid,"")
+            list_docid = [item.getProperties().get(document_tmp_docid,"")]
+            for _id in dup_docid.split(","):
+                if _id!="":
+                    list_docid.append(int(_id))
+            for docid in list_docid:
+                _d = {
+                    document_tmp_partitionkey:docid%500+1,
+                    document_tmp_docid:docid,
+                }
+                _document = Document(_d)
+                if _document.fix_columns(ots_client,[document_tmp_status],True):
+                    if _document.getProperties().get("status",0)>=401:
+                        _document.setValue(document_tmp_status,1,True)
+
+                        print(_d)
+                        _document.update_row(ots_client)
+
         #change status
         # item.setValue(document_tmp_docchannel,item.getProperties().get(document_tmp_original_docchannel),True)
         # _extract_json = item.getProperties().get(document_tmp_extract_json,"")
         # _extract_json = _extract_json.replace("\x06", "").replace("\x05", "").replace("\x07", "").replace('\\', '')
         # item.setValue(document_tmp_extract_json,_extract_json,True)
         # json.loads(_extract_json)
-        item.setValue(document_tmp_status,0,True)
+        # item.setValue(document_tmp_status,0,True)
         # item.setValue(document_tmp_save,1,True)
         # if item.exists_row(ots_client):
         #     item.update_row(ots_client)
         # print(item.getProperties())
-        item.update_row(ots_client)
+        # item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
         # item.delete_row(ots_client)
         # from BaseDataMaintenance.model.ots.document import Document
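
The new block in _handle reverses an over-aggressive merge: when a temp record carries best_docid == -1, the record's own docid plus every id listed in dup_docid is looked up in the main document table, and rows sitting in the >= 401 band (presumably the dropped-as-duplicate statuses) are set back to status 1 so they re-enter the flow. A condensed sketch of that step, reusing the Document model helpers exactly as the diff does; the function name and the reading of the 401 threshold are assumptions:

from BaseDataMaintenance.model.ots.document import Document

def reset_duplicated_group(ots_client, item):
    # Only act on groups the dedup pass flagged with best_docid == -1.
    if item.getProperties().get(document_tmp_best_docid, "") != -1:
        return
    # The group to reset: the temp document itself plus every docid in dup_docid.
    list_docid = [item.getProperties().get(document_tmp_docid, "")]
    for _id in item.getProperties().get(document_tmp_dup_docid, "").split(","):
        if _id != "":
            list_docid.append(int(_id))
    for docid in list_docid:
        _document = Document({document_tmp_partitionkey: docid % 500 + 1,
                              document_tmp_docid: docid})
        # fix_columns loads the current status; only existing rows in the
        # >= 401 band are pushed back to status 1 for re-processing.
        if _document.fix_columns(ots_client, [document_tmp_status], True):
            if _document.getProperties().get("status", 0) >= 401:
                _document.setValue(document_tmp_status, 1, True)
                _document.update_row(ots_client)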