Parcourir la source

去重规则调整,增加对投诉、审批项目的去重规则;同一个站源附件不同的公告不再去重

luojiehua il y a 10 mois
Parent
commit
0b5dda2fd1

+ 49 - 22
BaseDataMaintenance/maintenance/dataflow.py

@@ -350,8 +350,8 @@ class Dataflow():
 
 
     def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
-                                  set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
-                                  set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
+                                  set_term=set(["doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
+                                  set_range=set(["page_time","status"]),set_phrase=set(["doctitle","project_name"])):
         list_must_queries = []
         list_must_no_queries = []
         for k,v in _dict.items():
@@ -2235,6 +2235,8 @@ class Dataflow_dumplicate(Dataflow):
         _dict["package"] = self.c_f_get_package.evaluate(extract_json)
         _dict["project_name"] = _extract.get("name","")
         _dict["dict_time"] = self.get_dict_time(_extract)
+        _dict["punish"] = _extract.get("punish",{})
+        _dict["approval"] = _extract.get("approval",[])
 
     def dumplicate_fianl_check(self,base_list,b_log=False):
         the_group = base_list
@@ -2295,6 +2297,9 @@ class Dataflow_dumplicate(Dataflow):
         moneys_less = document_less.get("moneys")
         moneys_attachment_less = document_less.get("moneys_attachment")
         page_attachments_less = document_less.get(document_tmp_attachment_path,"[]")
+        punish_less = document_less.get("punish",{})
+        approval_less = document_less.get("approval",[])
+        source_type_less = document_less.get("source_type")
 
 
         document_greater = _dict2
@@ -2324,12 +2329,16 @@ class Dataflow_dumplicate(Dataflow):
         moneys_attachment_greater = document_greater.get("moneys_attachment")
         page_attachments_greater = document_greater.get(document_tmp_attachment_path,"[]")
 
+        punish_greater = document_greater.get("punish",{})
+        approval_greater = document_greater.get("approval",[])
+        source_type_greater = document_greater.get("source_type")
+
         hard_level=1
         if web_source_no_less==web_source_no_greater=="17397-3":
             hard_level=2
 
         if self.check_rule==1:
-            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
+            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less=punish_less,punish_greater=punish_greater,approval_less=approval_less,approval_greater=approval_greater,source_type_less=source_type_less,source_type_greater=source_type_greater)
         else:
             _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
 
@@ -2854,7 +2863,7 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
         q_size = self.queue_dumplicate.qsize()
         log("dumplicate queue size %d"%(q_size))
 
@@ -2939,7 +2948,7 @@ class Dataflow_dumplicate(Dataflow):
         # mt.run()
 
 
