Improve deduplication rules and project-merge data monitoring

luojiehua 1 year ago
Commit
4c33fb0c9e

+ 12 - 13
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -611,19 +611,18 @@ class BaseDataMonitor():
     def start_monitor(self):
         # Data monitoring
 
-        # scheduler = BlockingScheduler()
-        #
-        # # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
-        # scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
-        # scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
-        # # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
-        # scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
-        # scheduler.add_job(self.monitor_preproject,"cron",hour="8")
-        # scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
-        # scheduler.add_job(self.monitor_init,"cron",hour="*/3")
-        # scheduler.start()
-
-        self.monitor_merge()
+        scheduler = BlockingScheduler()
+
+        # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
+        # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
+        scheduler.add_job(self.monitor_preproject,"cron",hour="8")
+        scheduler.add_job(self.monitor_merge,"cron",minute="*/30")
+        scheduler.add_job(self.monitor_init,"cron",hour="*/3")
+        scheduler.start()
+
 
     def start_attach_monitor(self):
         # Attachment monitoring

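This hunk swaps a one-off inline call to monitor_merge() back for the full APScheduler schedule, and tightens monitor_merge from hourly (hour="*/1") to every 30 minutes. A minimal sketch of the same BlockingScheduler pattern, with a stub job standing in for the bound methods:

    from apscheduler.schedulers.blocking import BlockingScheduler

    def monitor_merge():
        # stub standing in for BaseDataMonitor.monitor_merge
        print("checking project-merge counters")

    scheduler = BlockingScheduler()
    # minute="*/30" fires at :00 and :30 of every hour; the replaced
    # schedule used hour="*/1", i.e. only once per hour
    scheduler.add_job(monitor_merge, "cron", minute="*/30")
    scheduler.start()  # blocks the current thread and runs jobs until killed
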
+ 113 - 74
BaseDataMaintenance/maintenance/dataflow.py

@@ -1593,7 +1593,7 @@ class Dataflow():
 
         def producer():
             current_date = getCurrent_date("%Y-%m-%d")
-            tmp_date = timeAdd(current_date,-4)
+            tmp_date = timeAdd(current_date,-10)
             bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                                  RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
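Moving tmp_date from -4 to -10 days makes the producer sweep up to ten days of backlog in document_tmp. A sketch of the assumed timeAdd helper shows how that cutoff is computed (the real helper lives in the project's utilities; this is an approximation of its behavior):

    from datetime import datetime, timedelta

    def timeAdd(date_str, days, fmt="%Y-%m-%d"):
        # assumed behavior: shift a formatted date string by a number of days
        return (datetime.strptime(date_str, fmt) + timedelta(days=days)).strftime(fmt)

    print(timeAdd("2024-03-15", -10))  # '2024-03-05', the widened cutoff
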
@@ -2244,8 +2244,7 @@ class Dataflow_dumplicate(Dataflow):
                 continue
             for _j in range(min(_i,10)):
                 _dict2 = base_list[_j]
-                _prob = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
-                print("_prob:",_prob)
+                _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
                 if _prob<=0.1:
                     _pass = False
                     break
@@ -2320,7 +2319,19 @@ class Dataflow_dumplicate(Dataflow):
         if web_source_no_less==web_source_no_greater=="17397-3":
             hard_level=2
 
-        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
+        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
+
+        pagetime_stamp_less = getTimeStamp(page_time_less)
+        pagetime_stamp_greater = getTimeStamp(page_time_greater)
+        
+        day_dis = abs(pagetime_stamp_greater-pagetime_stamp_less)//86400
+        if day_dis>7:
+            _prob = 0
+        elif day_dis>3:
+            if _prob<0.4:
+                _prob = 0
+
+        return _prob,day_dis
 
 
     def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
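dumplicate_check now returns a (probability, day distance) pair, and the page_time gap caps the rule score: pairs published more than 7 days apart are rejected outright, and pairs 4-7 days apart must score at least 0.4. A self-contained sketch of that gating, assuming page_time strings are '%Y-%m-%d' dates and approximating getTimeStamp with time.mktime:

    import time

    def apply_day_distance(prob, page_time_less, page_time_greater):
        # mirrors the gating added above; getTimeStamp is approximated
        # here with time.mktime over a date string (an assumption)
        fmt = "%Y-%m-%d"
        ts_less = time.mktime(time.strptime(page_time_less, fmt))
        ts_greater = time.mktime(time.strptime(page_time_greater, fmt))
        day_dis = abs(ts_greater - ts_less) // 86400  # whole days apart
        if day_dis > 7:
            prob = 0              # over a week apart: never a duplicate
        elif day_dis > 3:
            if prob < 0.4:
                prob = 0          # 4-7 days apart: demand a stronger match
        return prob, day_dis

    print(apply_day_distance(0.35, "2024-01-01", "2024-01-06"))  # (0, 5.0)
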
@@ -2525,7 +2536,7 @@ class Dataflow_dumplicate(Dataflow):
                 return 0
         return _prob
 
-    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
+    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
 
         for _ in range(retry_times):
             try:
@@ -2548,14 +2559,13 @@ class Dataflow_dumplicate(Dataflow):
                     else:
                         if _docid!=item.get(document_tmp_docid):
                             _time1 = time.time()
-                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
+                            confidence,day_dis = self.dumplicate_check(item,_dict,total_count,b_log=b_log)
                             check_time+= time.time()-_time1
 
                             _dict["confidence"] = confidence
                             _dict["min_counts"] = total_count
 
-                            if not confidence<0.1:
-                                list_data.append(_dict)
+                            list_data.append(_dict)
                 all_time = time.time()-_time
                 # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
                 return list_data
@@ -2563,12 +2573,15 @@ class Dataflow_dumplicate(Dataflow):
                 traceback.print_exc()
         return []
 
-    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
-        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
+    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
+        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns,b_log=b_log)
         for _dict in list_dict:
             _docid = _dict.get(document_tmp_docid)
             confidence = _dict["confidence"]
-            print("confidence",_docid,confidence)
+
+            if b_log:
+                log("confidence %d %.3f"%(_docid,confidence))
+
             if confidence>0.1:
                 if _docid not in set_docid:
                     base_list.append(_dict)
@@ -2581,7 +2594,7 @@ class Dataflow_dumplicate(Dataflow):
                 return
         _dict.update(base_dict)
         if b_log:
-            log(str(_dict))
+            log("rule dict:"+str(_dict))
         _query = self.generate_dumplicate_query(_dict,must_not_dict)
         _rule = {"confidence":confidence,
                  "item":item,
@@ -2591,22 +2604,25 @@ class Dataflow_dumplicate(Dataflow):
                  "multiNum_keys":[]}
         list_rules.append(_rule)
 
-    def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
+    def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False,day_dis=7):
         docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
         current_date = getCurrent_date("%Y-%m-%d")
         if page_time=='':
             page_time = current_date
 
+        two_day_dict = {"page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]}
+
         if page_time>=timeAdd(current_date,-2):
             table_name = "document_tmp"
             table_index = "document_tmp_index"
             base_dict = {
                 "docchannel":item.get("docchannel",52),
                 "status":[status_from[0]],
-                "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
+                "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
             }
             must_not_dict = {"save":0,"docid":item.get("docid")}
             doctitle_refine_name = "doctitle_refine"
+
         else:
             table_name = "document"
             table_index = "document_index"
@@ -2617,7 +2633,7 @@ class Dataflow_dumplicate(Dataflow):
             base_dict = {
                 "docchannel":item["docchannel"],
                 "status":_status,
-                "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
+                "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
             }
             must_not_dict = {"docid":item.get("docid")}
             doctitle_refine_name = "doctitle"
@@ -2673,82 +2689,95 @@ class Dataflow_dumplicate(Dataflow):
                  "bidding_budget":bidding_budget}
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        confidence=85
-        _dict = {"tenderee":tenderee,
-                 "agency":agency
-                 }
-        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
         _dict = {"tenderee":tenderee,
                  "project_codes":project_code
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
         _dict = {"tenderee":tenderee,
-                 "project_name":project_name
+                 "win_bid_price":win_bid_price
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        if getLength(product)>0:
-            l_p = product.split(",")
-            _dict = {"tenderee":tenderee,
-                     "product":l_p[0]
-                     }
-            self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+        _dict = {"agency":agency,
+                 "project_codes":project_code
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
-                 "win_tenderer":win_tenderer
+        _dict = {"win_tenderer":win_tenderer,
+                 "bidding_budget":bidding_budget
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
+        _dict = {"project_codes":project_code,
                  "win_bid_price":win_bid_price
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
+        _dict = {"project_codes":project_code,
                  "bidding_budget":bidding_budget
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
+        _dict = {"project_codes":project_code,
                  doctitle_refine_name:doctitle_refine
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"agency":agency,
-                 "project_codes":project_code
+        _dict = {"tenderee":tenderee,
+                 "bidding_budget":bidding_budget
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"agency":agency,
-                 "project_name":project_name
+        _dict = {"project_codes":project_code,
+                 "win_tenderer":win_tenderer
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
+        base_dict.update(two_day_dict)
+
+        confidence=85
+        _dict = {"tenderee":tenderee,
+                 "agency":agency
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
+        _dict = {"tenderee":tenderee,
                  "project_name":project_name
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
+        if getLength(product)>0:
+            l_p = product.split(",")
+            _dict = {"tenderee":tenderee,
+                     "product":l_p[0]
+                     }
+            self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
+        _dict = {"tenderee":tenderee,
                  "win_tenderer":win_tenderer
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
-                 "win_bid_price":win_bid_price
+
+        _dict = {"tenderee":tenderee,
+                 doctitle_refine_name:doctitle_refine
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
-                 "bidding_budget":bidding_budget
+
+        _dict = {"agency":agency,
+                 "project_name":project_name
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
         _dict = {"project_codes":project_code,
-                 doctitle_refine_name:doctitle_refine
+                 "project_name":project_name
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
+
+
         _dict = {"project_name":project_name,
                  "win_tenderer":win_tenderer
                  }
@@ -2774,10 +2803,6 @@ class Dataflow_dumplicate(Dataflow):
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"win_tenderer":win_tenderer,
-                 "bidding_budget":bidding_budget
-                 }
-        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
         _dict = {"win_tenderer":win_tenderer,
                  doctitle_refine_name:doctitle_refine
@@ -2789,6 +2814,11 @@ class Dataflow_dumplicate(Dataflow):
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
+        confidence=80
+        _dict = {"project_codes":project_code}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
+
         _dict = {"win_bid_price":win_bid_price,
                  doctitle_refine_name:doctitle_refine
                  }
@@ -2802,13 +2832,13 @@ class Dataflow_dumplicate(Dataflow):
         confidence=80
         _dict = {doctitle_refine_name:doctitle_refine}
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
-        _dict = {"project_codes":project_code}
-        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
 
         confidence=70
         _dict = {"project_name":project_name}
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
+
         return list_rules,table_name,table_index
 
     def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]):
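The reshuffled rules fall into confidence tiers: strong field pairs (codes, prices, winners) keep searching the widened +/-day_dis page_time window, while base_dict.update(two_day_dict) re-pins the weaker 85/80/70 tiers to the original +/-2-day window. A reduced model of that tiering, with appendRule's query bookkeeping collapsed to a dict and the wide-tier confidence chosen for illustration only:

    def build_rules(item, day_dis=7):
        """Reduced model of translate_dumplicate_rules. Strong field pairs
        get the widened +/-day_dis window; weaker pairs use +/-2 days
        (two_day_dict). The 85/80/70 tiers match the hunk above; the
        wide-tier confidence here is illustrative."""
        rules = []

        def append(fields, window_days, confidence):
            rules.append({"confidence": confidence,
                          "page_time_window_days": window_days,
                          "query": fields})

        append({"tenderee": item["tenderee"],
                "project_codes": item["project_codes"]}, day_dis, 88)
        append({"tenderee": item["tenderee"],
                "project_name": item["project_name"]}, 2, 85)
        append({"project_codes": item["project_codes"]}, 2, 80)
        append({"project_name": item["project_name"]}, 2, 70)
        rules.sort(key=lambda r: r["confidence"], reverse=True)
        return rules

    print(build_rules({"tenderee": "X", "project_codes": "A-01",
                       "project_name": "N"})[0])
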
@@ -3427,7 +3457,7 @@ class Dataflow_dumplicate(Dataflow):
         :param list_docids:
         :return:
         '''
-        print("==",list_docids)
+        log("search_projects_with_document %s"%str(list_docids))
         list_should_q = []
         for _docid in list_docids:
             list_should_q.append(TermQuery("docids",_docid))
@@ -3842,10 +3872,14 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = []
             _docid = item.get(document_tmp_docid)
             list_docids.append(_docid)
+            if save==0:
+                dup_docid.insert(0,_docid)
             if isinstance(dup_docid,list):
                 list_docids.extend(dup_docid)
             list_docids = [a for a in list_docids if a is not None]
 
+
+
             _time = time.time()
             list_projects = self.search_projects_with_document(list_docids)
             # log("search projects takes:%.3f"%(time.time()-_time))
@@ -3872,6 +3906,9 @@ class Dataflow_dumplicate(Dataflow):
             list_merge_dump = dumplicate_document_in_merge(list_projects,dup_docid[:-1])
             # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
 
+            if list_merge_dump is None:
+                list_projects = []
+
             _time = time.time()
             project_json = to_project_json(list_projects)
             # log("json projects takes:%.3f"%(time.time()-_time))
@@ -3924,6 +3961,7 @@ class Dataflow_dumplicate(Dataflow):
     def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
         try:
             start_time = time.time()
+            b_log = False if upgrade else True
             self.post_extract(item)
 
 
@@ -3931,14 +3969,15 @@ class Dataflow_dumplicate(Dataflow):
             base_list = []
             set_docid = set()
 
-            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=False)
+            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=b_log)
             # print("len_rules",len(list_rules),table_name,table_index)
             list_rules.sort(key=lambda x:x["confidence"],reverse=True)
+
+            log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
             list_rules = list_rules[:30]
             _i = 0
             step = 5
 
-            print("here 1")
 
             item["confidence"] = 999
             if item.get(document_tmp_docid) not in set_docid:
@@ -3956,12 +3995,11 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle])
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle],b_log=b_log)
                 _i += step
 
 
-            print("here 2")
-            b_log = False if upgrade else True
+
             _time = time.time()
             # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
             final_list = self.dumplicate_fianl_check(base_list,b_log)
@@ -4125,26 +4163,27 @@ class Dataflow_dumplicate(Dataflow):
         schedule.start()
 
     def changeSaveStatus(self,list_dict):
-        for _dict in list_dict:
-            if isinstance(_dict,dict):
-                if _dict.get(document_tmp_save,1)==1:
-                    _d = {"partitionkey":_dict["partitionkey"],
-                          "docid":_dict["docid"],
+        if list_dict is not None:
+            for _dict in list_dict:
+                if isinstance(_dict,dict):
+                    if _dict.get(document_tmp_save,1)==1:
+                        _d = {"partitionkey":_dict["partitionkey"],
+                              "docid":_dict["docid"],
+                              document_tmp_save:0
+                              }
+                        _d_tmp = Document_tmp(_d)
+                        if _d_tmp.exists_row(self.ots_client):
+                            _d_tmp.update_row(self.ots_client)
+                elif isinstance(_dict,int):
+                    _d = {"partitionkey":_dict%500+1,
+                          "docid":_dict,
                           document_tmp_save:0
                           }
                     _d_tmp = Document_tmp(_d)
-                    if _d_tmp.exists_row(self.ots_client):
-                        _d_tmp.update_row(self.ots_client)
-            elif isinstance(_dict,int):
-                _d = {"partitionkey":_dict%500+1,
-                      "docid":_dict,
-                      document_tmp_save:0
-                      }
-                _d_tmp = Document_tmp(_d)
-                if _d_tmp.fix_columns(self.ots_client,["status"],True):
-                    if _d_tmp.getProperties().get("status")==1:
-                        _d_tmp.setValue("status",0,True)
-                        _d_tmp.update_row(self.ots_client)
+                    if _d_tmp.fix_columns(self.ots_client,["status"],True):
+                        if _d_tmp.getProperties().get("status")==1:
+                            _d_tmp.setValue("status",0,True)
+                            _d_tmp.update_row(self.ots_client)
 
 
 
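changeSaveStatus now tolerates a None list and dispatches on entry type: full row dicts flip save to 0 in place, while bare int docids have their partitionkey derived as docid % 500 + 1 before the status reset. A sketch with the Document_tmp/OTS calls mocked out as prints:

    def change_save_status(list_dict):
        if list_dict is None:      # new guard: upstream may now hand back None
            return
        for entry in list_dict:
            if isinstance(entry, dict):
                if entry.get("save", 1) == 1:
                    # stands in for Document_tmp(...).update_row(...)
                    print("save->0 for docid", entry["docid"])
            elif isinstance(entry, int):
                row = {"partitionkey": entry % 500 + 1, "docid": entry, "save": 0}
                # stands in for fix_columns/update_row on the bare docid
                print("status reset for", row)

    change_save_status([{"partitionkey": 3, "docid": 1002, "save": 1}, 2501])
    change_save_status(None)  # silently ignored instead of raising TypeError
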
@@ -4165,7 +4204,7 @@ class Dataflow_dumplicate(Dataflow):
         list_dict = getRow_ots(rows)
 
         for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -4252,7 +4291,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(397656324
+    df_dump.test_dumplicate(393333999
                             )
     # df_dump.test_merge([385521167
     #                     ],[385521113])

+ 1 - 1
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1000,7 +1000,7 @@ def check_product(product_less,product_greater,split_char=","):
                 if getSimilarityOfString(_l,_g)>=0.8:
                     same_count += 1
                     break
-        if same_count/len(_product_l)>0.5:
+        if same_count/len(_product_l)>=0.5:
             return True
         return False
     return True

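The one-character change from > to >= makes a product overlap of exactly half sufficient, e.g. 1 of 2 products matching now passes. A sketch of the check, with difflib standing in for the project's getSimilarityOfString helper (an assumption, not the real implementation):

    from difflib import SequenceMatcher

    def check_product(product_less, product_greater, split_char=","):
        if not product_less or not product_greater:
            return True
        _product_l = product_less.split(split_char)
        _product_g = product_greater.split(split_char)
        same_count = 0
        for _l in _product_l:
            for _g in _product_g:
                if SequenceMatcher(None, _l, _g).ratio() >= 0.8:
                    same_count += 1
                    break
        # '>=' instead of '>': a 1-of-2 overlap (exactly 0.5) now counts
        return same_count / len(_product_l) >= 0.5

    print(check_product("server,switch", "server,router"))  # True under >=
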
+ 6 - 5
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2131,7 +2131,7 @@ def dumplicate_projects(list_projects,b_log=False):
     list_projects.sort(key=lambda x:x.get("keyvaluecount",0),reverse=True)
     cluster_projects = list_projects[:10]
     _count = 10
-    print("dumplicate projects rest",len(cluster_projects))
+    log("dumplicate projects rest %d"%len(cluster_projects))
     while _count>0:
         _count -= 1
         _update = False
@@ -2162,7 +2162,7 @@ def dumplicate_projects(list_projects,b_log=False):
             break
         cluster_projects = list_p
 
-    print("dumplicate projects rest",len(cluster_projects))
+    log("dumplicate projects rest %d"%len(cluster_projects))
     return cluster_projects
 
 def update_projects_by_project(project_dict,projects):
@@ -2971,7 +2971,7 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
     :return:
     '''
 
-    dup_docid = set(dup_docid)
+    dup_docid = set([str(a) for a in dup_docid])
     set_dup_total = set()
     for _proj in list_projects:
         try:
@@ -2989,7 +2989,7 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
                 if str(docid) not in set_docids:
                     continue
 
-                if docid in dup_docid:
+                if str(docid) in dup_docid:
                     continue
                 _status = _d.get(document_status,201)
                 is_multipack = _d.get("is_multipack",True)
@@ -3041,10 +3041,11 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
                     else:
                         dict_channel_proj[docchannel] = _d
 
-            set_docids = set_docids-set_dup_docid
+            set_docids = set_docids-set_dup_docid-dup_docid
             set_dup_total |= set_dup_docid
             if len(set_docids)==0:
                 log("projects set_docids length is zero %s"%(docids))
+                return None
             else:
                 _proj[project_docids] = ",".join(list(set_docids))
             _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
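
Both str() casts in dumplicate_document_in_merge address the same pitfall: docids arrive sometimes as ints and sometimes as strings, and Python set membership compares value and type together, so an int never matches its string form. A minimal demonstration of why the normalization matters:

    # mixed int/str docids silently fail membership tests
    dup_docid = {384521113, "384521167"}
    print("384521113" in dup_docid)        # False: str never equals int

    # the fix normalizes everything to str before any membership check
    dup_docid = {str(a) for a in dup_docid}
    print("384521113" in dup_docid)        # True
    print(str(384521167) in dup_docid)     # True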