Ver Fonte

公告去重文档指纹不限时间,只要存在状态正常且文档指纹一样的公告即去重

luojiehua há 3 meses
pai
commit
9fa2fc346f

+ 73 - 51
BaseDataMaintenance/interface/project_merge_interface.py

@@ -1,4 +1,7 @@
 
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/../..")
 
 from BaseDataMaintenance.maintenance.dataflow import Dataflow_dumplicate,log
 
@@ -10,6 +13,7 @@ flow = Dataflow_dumplicate(start_delete_listener=False)
 from BaseDataMaintenance.common.Utils import uniform_package_name
 import json
 import re
+import traceback
 
 
 def merge_document_interface(item,b_log=False):
@@ -21,54 +25,67 @@ def merge_document_interface(item,b_log=False):
     :return:
     '''
     try:
-
-        _proj = {
-            "page_time":item.get("page_time"),
-            "project_codes":item.get("project_codes"),
-            "project_name":item.get("project_name"),
-            "tenderee":item.get("tenderee"),
-            "agency":item.get("agency"),
-            "product":item.get("product"),
-            "sub_project_name":item.get("sub_project_name"),
-            "bidding_budget":item.get("bidding_budget"),
-            "win_tenderer":item.get("win_tenderer"),
-            "win_bid_price":item.get("win_bid_price"),
-            "province":item.get("province"),
-            "city":item.get("city"),
-            "district":item.get("district"),
-            "zhao_biao_page_time":item.get("zhao_biao_page_time"),
-            "zhong_biao_page_time":item.get("zhong_biao_page_time"),
-            "enterprise":item.get("enterprise"),
-            "detail_link":item.get("detail_link"),
-            "doctitle":item.get("doctitle"),
-
-        }
-
-        if _proj.get("province"):
-            _proj["province"] = re.sub("省","",str(_proj["province"]))
-        if _proj.get("city"):
-            if len(str(_proj["city"]))>2:
-                _proj["city"] = re.sub("市","",str(_proj["city"]))
-        if _proj.get("district"):
-            if len(str(_proj["district"]))>2:
-                _proj["district"] = re.sub("区|县|镇","",str(_proj["district"]))
-
-        _proj["sub_project_name"] = uniform_package_name(_proj["sub_project_name"])
-
-        enterprise = _proj.get("enterprise","")
-        list_enterprise = enterprise.split(",") if enterprise else []
-        enterprise = {"nlp_enterprise":list_enterprise}
-        _proj["enterprise"] = json.dumps(enterprise,ensure_ascii= False)
-
-        list_projects = flow.merge_projects([_proj],b_log=b_log)
-        if len(list_projects)>0:
-            uuids = list_projects[0].get("uuid","")
-            if uuids:
-                l_uuid = uuids.split(",")
-                if l_uuid:
-                    return l_uuid[0]
+        if isinstance(item,dict):
+            log("merge_document_interface %s"%json.dumps(item,ensure_ascii=False))
+            _proj = {
+                "page_time":item.get("page_time",""),
+                "project_codes":item.get("project_codes",""),
+                "project_name":item.get("project_name",""),
+                "tenderee":item.get("tenderee",""),
+                "agency":item.get("agency",""),
+                "product":item.get("product",""),
+                "sub_project_name":item.get("sub_project_name",""),
+                "bidding_budget":item.get("bidding_budget","0"),
+                "win_tenderer":item.get("win_tenderer",""),
+                "win_bid_price":item.get("win_bid_price","0"),
+                "province":item.get("province",""),
+                "city":item.get("city",""),
+                "district":item.get("district",""),
+                "zhao_biao_page_time":item.get("zhao_biao_page_time",""),
+                "zhong_biao_page_time":item.get("zhong_biao_page_time",""),
+                "enterprise":item.get("enterprise",""),
+                "detail_link":item.get("detail_link",""),
+                "doctitle":item.get("doctitle",""),
+
+            }
+
+            if _proj.get("page_time")=="":
+                raise RuntimeError("page_time参数不能为空")
+
+            if _proj.get("province"):
+                _proj["province"] = re.sub("省","",str(_proj["province"]))
+            if _proj.get("city"):
+                if len(str(_proj["city"]))>2:
+                    _proj["city"] = re.sub("市","",str(_proj["city"]))
+            if _proj.get("district"):
+                if len(str(_proj["district"]))>2:
+                    _proj["district"] = re.sub("区|县|镇","",str(_proj["district"]))
+
+            if _proj.get("bidding_budget"):
+                _proj["bidding_budget"] = float(_proj["bidding_budget"])
+
+            if _proj.get("win_bid_price"):
+                _proj["win_bid_price"] = float(_proj["win_bid_price"])
+
+            _proj["sub_project_name"] = uniform_package_name(_proj["sub_project_name"])
+
+            enterprise = _proj.get("enterprise","")
+            list_enterprise = enterprise.split(",") if enterprise else []
+            enterprise = {"nlp_enterprise":list_enterprise}
+            _proj["enterprise"] = json.dumps(enterprise,ensure_ascii= False)
+
+            list_projects = flow.merge_projects([_proj],b_log=b_log)
+            if len(list_projects)>0:
+                uuids = list_projects[0].get("uuid","")
+                if uuids:
+                    l_uuid = uuids.split(",")
+                    if l_uuid:
+                        return l_uuid[0]
+        else:
+            raise RuntimeError("参数应该以字典方式传递")
     except Exception as e:
-        raise RuntimeError("error on dumplicate")
+        traceback.print_exc()
+        raise RuntimeError("error: %s"%(str(e)))
 
 
 import os
@@ -89,6 +106,7 @@ def embedding():
             project_uuid = ""
         _r["project_uuid"] = project_uuid
     except Exception as e:
+        traceback.print_exc()
         _r["success"] = False
         _r["msg"] = str(e)
 
@@ -97,10 +115,7 @@ def embedding():
 def start_project_merge_server():
     app.run(host="0.0.0.0",port="15010",debug=False)
 
-if __name__ == '__main__':
-
-    # start_project_merge_server()
-
+def test():
     _proj = {
         "page_time":"2025-01-14",
         "project_codes":"SHX-ZB-2024-01013-07",
@@ -130,3 +145,10 @@ if __name__ == '__main__':
     # print(merge_document_interface(_proj,b_log=True))
 
 
+if __name__ == '__main__':
+
+    # start_project_merge_server()
+    test()
+
+
+

+ 16 - 3
BaseDataMaintenance/maintenance/dataflow.py

@@ -4030,6 +4030,17 @@ class Dataflow_dumplicate(Dataflow):
             return True
         return False
 
+    def exists_normal_fingerprint(self,_fingerprint):
+        query = BoolQuery(must_queries=[
+            RangeQuery("status",201,301),
+            TermQuery("fingerprint",_fingerprint)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                            SearchQuery(query,get_total_count=True,limit=1))
+        if total_count>0:
+            return True
+        return False
+
 
     def check_page_time(self,item):
         page_time = item.get(document_page_time,"")
@@ -4132,6 +4143,8 @@ class Dataflow_dumplicate(Dataflow):
             final_list = self.dumplicate_fianl_check(base_list,b_log)
 
             exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
+            exist_normal_fingerprint = self.exists_normal_fingerprint(item.get(document_tmp_fingerprint))
+            # print("exist_normal_fingerprint",exist_normal_fingerprint)
             # log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
             best_docid = self.get_best_docid(final_list)
 
@@ -4156,7 +4169,7 @@ class Dataflow_dumplicate(Dataflow):
             remove_list = []
 
 
-            if (self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
+            if (self.check_page_time(item) and not exist_normal_fingerprint  and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
                 dtmp.setValue(document_tmp_save,1,True)
                 # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                 dmp_docid = ",".join([str(a) for a in list(dup_docid)])
@@ -4405,7 +4418,7 @@ class Dataflow_dumplicate(Dataflow):
         list_dict = getRow_ots(rows)
 
         for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=True)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -4581,7 +4594,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(583564377
+    df_dump.test_dumplicate(400068967697
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 1 - 0
BaseDataMaintenance/start_project_merge_server.py

@@ -2,6 +2,7 @@
 
 import sys
 import os
+sys.path.append(os.path.dirname(__file__)+"/..")
 from BaseDataMaintenance.interface.project_merge_interface import start_project_merge_server
 
 if __name__ == '__main__':