-    def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
+    def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment,document_tenderee_code,document_agency_code,document_candidates]):
         '''
         根据docid查询公告内容,先查询document_tmp,再查询document
         :param list_docids:
@@ -3049,7 +3058,7 @@ class Dataflow_dumplicate(Dataflow):
                     continue
             if v is None or v=="" or v=="[]" or v=="未知":
                 continue
-            if k in (project_project_dynamics,project_product,project_project_codes,project_docids):
+            if k in (project_project_dynamics,project_product,project_project_codes,project_docids,project_candidates):
                 continue
             _dict[k] = v
         for _proj in projects:
@@ -3058,14 +3067,19 @@ class Dataflow_dumplicate(Dataflow):
             if _proj.get(project_page_time,"")<project_dict.get(project_page_time,""):
                 _proj[project_page_time] = project_dict.get(project_page_time,"")
 
-        #拼接属性
-        append_dict = {}
-        set_docid = set()
-        set_product = set()
-        set_code = set()
-        set_nlp_enterprise = set()
-        set_nlp_enterprise_attachment = set()
+
         for _proj in projects:
+            #拼接属性
+            append_dict = {}
+            set_docid = set()
+            set_product = set()
+            set_code = set()
+            set_nlp_enterprise = set()
+            set_nlp_enterprise_attachment = set()
+            set_candidates = set()
+
+
+
             _docids = _proj.get(project_docids,"")
             _codes = _proj.get(project_project_codes,"")
             _product = _proj.get(project_product,"")
@@ -3081,15 +3095,22 @@ class Dataflow_dumplicate(Dataflow):
             try:
                 set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
                 set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
-            except Exception as e:
-                pass
+                list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
+                for item in list_candidates:
+                    if item.get("name") is not None and item.get("name") not in set_candidates:
+                        set_candidates.add(item.get("name"))
 
-            set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
-            set_product = set_product | set(project_dict.get(project_product,"").split(","))
 
-            try:
+                set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
+                set_product = set_product | set(project_dict.get(project_product,"").split(","))
+
                 set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
                 set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
+
+                for item in json.loads(_proj.get(project_candidates,"[]")):
+                    if item.get("name") is not None and item.get("name") not in set_candidates:
+                        set_candidates.add(item.get("name"))
+                        list_candidates.append(item)
             except Exception as e:
                 pass
 
@@ -3101,6 +3122,7 @@ class Dataflow_dumplicate(Dataflow):
 
             append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
             append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+            append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
 
 
             dict_dynamic = {}
@@ -3119,6 +3141,7 @@ class Dataflow_dumplicate(Dataflow):
             list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
 
             append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
+
             _proj.update(append_dict)
 
 
@@ -3539,6 +3562,9 @@ class Dataflow_dumplicate(Dataflow):
             project_info_source,
             project_nlp_enterprise,
             project_nlp_enterprise_attachment,
+            project_tenderee_code,
+            project_agency_code,
+            project_candidates
         ],sort="page_time",table_name="project2",table_index="project2_index")
 
         return list_project_dict
@@ -4024,7 +4050,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type],b_log=b_log)
                 _i += step
 
 
@@ -4087,7 +4113,8 @@ class Dataflow_dumplicate(Dataflow):
 
             list_merge_dump = []
             if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
-                log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
+                if exist_finterprint:
+                    log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                 dtmp.setValue(document_tmp_projects,"[]",True)
             else:
                 project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
@@ -4222,7 +4249,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def test_dumplicate(self,docid):
         # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
         ])
@@ -4413,7 +4440,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(497234586
+    df_dump.test_dumplicate(505243916
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 113 - 5
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -777,7 +777,7 @@ def getSimLevel(str1,str2):
     return _v
 
 def getLength(_str):
-    return len(_str if _str is not None else "")
+    return len(str(_str) if _str is not None else "")
 
 def check_money(bidding_budget_less,bidding_budget_greater,
                 win_bid_price_less,win_bid_price_greater,
@@ -868,6 +868,85 @@ def check_entity(nlp_enterprise_less,nlp_enterprise_greater,
         return False
     return True
 
+
+def check_punish(punish_less,punish_greater):
+    same_count = 0
+    not_same_count = 0
+    _flag = True
+    keys = list(set(list(punish_less.keys())) | set(list(punish_greater.keys())))
+    for k in keys:
+        v1 = punish_less.get(k)
+        v2 = punish_greater.get(k)
+        if getLength(v1)>0 and getLength(v2)>0:
+            if k=="punish_code":
+                if not check_codes([v1],[v2]):
+                    not_same_count += 1
+                    _flag = False
+                else:
+                    same_count += 1
+            if k=="punishDecision":
+                if getSimilarityOfString(v1,v2)>0.8:
+                    same_count += 1
+            if k in ("complainants","punishPeople","institutions"):
+                if v1==v2:
+                    same_count += 1
+                else:
+                    not_same_count == 1
+                    _flag = False
+    return _flag,same_count,not_same_count
+
+def check_source_type(source_type_less,source_type_greater):
+    if getLength(source_type_less)>0 and getLength(source_type_greater)>0:
+        if source_type_less!=source_type_greater:
+            return False
+    return True
+def check_approval(approval_less,approval_greater,b_log):
+
+    if b_log:
+        logging.info("approval_less %s==approval_greater %s"%(approval_less,approval_greater))
+    for _less in approval_less:
+        for _greater in approval_greater:
+            same_count = 0
+            not_same_count = 0
+            flag = True
+            keys = ["source_stage","source_type","doc_num","project_code","project_name","approval_items","approval_result","approver","construct_company","construction_scale","declare_company","evaluation_agency","legal_person","compilation_unit","time_approval"]
+            for k in keys:
+                v1 = _less.get(k)
+                v2 = _greater.get(k)
+                if getLength(v1)>0 and getLength(v2)>0:
+                    if k in ("source_stage","source_type"):
+                        if v1!=v2:
+                            flag = False
+
+                    if k in ("project_code","doc_num"):
+                        if check_codes([v1],[v2]):
+                            same_count += 1
+                        else:
+                            not_same_count -= 1
+                            if b_log:
+                                logging.info("check approval %s false %s-%s"%(k,v1,v2))
+                            flag = False
+                    if k in ("approval_items","approval_result","project_name"):
+                        if getSimilarityOfString(v1,v2)>0.8:
+                            same_count += 1
+                        else:
+                            not_same_count -= 1
+                    if k in ("approver","construct_company","declare_company","evaluation_agency","legal_person","compilation_unit"):
+                        if v1==v2:
+                            same_count += 1
+                        else:
+                            not_same_count -= 1
+                            if b_log:
+                                logging.info("check approval %s false %s-%s"%(k,v1,v2))
+                            flag = False
+            if flag and same_count>1:
+                return flag,same_count,not_same_count
+    flag = True
+    if len(approval_less)>0 and len(approval_greater)>0:
+        flag = False
+    return flag,0,0
+
+
 def check_codes(project_codes_less,project_codes_greater):
     #check the similarity
     is_same = False
@@ -1008,7 +1087,7 @@ def check_product(product_less,product_greater,split_char=",",doctitle_refine_le
             _product_l = a
         for _l in _product_l:
             for _g in _product_g:
-                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>-0 or doctitle_refine_less.find(_g)>=0:
+                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
                     same_count += 1
                     break
         if same_count/len(_product_l)>=0.5:
@@ -1057,7 +1136,7 @@ def check_time(json_time_less,json_time_greater):
         return 0
     return 1
 
-def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater="",moneys_less=set(),moneys_greater=set(),moneys_attachment_less=set(),moneys_attachment_greater=set(),page_attachments_less="[]",page_attachments_greater="[]"):
+def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater="",moneys_less=set(),moneys_greater=set(),moneys_attachment_less=set(),moneys_attachment_greater=set(),page_attachments_less="[]",page_attachments_greater="[]",punish_less = {},punish_greater = {},approval_less = [],approval_greater = [],source_type_less = None,source_type_greater=None):
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
         return 1
 
@@ -1101,6 +1180,9 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             if check_product(product_less,product_greater,doctitle_refine_less=doctitle_refine_less,doctitle_refine_greater=doctitle_refine_greater):
                 return 1
 
+    #同一个站源,都有附件但附件没有重叠则不去重
+    if web_source_no_less==web_source_no_greater and len(set_md5_less)>0 and len(set_md5_greater)>0 and len(set_md5_less&set_md5_greater)==0:
+        return 0
 
     if isinstance(project_codes_less,str):
         project_codes_less = [a for a in project_codes_less.split(",") if a!=""]
@@ -1131,6 +1213,33 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         same_count += 1
     if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
         same_count += 1
+
+    _flag,_c1,_c2 = check_punish(punish_less,punish_greater)
+    if not _flag:
+        if b_log:
+            logging.info("check_punish failed")
+        return 0
+    else:
+        if b_log:
+            logging.info("check_punish true %d"%(_c1))
+        same_count += _c1
+
+    _flag,_c1,_c2 = check_approval(approval_less,approval_greater,b_log)
+    if not _flag:
+        if b_log:
+            logging.info("check approval failed")
+        return 0
+    else:
+        if b_log:
+            logging.info("check approval true %d"%(_c1))
+        same_count += _c1
+
+    _flag = check_source_type(source_type_less,source_type_greater)
+    if not _flag:
+        if b_log:
+            logging.info("check source type failed")
+        return 0
+
     base_prob = 0
     if min_counts<3:
         base_prob = 0.9
@@ -1208,8 +1317,7 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         else:
             check_result["entity"] = 1
 
-    logging.info("moneys_less"+str(moneys_less)+"---"+str(moneys_attachment_less))
-    logging.info("moneys_less"+str(moneys_greater)+"---"+str(moneys_attachment_greater))
+
     if not check_money(bidding_budget_less,bidding_budget_greater,
                        win_bid_price_less,win_bid_price_greater,
                        moneys_less,moneys_greater,

+ 82 - 10
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -87,6 +87,15 @@ project_nlp_enterprise = "nlp_enterprise"
 project_nlp_enterprise_attachment = "nlp_enterprise_attachment"
 project_update_time = "update_time"
 project_tmp_attrs = "tmp_attrs"
+project_tenderee_code = "tenderee_code"
+project_agency_code = "agency_code"
+project_candidates = "candidates"
+
+project_win_tenderer_code = "win_tenderer_code"
+project_second_tenderer_code = "second_tenderer_code"
+project_third_tenderer_code = "third_tenderer_code"
+project_win_tenderer_joints = "win_tenderer_joints"
+project_multi_winners = "multi_winners"
 
 document_partitionkey = "partitionkey"
 document_docid = "docid"
@@ -148,6 +157,9 @@ document_time_release = "time_release"
 document_info_source = "info_source"
 document_nlp_enterprise = "nlp_enterprise"
 document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
+document_tenderee_code = "tenderee_code"
+document_agency_code = "agency_code"
+document_candidates = "candidates"
 
 document_tmp_partitionkey = "partitionkey"
 document_tmp_docid = "docid"
@@ -183,6 +195,9 @@ document_tmp_opertime = "opertime"
 document_tmp_docchannel = "docchannel"
 document_tmp_original_docchannel = "original_docchannel"
 
+document_tmp_source_stage = "source_stage"
+document_tmp_source_type = "source_type"
+
 document_tmp_extract_json = "extract_json"
 document_tmp_industry_json = "industry_json"
 document_tmp_other_json = "other_json"
@@ -1516,7 +1531,7 @@ def generate_common_properties(list_docs):
     #计数法选择
     choose_dict = {}
     project_dict = {}
-    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
+    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_tenderee_code,document_agency_code]:
         for _doc in list_docs:
             _value = _doc.get(_key,"")
             if _value!="":
@@ -1616,6 +1631,9 @@ def generate_common_properties(list_docs):
     remove_docids = set()
     set_nlp_enterprise = set()
     set_nlp_enterprise_attachment = set()
+
+    set_candidates = set()
+    list_candidates = []
     for _doc in list_docs:
         table_name = _doc.get("table_name")
         status = _doc.get(document_status,0)
@@ -1632,13 +1650,23 @@ def generate_common_properties(list_docs):
 
         is_multipack = True if len(sub_docs)>1 else False
         extract_count = _doc.get(document_tmp_extract_count,0)
+        candidates = _doc.get(document_candidates,"[]")
+
 
         try:
             set_nlp_enterprise |= set(json.loads(_doc.get(document_nlp_enterprise,"[]")))
             set_nlp_enterprise_attachment |= set(json.loads(_doc.get(document_nlp_enterprise_attachment,"[]")))
+
+            for item in json.loads(candidates):
+                if item.get("name") is not None and item.get("name") not in set_candidates:
+                    list_candidates.append(item)
+                    set_candidates.add(item.get("name"))
+
         except Exception as e:
             traceback.print_exc()
 
+
+
         if product is not None:
             list_product.extend(product.split(","))
 
@@ -1651,7 +1679,7 @@ def generate_common_properties(list_docs):
 
         if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
             zhao_biao_page_time = page_time
-        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
+        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120,121,122):
             zhong_biao_page_time = page_time
         is_visuable = 0
         if table_name=="document":
@@ -1691,6 +1719,7 @@ def generate_common_properties(list_docs):
     project_dict[project_product] = ",".join(list(set(list_product)))
     project_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
     project_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+    project_dict[project_candidates] = json.dumps(list_candidates[:100],ensure_ascii=False)
 
     return project_dict
 
@@ -1938,7 +1967,7 @@ class f_generate_projects_from_document(BaseUDTF):
                 _product = list_product[_i%len(list_product)]
                 self.forward(_uuid,page_time,page_time_stamp,docids,project_name,_project_code,tenderee,agency,bidding_budget,win_tenderer,win_bid_price,_product,attrs_json)
 
-@annotate('string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,double,string,double,string,string,string,double,string,string,string,double,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string -> string,string,bigint,string,string,string,string,string,double,string,double,string,string')
+@annotate('string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,double,string,double,string,string,string,double,string,string,string,double,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string -> string,string,bigint,string,string,string,string,string,double,string,double,string,string')
 class f_generate_projects_from_project(BaseUDTF):
 
     def __init__(self):
@@ -2013,7 +2042,16 @@ class f_generate_projects_from_project(BaseUDTF):
                 info_source,
                 nlp_enterprise,
                 nlp_enterprise_attachment,
-                update_time):
+                update_time,
+                tenderee_code,
+                agency_code,
+                candidates,
+                win_tenderer_code,
+                second_tenderer_code,
+                third_tenderer_code,
+                win_tenderer_joints,
+                multi_winners
+                ):
         attrs_dict = {}
 
         attrs_dict[project_uuid] = uuid
@@ -2082,9 +2120,18 @@ class f_generate_projects_from_project(BaseUDTF):
         attrs_dict[project_nlp_enterprise_attachment] = nlp_enterprise_attachment
         attrs_dict[project_update_time] = update_time
 
+        attrs_dict[project_tenderee_code] = tenderee_code
+        attrs_dict[project_agency_code] = agency_code
+        attrs_dict[project_candidates] = candidates
+        attrs_dict[project_win_tenderer_code] = win_tenderer_code
+        attrs_dict[project_second_tenderer_code] = second_tenderer_code
+        attrs_dict[project_third_tenderer_code] = third_tenderer_code
+        attrs_dict[project_win_tenderer_joints] = win_tenderer_joints
+        attrs_dict[project_multi_winners] = multi_winners
 
         popNoneFromDict(attrs_dict)
 
+
         attrs_json = json.dumps(attrs_dict,ensure_ascii=False)
         if bidding_budget is None:
             bidding_budget = -1
@@ -2171,7 +2218,7 @@ def update_projects_by_project(project_dict,projects):
     _dict = {}
     #更新公共属性
     for k,v in project_dict.items():
-        if k in (project_project_dynamics,project_page_time,project_sub_project_name,project_product,project_project_codes,project_docids,project_uuid,project_nlp_enterprise,project_nlp_enterprise_attachment):
+        if k in (project_project_dynamics,project_page_time,project_sub_project_name,project_product,project_project_codes,project_docids,project_uuid,project_nlp_enterprise,project_nlp_enterprise_attachment,project_candidates):
             continue
         for _proj in projects:
             if k not in _proj:
@@ -2205,6 +2252,17 @@ def update_projects_by_project(project_dict,projects):
     set_nlp_enterprise = set()
     set_nlp_enterprise_attachment = set()
     set_update_uuid = set()
+
+    set_candidates = set()
+
+
+    try:
+        set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
+        set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
+        list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
+    except Exception as e:
+        pass
+
     for _proj in projects:
         _docids = _proj.get(project_docids,"")
         _codes = _proj.get(project_project_codes,"")
@@ -2221,6 +2279,12 @@ def update_projects_by_project(project_dict,projects):
         try:
             set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
             set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
+
+            for item in json.loads(_proj.get(project_candidates,"[]")):
+                if item.get("name") is not None and item.get("name") not in set_candidates:
+                    list_candidates.append(item)
+                    set_candidates.add(item.get("name"))
+
         except Exception as e:
             pass
     set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
@@ -2231,11 +2295,7 @@ def update_projects_by_project(project_dict,projects):
     set_delete_uuid = set_delete_uuid | set(project_dict.get(project_delete_uuid,"").split(","))
     set_update_uuid = set_update_uuid | set(project_dict.get("project_uuid","").split(","))
 
-    try:
-        set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
-        set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
-    except Exception as e:
-        pass
+
 
     append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
     append_dict[project_docid_number] = len(set_docid)
@@ -2246,6 +2306,7 @@ def update_projects_by_project(project_dict,projects):
     append_dict["update_uuid"] = ",".join([a for a in list(set_update_uuid) if a!=""])
     append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
     append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+    append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
 
     dict_dynamic = {}
     set_docid = set()
@@ -2984,6 +3045,15 @@ def check_page_time_dup(page_time,n_page_time):
     return False
 
 
+def check_fix_document(doctitle,n_doctitle):
+    _fix = re.search("更正|更新|变更|澄清",doctitle)
+    _n_fix = re.search("更正|更新|变更|澄清",n_doctitle)
+    if _fix is not None and _n_fix is not None:
+        return True
+    if _fix is  None and _n_fix is None:
+        return True
+    return False
+
 def dumplicate_document_in_merge(list_projects,dup_docid):
     '''
     合并时去重
