Sfoglia il codice sorgente

公告去重优化,调整时间和产品校验,增加附件及全金额判断 (Announcement deduplication optimization: adjust time and product validation; add attachment and full-amount checks)

luojiehua 1 anno fa
parent
commit
6739809039

+ 14 - 24
BaseDataMaintenance/maintenance/dataflow.py

@@ -2245,38 +2245,29 @@ class Dataflow_dumplicate(Dataflow):
 
         if len(base_list)>0:
             base_fingerprint = base_list[0]["fingerprint"]
-        for _i in range(1,len(base_list)):
+
+        final_group = []
+        for _i in range(len(base_list)):
             _dict1 = base_list[_i]
             fingerprint_less = _dict1["fingerprint"]
             _pass = True
             if fingerprint_less==base_fingerprint:
                 _index = _i
+                final_group.append(_dict1)
                 continue
-            for _j in range(min(_i,10)):
-                _dict2 = base_list[_j]
+            for _dict2 in final_group:
                 _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
                 if _prob<=0.1:
                     _pass = False
                     break
             log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
             _index = _i
-            if not _pass:
-                _index -= 1
+            if _pass:
+                final_group.append(_dict1)
+            else:
                 break
-        if _index>=1:
-            # #对重复入库的进行去重
-            # _l = the_group[:_index+1]
-            # set_fingerprint = set()
-            # final_l = []
-            # for _dict in _l:
-            #     fingerprint_less = _dict["fingerprint"]
-            #     if fingerprint_less in set_fingerprint:
-            #         continue
-            #     else:
-            #         final_l.append(_dict)
-            #         set_fingerprint.add(fingerprint_less)
-            return the_group[:_index+1]
-        return []
+
+        return final_group
 
     def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
         document_less = _dict1
@@ -4229,7 +4220,7 @@ class Dataflow_dumplicate(Dataflow):
         list_dict = getRow_ots(rows)
 
         for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -4405,12 +4396,11 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(405004237
+    df_dump.test_dumplicate(378760606
                             )
-
     # compare_dumplicate_check()
-    # df_dump.test_merge([242672995,235300429,240009762
-    #                     ],[243240169,])
+    # df_dump.test_merge([391898061
+    #                     ],[371551361,])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()

+ 45 - 15
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -801,8 +801,6 @@ def check_money(bidding_budget_less,bidding_budget_greater,
     #check saming
     budget_is_same = ""
     price_is_same = ""
-    logging.info("moneys_less"+str(moneys_less)+"---"+str(moneys_attachment_less))
-    logging.info("moneys_less"+str(moneys_greater)+"---"+str(moneys_attachment_greater))
     if getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
         budget_less = float(bidding_budget_less)
         budget_greater = float(bidding_budget_greater)
@@ -901,7 +899,7 @@ code_pattern = re.compile("[A-Za-z0-9\-\(\)()【】\.-]+")
 num_pattern = re.compile("^\d+(?:\.\d+)?$")
 num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
-building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
+building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
 def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
     if code_greater is None:
@@ -1619,7 +1617,7 @@ class f_dumplicate_featureMatrix(BaseUDTF):
         self.forward(json_matrix,_prob)
         return
 
-@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double->string')
+@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double,string,string,string,string,string,string->string')
 class f_redump_probability_final_check(BaseUDAF):
     '''
     去重合并后重新判断,组内个数大于5时,dottitle、tenderee、win_tenderer、bidding_budget组内只能有一个取值
@@ -1634,10 +1632,12 @@ class f_redump_probability_final_check(BaseUDAF):
     def new_buffer(self):
         return [list()]
 
-    def iterate(self, buffer,main_docid,docid,newly,docchannel,nlp_enterprise,product,package,json_dicttime,page_time,project_codes,project_name,doctitle_refine,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,confidence):
+    def iterate(self, buffer,main_docid,docid,newly,docchannel,nlp_enterprise,product,package,json_dicttime,page_time,project_codes,project_name,doctitle_refine,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,confidence,
+                province,city,district,web_source_no,extract_json,page_attachments):
         buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"nlp_enterprise":nlp_enterprise,"product":product,"package":package,"json_dicttime":json_dicttime,"page_time":page_time,
                           "project_codes":project_codes,"project_name":project_name,"doctitle_refine":doctitle_refine,"tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,
-                          "win_bid_price":win_bid_price,"extract_count":extract_count,"confidence":confidence})
+                          "win_bid_price":win_bid_price,"extract_count":extract_count,"confidence":confidence,
+                          "province":province,"city":city,"district":district,"web_source_no":web_source_no,"extract_json":extract_json,"page_attachments":page_attachments})
 
     def merge(self, buffer, pbuffer):
         buffer[0].extend(pbuffer[0])
@@ -1647,8 +1647,10 @@ class f_redump_probability_final_check(BaseUDAF):
         the_group = buffer[0]
         the_group.sort(key=lambda x:x["confidence"],reverse=True)
         _index = 0
+
+        final_group = []
         if len(the_group)>0:
-            _index = 1
+            _index = 0
             while _index<len(the_group):
                 document_greater = the_group[_index]
                 docid_greater = document_greater["docid"]
@@ -1668,10 +1670,16 @@ class f_redump_probability_final_check(BaseUDAF):
                 fingerprint_greater = document_greater.get("fingerprint","")
                 project_name_greater = document_greater["project_name"]
                 extract_count_greater = document_greater["extract_count"]
-                _less_index = 0
-                while _less_index<_index:
+                province_greater = document_greater["province"]
+                city_greater = document_greater["city"]
+                district_greater = document_greater["district"]
+                web_source_no_greater = document_greater["web_source_no"]
+                extract_json_greater = document_greater["extract_json"]
+                page_attachments_greater = document_greater["page_attachments"]
+                _pass = True
+
+                for document_less in final_group:
 
-                    document_less = the_group[_less_index]
                     docid_less = document_less["docid"]
                     docchannel_less = document_less["docchannel"]
                     page_time_less = document_less["page_time"]
@@ -1689,21 +1697,43 @@ class f_redump_probability_final_check(BaseUDAF):
                     fingerprint_less = document_less.get("fingerprint","")
                     project_name_less = document_less["project_name"]
                     extract_count_less = document_less["extract_count"]
-
-                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,len(the_group),b_log=False)
+                    province_less = document_less["province"]
+                    city_less = document_less["city"]
+                    district_less = document_less["district"]
+                    web_source_no_less = document_less["web_source_no"]
+                    extract_json_less = document_less["extract_json"]
+                    page_attachments_less = document_less["page_attachments"]
+
+                    if extract_json_less is not None:
+                        _extract_less = json.loads(extract_json_less)
+                    _extract_greater = {}
+                    if extract_json_greater is not None:
+                        _extract_greater = json.loads(extract_json_greater)
+                    moneys_less = set(_extract_less.get("moneys",[]))
+                    moneys_attachment_less = set(_extract_less.get("moneys_attachment",[]))
+                    moneys_greater = set(_extract_greater.get("moneys",[]))
+                    moneys_attachment_greater = set(_extract_greater.get("moneys_attachment",[]))
+
+                    if page_attachments_less is None:
+                        page_attachments_less = '[]'
+                    if page_attachments_greater is None:
+                        page_attachments_greater = '[]'
+
+                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,len(the_group),b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
 
                     if _prob<0.1:
+                        _pass = False
                         break
 
-                    _less_index += 1
-                if _less_index!=_index:
+                if _pass:
+                    final_group.append(document_greater)
+                else:
                     break
                 _index += 1
 
         dumplicates = ""
         if _index>1:
             logging.info("index/whole:%d/%d"%(_index,len(the_group)))
-            final_group = the_group[:_index]
             final_group.sort(key=lambda x:x["docid"])
             final_group.sort(key=lambda x:x["extract_count"],reverse=True)
             _set = set()

+ 1 - 1
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2287,7 +2287,7 @@ def timeAdd(_time,days,format="%Y-%m-%d",minutes=0):
 #     except Exception as e:
 #         return None
 
-def check_time_merge(json_time_less,json_time_greater,b_log,set_time_key=set([project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start])):
+def check_time_merge(json_time_less,json_time_greater,b_log,set_time_key=set([project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_registration_end,project_time_registration_start])):
 
     same_count = 0
     if getLength(json_time_less)>0 and getLength(json_time_greater)>0:

+ 31 - 26
BaseDataMaintenance/model/ots/document.py

@@ -337,25 +337,25 @@ def turn_document_status():
         #
         # )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            _document = Document(_data)
-            task_queue.put(_document)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                _document = Document(_data)
-                task_queue.put(_document)
+        # rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+        #                                                                columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+        # list_data = getRow_ots(rows)
+        # print(total_count)
+        # _count = len(list_data)
+        # for _data in list_data:
+        #     _document = Document(_data)
+        #     task_queue.put(_document)
+        # while next_token:
+        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+        #                                                                    columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
+        #     list_data = getRow_ots(rows)
+        #     _count += len(list_data)
+        #     print("%d/%d"%(_count,total_count))
+        #     for _data in list_data:
+        #         _document = Document(_data)
+        #         task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -363,13 +363,18 @@ def turn_document_status():
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        # import pandas as pd
-        # df = pd.read_excel("G:\\20221212error.xlsx")
-        # for docid in df["docid"]:
-        #     _dict = {document_docid:int(docid),
-        #              document_partitionkey:int(docid)%500+1,
-        #              }
-        #     task_queue.put(Document(_dict))
+        import pandas as pd
+        df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
+        for docid in df["docid1"]:
+            _dict = {document_docid:int(docid),
+                     document_partitionkey:int(docid)%500+1,
+                     }
+            task_queue.put(Document(_dict))
+        for docid in df["docid2"]:
+            _dict = {document_docid:int(docid),
+                     document_partitionkey:int(docid)%500+1,
+                     }
+            task_queue.put(Document(_dict))
         log("task_queue size:%d"%(task_queue.qsize()))
 
     def _handle(item,result_queue,ots_client):