
Deduplication rule update and optimization

znj, 1 week ago
Parent
Current commit
9f8683bdc5
2 files changed, with 100 additions and 18 deletions
  1. BaseDataMaintenance/maintenance/dataflow.py (+11, -9)
  2. BaseDataMaintenance/maxcompute/documentDumplicate.py (+89, -9)

+ 11 - 9
BaseDataMaintenance/maintenance/dataflow.py

@@ -2491,7 +2491,7 @@ class Dataflow_dumplicate(Dataflow):
                     check_result["pass"] = 0
                 else:
                     check_result["docchannel"] = 2
-        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater,page_time_less,page_time_greater):
             check_result["doctitle"] = 0
             check_result["pass"] = 0
             if b_log:
@@ -2689,7 +2689,8 @@ class Dataflow_dumplicate(Dataflow):
 
         if table_name in {"document_tmp","document"}:
 
-            if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
+            # if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1:
+            if page_time>=timeAdd(current_date,-7) and item.get("is_special_bonds")!=1 and not get_all:
                 table_name = "document_tmp"
                 table_index = "document_tmp_index"
                 base_dict = {
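
A minimal sketch of the table-selection branch changed above, with a hypothetical helper name pick_candidate_table and plain datetime arithmetic standing in for the repo's timeAdd: documents published within the last 7 days normally stay in document_tmp, but the new "and not get_all" clause forces the lookup against the full document table whenever get_all is set.

    from datetime import datetime, timedelta

    def pick_candidate_table(page_time, current_date, is_special_bonds, get_all):
        # week_ago replaces timeAdd(current_date, -7); the table names returned by
        # the "else" branch are assumed from the surrounding code.
        week_ago = (datetime.strptime(current_date, "%Y-%m-%d")
                    - timedelta(days=7)).strftime("%Y-%m-%d")
        if page_time >= week_ago and is_special_bonds != 1 and not get_all:
            return "document_tmp", "document_tmp_index"
        return "document", "document_index"

    # even a fresh document is matched against the full table when get_all=True
    print(pick_candidate_table("2024-06-05", "2024-06-07", 0, get_all=True))
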
@@ -2939,7 +2940,7 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link"]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,"detail_link",'products']):
         q_size = self.queue_dumplicate.qsize()
         log("dumplicate queue size %d"%(q_size))
 
@@ -4463,7 +4464,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link'],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document,document_tmp_web_source_name,'detail_link','products'],b_log=b_log)
                 _i += step
 
 
@@ -5248,14 +5249,15 @@ class Dataflow_dumplicate(Dataflow):
 
     def test_dumplicate(self,docid):
         # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link']
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type,'detail_link','products']
         # print('columns',columns)
         item = self.get_attrs_before_dump(docid,columns)
 
         if item:
             log("start dumplicate_comsumer_handle")
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=True)
-            return
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
+            # self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+
 
     def test_merge(self,list_docid_less,list_docid_greater):
         list_docs_less = self.search_docs(list_docid_less)
@@ -5493,8 +5495,8 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(614326449
-                            )
+    df_dump.test_dumplicate(628365020
+)
     # df_dump.dumplicate_comsumer_handle_interface(603504420,document_table="document_0000",document_table_index="document_0000_index",project_table="project_0000",project_table_index="project_0000_index_formerge")
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 89 - 9
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1002,12 +1002,12 @@ def check_demand():
 package_number_pattern = re.compile("(?P<name>(((([^承]|^)包|标[段号的包]|分?包|包组|包件)编?号?|子项目|项目类型|项目)[::]?[0-9A-Za-z一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦ]{1,4}[^\.]?)[^至]?|((?![\.])第?[ⅠⅡⅢⅣⅤⅥⅦ0-9A-Za-z一二三四五六七八九十]{1,4}(包号|标[段号的包]|分?包)))")  # 第? 去掉问号 修复 纯木浆8包/箱复印 这种作为包号
 code_pattern = re.compile("[A-Za-z0-9\-\(\)()【】\.-]+")
 num_pattern = re.compile("^\d+(?:\.\d+)?$")
-num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
+num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
-building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
+building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9]+[)\)]?[次批]"
 rebid_pattern = "再次|重新招标|[一二三四五六七八九十]+次"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
-def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
+def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[],page_time_less="",page_time_greater=""):
     if code_greater is None:
         code_greater = []
     doctitle_refind_less = str(doctitle_refind_less).replace("(","(").replace(")",")")
@@ -1027,8 +1027,9 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
     _pack1 = None
     _pack2 = None
     #if contain then pass
-    if doctitle_refind_less.find(doctitle_refind_greater)>=0 or doctitle_refind_greater.find(doctitle_refind_less)>=0:
-        return True
+    if page_time_less and page_time_less == page_time_greater:
+        if doctitle_refind_less.find(doctitle_refind_greater)>=0 or doctitle_refind_greater.find(doctitle_refind_less)>=0:
+            return True
     #check the package in title
 
     _match = re.search(package_number_pattern,doctitle_refind_less)
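
The hunk above narrows the containment shortcut in check_doctitle: one refined title containing the other now only returns True immediately when both announcements share the same page_time. A self-contained sketch of just that branch (the helper name is invented for illustration):

    def title_contains_shortcut(title_a, title_b, page_time_a, page_time_b):
        # mirrors the gated early return above: containment only counts when
        # both records carry the same publish date
        if page_time_a and page_time_a == page_time_b:
            return title_a.find(title_b) >= 0 or title_b.find(title_a) >= 0
        return False

    # same containment, different publish dates -> the shortcut no longer fires
    print(title_contains_shortcut("设备采购项目(二次)", "设备采购项目",
                                  "2024-06-20", "2024-06-01"))  # False
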
@@ -1067,7 +1068,7 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                 return False
 
     #check location and keywords
-    for _p in [num1_pattern,building_pattern,rebid_pattern]:
+    for _p in [num1_pattern,building_pattern]:
         num_all_l = re.findall(_p,doctitle_refind_less)
         num_all_g = re.findall(_p,doctitle_refind_greater)
         set_num_l = set(num_all_l)
@@ -1075,6 +1076,17 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
         if len(set_num_l)==len(set_num_g):
             if len(set_num_l&set_num_g)!=len(set_num_l):
                 return False
+    # re-bid / repeated-tender keywords
+    for _p in [rebid_pattern]:
+        num_all_l = re.findall(_p,doctitle_refind_less)
+        num_all_g = re.findall(_p,doctitle_refind_greater)
+        set_num_l = set(num_all_l)
+        set_num_g = set(num_all_g)
+        if len(set_num_l)==len(set_num_g):
+            if len(set_num_l&set_num_g)!=len(set_num_l):
+                return False
+        elif (len(set_num_l) and not len(set_num_g)) or (len(set_num_g) and not len(set_num_l)):
+            return False
 
     #check the location has conflict
     for _p in [location_pattern]:
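
A sketch of the stricter re-bid keyword handling added above: conflicting 再次/重新招标/N次 markers are still rejected, and the new elif also rejects pairs where only one title carries such a marker. rebid_pattern is copied from this file; wrapping the loop in a function is purely for illustration.

    import re

    rebid_pattern = "再次|重新招标|[一二三四五六七八九十]+次"

    def rebid_markers_compatible(title_a, title_b):
        set_a = set(re.findall(rebid_pattern, title_a))
        set_b = set(re.findall(rebid_pattern, title_b))
        if len(set_a) == len(set_b):
            return len(set_a & set_b) == len(set_a)
        # new rule: a marker on one side only now blocks deduplication
        return not ((set_a and not set_b) or (set_b and not set_a))

    print(rebid_markers_compatible("办公楼维修工程", "办公楼维修工程(重新招标)"))  # False
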
@@ -1164,7 +1176,11 @@ def check_product(product_less,product_greater,split_char=",",doctitle_refine_le
             #         return False
         for _l in _product_l:
             for _g in _product_g:
-                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
+                # if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
+                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0:
+                    print(_l,_g)
+                    print(doctitle_refine_greater.find(_l))
+                    print(doctitle_refine_less.find(_g))
                     same_count += 1
                     break
         if same_count/len(_product_l)>=0.5:
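
In the check_product hunk above, a product from the less document now only counts as shared when it is at least 0.8 similar to a greater-side product or appears in the greater document's refined title; the reverse lookup in the less title was dropped. A sketch of the narrowed pair test, assuming getSimilarityOfString from this module:

    def product_pair_matches(product_l, product_g, doctitle_refine_greater):
        # getSimilarityOfString is the fuzzy string comparison already used here;
        # the old doctitle_refine_less lookup is intentionally absent
        return (getSimilarityOfString(product_l, product_g) >= 0.8
                or doctitle_refine_greater.find(product_l) >= 0)
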
@@ -1216,6 +1232,41 @@ def check_time(json_time_less,json_time_greater):
         return 0
     return 1
 
+def check_products(products_less,products_greater):
+    if isinstance(products_less, list):
+        products_less = products_less
+    else:
+        products_less = json.loads(products_less) if products_less else []
+    if isinstance(products_greater, list):
+        products_greater = products_greater
+    else:
+        products_greater = json.loads(products_greater) if products_greater else []
+    # if len(products_less)>0 and len(products_greater)>0:
+    if len(products_less)>=4 and len(products_greater)>=4:
+        products_less_list = [p['product'] for p in products_less]
+        products_less_list = product_dump(products_less_list)
+        products_greater_list = [p['product'] for p in products_greater]
+        products_greater_list = product_dump(products_greater_list)
+        if len(products_less_list)>len(products_greater_list):
+            a = products_greater_list
+            products_greater_list = products_less_list
+            products_less_list = a
+
+        # print('products_less_set',products_less_list)
+        # print('products_greater_set',products_greater_list)
+        same_count = 0
+        for _l in products_less_list:
+            for _g in products_greater_list:
+                if getSimilarityOfString(_l,_g)>=0.8:
+                    same_count += 1
+                    break
+        if same_count/len(products_less_list)<0.5:
+            # print('check_products false')
+            return False
+
+    return True
+
+
 def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,hard_level=1):
 
     docid_less = document_less["docid"]
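
A hedged usage sketch for the new check_products above, assuming it runs inside this module so that json, product_dump and getSimilarityOfString are available: the rule only activates when both sides carry at least four extracted products, and at least half of the shorter deduplicated list must find a match with similarity of 0.8 or higher.

    # field layout assumed from the diff: a list of {"product": ...} dicts,
    # or the same structure serialized as a JSON string
    products_a = [{"product": "台式计算机"}, {"product": "激光打印机"},
                  {"product": "投影仪"}, {"product": "交换机"}]
    products_b = [{"product": "病床"}, {"product": "监护仪"},
                  {"product": "呼吸机"}, {"product": "输液泵"}]

    print(check_products(products_a, products_a))  # True: identical product lists
    print(check_products(products_a, products_b))  # expected False: no pair reaches 0.8 similarity
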
@@ -1236,6 +1287,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     fingerprint_less = document_less.get("fingerprint")
     extract_count_less = document_less.get("extract_count",0)
     web_source_no_less = document_less.get("web_source_no")
+    web_source_name_less = document_less.get("web_source_name")
     province_less = document_less.get("province")
     city_less = document_less.get("city")
     district_less = document_less.get("district")
@@ -1247,6 +1299,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     source_type_less = document_less.get("source_type")
     detail_link_less = document_less.get("detail_link")
     is_special_bonds_less = document_less.get("is_special_bonds")
+    products_less = document_less.get("products")
 
 
     docid_greater = document_greater["docid"]
@@ -1267,11 +1320,13 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     fingerprint_greater = document_greater.get("fingerprint")
     extract_count_greater = document_greater.get("extract_count",0)
     web_source_no_greater = document_greater.get("web_source_no")
+    web_source_name_greater = document_greater.get("web_source_name")
     province_greater = document_greater.get("province")
     city_greater = document_greater.get("city")
     district_greater = document_greater.get("district")
     detail_link_greater = document_greater.get("detail_link")
     is_special_bonds_greater = document_greater.get("is_special_bonds")
+    products_greater = document_greater.get("products")
 
     moneys_greater = document_greater.get("moneys")
     moneys_attachment_greater = document_greater.get("moneys_attachment")
@@ -1281,11 +1336,34 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     approval_greater = document_greater.get("approval",[])
     source_type_greater = document_greater.get("source_type")
 
-
+    # print('docid:',docid_less,docid_greater)
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
+        # print('fingerprint same')
         return 1
 
 
+    # same web source: only deduplicate on an identical fingerprint or identical detail_link
+    if web_source_no_less==web_source_no_greater and getLength(web_source_no_less)>0:
+        if getLength(detail_link_less)>0 and getLength(detail_link_greater)>0:
+            if detail_link_less != detail_link_greater:
+                # print('站源相同时,detail_link不一样,直接不去重')
+                return 0
+            else: # links are identical; check whether the link is a homepage or a list page
+                detail_link_split_less = re.sub("https?://","",detail_link_less.strip())
+                detail_link_split_less = re.split("/",detail_link_split_less)
+                detail_link_split_less = [i for i in detail_link_split_less if i]
+                if len(detail_link_split_less)==1: # link is just the source homepage domain
+                    # print('站源相同时,detail_link一样,链接为站源主页域名')
+                    return 0
+                elif re.search("(index|list)(\.html?|\.do)?$",detail_link_split_less[-1],re.I): # link is a list page
+                    # print('站源相同时,detail_link一样,链接为列表页')
+                    return 0
+
+    # compare the extracted purchase products ("products" field) of the two documents
+    if getLength(products_less)>0 and getLength(products_greater)>0:
+        if not check_products(products_less,products_greater):
+            return 0
+
     #一篇要素都在附件,且两篇附件md5有重叠
     set_md5_less = set()
     set_md5_greater = set()
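
The same-source rule above refuses to merge two records that share a web_source_no unless their detail_link values match, and even identical links are rejected when they point at the site's homepage or a list page. A self-contained sketch of that URL inspection (the helper name is invented here):

    import re

    def looks_like_homepage_or_list(detail_link):
        # strip the scheme, split the path, and drop empty segments,
        # mirroring the detail_link_split_less handling above
        parts = [p for p in re.split("/", re.sub("https?://", "", detail_link.strip())) if p]
        if len(parts) == 1:  # bare domain, i.e. the source homepage
            return True
        return bool(re.search(r"(index|list)(\.html?|\.do)?$", parts[-1], re.I))

    print(looks_like_homepage_or_list("http://www.example.gov.cn/"))                       # True
    print(looks_like_homepage_or_list("http://www.example.gov.cn/zbgg/list.html"))          # True
    print(looks_like_homepage_or_list("http://www.example.gov.cn/zbgg/20240601/123.html"))  # False
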
@@ -1463,6 +1541,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
     else:
         base_prob = 0.6
     _prob = base_prob*same_count/all_count
+    # print('base_prob',base_prob,'min_counts',min_counts,'same_count',same_count,'all_count',all_count)
     if min(extract_count_less,extract_count_greater)<=3 and max(extract_count_less,extract_count_greater)>=5:
         if _prob<0.1 and str(page_time_less)==str(page_time_greater):
             if str(docchannel_less) not in ("302","303"):
@@ -1484,7 +1563,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
                 check_result["pass"] = 0
             else:
                 check_result["docchannel"] = 2
-    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater,page_time_less,page_time_greater):
         check_result["doctitle"] = 0
         check_result["pass"] = 0
         if b_log:
@@ -1596,6 +1675,7 @@ def check_dumplicate_rule(document_less,document_greater,min_counts,b_log=False,
         if b_log:
             logging.info("hard_level %s and check_product less than 2"%(str(hard_level)))
         return 0
+    # print('check_result',check_result,'_prob',_prob)
     if check_result.get("pass",0)==0:
         if b_log:
             logging.info(str(check_result))