@@ -3033,6 +3103,8 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
                             continue
                         if is_multipack or n_is_multipack:
                             continue
+                        if not check_fix_document(doctitle,n_doctitle):
+                            continue
                         n_title_search = re.search("[一二三四五六七八九十1-9]+(?:次|标|包)",n_doctitle)
                         if title_search is None and n_title_search is None:
                             pass

+ 28 - 27
BaseDataMaintenance/model/ots/document.py

@@ -341,25 +341,25 @@ def turn_document_status():
         #
         # )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            _document = Document_tmp(_data)
-            task_queue.put(_document)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                _document = Document_tmp(_data)
-                task_queue.put(_document)
+        # rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+        #                                                                columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+        # list_data = getRow_ots(rows)
+        # print(total_count)
+        # _count = len(list_data)
+        # for _data in list_data:
+        #     _document = Document_tmp(_data)
+        #     task_queue.put(_document)
+        # while next_token:
+        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+        #                                                                    columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+        #     list_data = getRow_ots(rows)
+        #     _count += len(list_data)
+        #     print("%d/%d"%(_count,total_count))
+        #     for _data in list_data:
+        #         _document = Document_tmp(_data)
+        #         task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -367,13 +367,13 @@ def turn_document_status():
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        # import pandas as pd
-        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
-        # for docid in df["docid1"]:
-        #     _dict = {document_docid:int(docid),
-        #              document_partitionkey:int(docid)%500+1,
-        #              }
-        #     task_queue.put(Document(_dict))
+        import pandas as pd
+        df = pd.read_excel(r"F:\Workspace2016\DataMining\data\2024-07-24_143135_数据导出.xlsx")
+        for docid in df["docid"]:
+            _dict = {document_docid:int(docid),
+                     document_partitionkey:int(docid)%500+1,
+                     }
+            task_queue.put(Document(_dict))
         # for docid in df["docid2"]:
         #     _dict = {document_docid:int(docid),
         #              document_partitionkey:int(docid)%500+1,
@@ -407,7 +407,8 @@ def turn_document_status():
         # item.setValue(document_district,"金湾区",True)
         # item.setValue(document_status,66,True)
         # print(item.getProperties())
-        # item.update_row(ots_client)
+        item.setValue(document_status,1,True)
+        item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass