
Adjusted the deduplication rules: added dedup rules for complaint and approval projects; announcements from the same web source with different attachments are no longer deduplicated

luojiehua 9 months ago
parent
commit
ec8c5c6d78
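
The "same web source, different attachments" part of the rule shows up in the dataflow.py hunk further down: candidate documents flagged as update_document are no longer collected into the duplicate set. A minimal sketch of that filtering step, assuming each candidate is a plain dict with a docid and an optional update_document flag (the function name collect_duplicate_docids is hypothetical; the real code works on Document rows):

    # Sketch only: mirrors the filter added in Dataflow_dumplicate.
    def collect_duplicate_docids(item, final_list):
        dup_docid = set()
        for _dict in final_list:
            # re-published announcements from the same source (e.g. the same
            # notice with different attachments) are marked update_document
            # and are kept instead of being treated as duplicates
            if _dict.get("update_document", "") != "true":
                dup_docid.add(_dict.get("docid"))
        # the document being checked never counts as its own duplicate
        dup_docid.discard(item.get("docid"))
        return dup_docid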

+ 1 - 1
BaseDataMaintenance/fixDoc_to_queue_extract.py

@@ -8,4 +8,4 @@ from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,
 
 if __name__ == '__main__':
     # fixDoc_to_queue_extract()
-    fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2023-12-28.xlsx")
+    fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2024-08-13.xlsx")

+ 1 - 1
BaseDataMaintenance/maintenance/attachment/attachmentProcess.py

@@ -811,7 +811,7 @@ class AttachmentRec():
                             attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                             attach.setValue(attachment_status,ATTACHMENT_PROCESSED_FAILED)
                             log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
-                            sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
+                            # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
 
 
                 attach.update_row(self.ots_client)

+ 26 - 16
BaseDataMaintenance/maintenance/dataflow.py

@@ -260,7 +260,7 @@ class Dataflow():
                         log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                     else:
                         log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
-                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
+                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                         _html = ""
                         return False
 
@@ -415,7 +415,10 @@ class Dataflow():
         if agency is not None and agency!="":
             extract_count += 1
         if sub_docs_json is not None:
-            sub_docs = json.loads(sub_docs_json)
+            try:
+                sub_docs = json.loads(sub_docs_json)
+            except Exception as e:
+                sub_docs = []
             sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
             sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
             # log("==%s"%(str(sub_docs)))
@@ -3780,8 +3783,8 @@ class Dataflow_dumplicate(Dataflow):
                 district = _proj.get(project_district,"")
 
                 if is_yanshou:
-                    page_time_less = timeAdd(page_time,-750)
-                    page_time_greater = timeAdd(page_time,720)
+                    page_time_less = timeAdd(page_time,-850)
+                    page_time_greater = timeAdd(page_time,820)
                 else:
                     page_time_less = timeAdd(page_time,-450)
                     page_time_greater = timeAdd(page_time,420)
@@ -3885,8 +3888,9 @@ class Dataflow_dumplicate(Dataflow):
                         update_projects_by_project(_data,[_proj])
                         projects_update_time += time.time()-_time
 
-            whole_time = time.time()-whole_time_start
-            log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
+                whole_time = time.time()-whole_time_start
+                log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
+
 
             return list_projects
         except Exception as e:
@@ -4050,7 +4054,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
                 _i += step
 
 
@@ -4075,7 +4079,8 @@ class Dataflow_dumplicate(Dataflow):
 
             dup_docid = set()
             for _dict in final_list:
-                dup_docid.add(_dict.get(document_tmp_docid))
+                if _dict.get("update_document","")!="true":
+                    dup_docid.add(_dict.get(document_tmp_docid))
             if item.get(document_tmp_docid) in dup_docid:
                 dup_docid.remove(item.get(document_tmp_docid))
 
@@ -4097,6 +4102,7 @@ class Dataflow_dumplicate(Dataflow):
                     for _dict in final_list:
                         if _dict.get(document_tmp_docid) in dup_docid:
                             remove_list.append(_dict)
+
                     dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                     dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                 else:
@@ -4108,8 +4114,8 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = list(dup_docid)
             list_docids.append(best_docid)
 
-            if item.get(document_update_document)=="true":
-                dtmp.setValue(document_tmp_save,1,True)
+            # if item.get(document_update_document)=="true":
+            #     dtmp.setValue(document_tmp_save,1,True)
 
             list_merge_dump = []
             if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
@@ -4172,19 +4178,23 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
+        current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+        before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-20)
+        after_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
         if self.fix_doc_docid is None:
-            current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
-            before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
             bool_query = BoolQuery(must_queries=[
                 TermQuery(document_tmp_save,1),
                 RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
-                RangeQuery(document_tmp_opertime,before_date)
+                RangeQuery(document_tmp_docchannel,0,300),
+                RangeQuery(document_tmp_opertime,before_date,after_date)
             ])
         else:
             bool_query = BoolQuery(must_queries=[
                 TermQuery(document_tmp_save,1),
                 RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
-                RangeQuery(document_tmp_docid,self.fix_doc_docid)
+                RangeQuery(document_tmp_docchannel,0,300),
+                RangeQuery(document_tmp_docid,self.fix_doc_docid),
+                RangeQuery(document_tmp_opertime,before_date,after_date)
             ])
 
         list_data = []
@@ -4219,7 +4229,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
         schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
-        # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
+        schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
         schedule.start()
 
     def changeSaveStatus(self,list_dict):
@@ -4440,7 +4450,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(505243916
+    df_dump.test_dumplicate(519262974
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061
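
For readability, a plain-Python sketch of the new re-check window used by fix_doc_which_not_in_project above (datetime stands in for the project's getCurrent_date/timeAdd helpers): only documents whose opertime falls between 20 and 5 minutes ago, on channels 0-300, are re-queued, and the job itself is now scheduled every 10 minutes.

    from datetime import datetime, timedelta

    # Sketch only: compute the opertime window queried by the fix job.
    def recheck_window(now=None):
        now = now or datetime.now()
        before_date = (now - timedelta(minutes=20)).strftime("%Y-%m-%d %H:%M:%S")
        after_date = (now - timedelta(minutes=5)).strftime("%Y-%m-%d %H:%M:%S")
        # documents with before_date <= opertime <= after_date are candidates
        return before_date, after_date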

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -453,7 +453,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     if len(_html)>1:
                         _html = "interface return error"
                     else:
-                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
+                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                         _html = ""
 
                         return False

+ 21 - 1
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -980,6 +980,7 @@ num_pattern = re.compile("^\d+(?:\.\d+)?$")
 num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
 building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
+rebid_pattern = "再次|重新招标|[一二三四五六七八九十]+次"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
 def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
     if code_greater is None:
@@ -1041,7 +1042,7 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                 return False
 
     #check location and keywords
-    for _p in [num1_pattern,building_pattern]:
+    for _p in [num1_pattern,building_pattern,rebid_pattern]:
         num_all_l = re.findall(_p,doctitle_refind_less)
         num_all_g = re.findall(_p,doctitle_refind_greater)
         set_num_l = set(num_all_l)
@@ -1080,11 +1081,30 @@ def check_product(product_less,product_greater,split_char=",",doctitle_refine_le
 
         _product_l = product_less.split(split_char)
         _product_g = product_greater.split(split_char)
+        _title_l = doctitle_refine_less
+        _title_g = doctitle_refine_greater
         same_count = 0
         if len(_product_l)>len(_product_g):
             a = _product_g
             _product_g = _product_l
             _product_l = a
+            _title_l = doctitle_refine_greater
+            _title_g = doctitle_refine_less
+        set_product_l_in_title = set()
+        set_product_g_in_title = set()
+        for _l in _product_l:
+            if _title_l.find(_l)>=0:
+                set_product_l_in_title.add(_l)
+        for _g in _product_g:
+            if _title_g.find(_g)>=0:
+                set_product_g_in_title.add(_g)
+        # constraint: products appearing in the two titles must overlap
+        if len(set_product_l_in_title)>0 and len(set_product_g_in_title)>0:
+            _set_union = set_product_l_in_title & set_product_g_in_title
+            if len(_set_union)==0:
+                return False
+            if len(_set_union)>0 and len(_set_union)!=len(set_product_l_in_title) and len(_set_union)!=len(set_product_g_in_title):
+                return False
         for _l in _product_l:
             for _g in _product_g:
                 if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
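
A standalone sketch of the overlap restriction added to check_product in the hunk above, reduced to exact string containment (the real function additionally uses getSimilarityOfString and the split_char handling around it); the name titles_products_compatible is made up for illustration:

    # Sketch only: when both titles mention products from their own lists, the
    # two sets of title-mentioned products must intersect, and the intersection
    # must cover one of the sets completely; otherwise the pair is rejected.
    def titles_products_compatible(products_l, products_g, title_l, title_g):
        in_title_l = {p for p in products_l if p and p in title_l}
        in_title_g = {p for p in products_g if p and p in title_g}
        if in_title_l and in_title_g:
            common = in_title_l & in_title_g
            if not common:
                return False
            if common != in_title_l and common != in_title_g:
                return False
        return True

For example, two announcements whose titles mention disjoint product sets are now rejected even when the underlying product lists are otherwise similar.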

+ 1 - 1
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2177,7 +2177,7 @@ def dumplicate_projects(list_projects,b_log=False):
     appendKeyvalueCount(list_projects)
     list_projects.sort(key=lambda x:str(x.get(project_page_time,"")))
     list_projects.sort(key=lambda x:x.get("keyvaluecount",0),reverse=True)
-    cluster_projects = list_projects[:10]
+    cluster_projects = list_projects[:100]
     _count = 10
     log("dumplicate projects rest %d"%len(cluster_projects))
     while _count>0:

+ 36 - 30
BaseDataMaintenance/model/ots/document.py

@@ -308,9 +308,9 @@ def turn_document_status():
         bool_query = BoolQuery(
             must_queries=[
                 # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
-                WildcardQuery("web_source_no","03716-*"),
-                RangeQuery("page_time","2024-04-24"),
-                TermQuery("save",1)
+                # WildcardQuery("web_source_no","03716-*"),
+                RangeQuery("product_number",500),
+                # TermQuery("save",1)
                 # RangeQuery("status",0,1),
                 # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                 # TermQuery("docid",397656324)
@@ -341,25 +341,25 @@ def turn_document_status():
         #
         # )
 
-        # rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-        #                                                                columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        # list_data = getRow_ots(rows)
-        # print(total_count)
-        # _count = len(list_data)
-        # for _data in list_data:
-        #     _document = Document_tmp(_data)
-        #     task_queue.put(_document)
-        # while next_token:
-        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-        #                                                                    columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        #     list_data = getRow_ots(rows)
-        #     _count += len(list_data)
-        #     print("%d/%d"%(_count,total_count))
-        #     for _data in list_data:
-        #         _document = Document_tmp(_data)
-        #         task_queue.put(_document)
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+                                                                       columns_to_get=ColumnsToGet(["product","product_number"],return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        print(total_count)
+        _count = len(list_data)
+        for _data in list_data:
+            _document = Document_tmp(_data)
+            task_queue.put(_document)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(["product"],return_type=ColumnReturnType.SPECIFIED))
+            list_data = getRow_ots(rows)
+            _count += len(list_data)
+            print("%d/%d"%(_count,total_count))
+            for _data in list_data:
+                _document = Document_tmp(_data)
+                task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -367,13 +367,15 @@ def turn_document_status():
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        import pandas as pd
-        df = pd.read_excel(r"F:\Workspace2016\DataMining\data\2024-07-24_143135_数据导出.xlsx")
-        for docid in df["docid"]:
-            _dict = {document_docid:int(docid),
-                     document_partitionkey:int(docid)%500+1,
-                     }
-            task_queue.put(Document(_dict))
+        # import pandas as pd
+        # df = pd.read_excel(r"F:\Workspace2016\DataMining\data\2024-07-24_143135_数据导出.xlsx")
+        # list_docid = df["docid"]
+        # list_docid = [519497468]
+        # for docid in list_docid:
+        #     _dict = {document_docid:int(docid),
+        #              document_partitionkey:int(docid)%500+1,
+        #              }
+        #     task_queue.put(Document(_dict))
         # for docid in df["docid2"]:
         #     _dict = {document_docid:int(docid),
         #              document_partitionkey:int(docid)%500+1,
@@ -407,7 +409,11 @@ def turn_document_status():
         # item.setValue(document_district,"金湾区",True)
         # item.setValue(document_status,66,True)
         # print(item.getProperties())
-        item.setValue(document_status,1,True)
+        # item.setValue(document_status,1,True)
+        product = item.getProperties().get(document_product)
+        l_product = product.split(",")
+        n_product = ",".join(l_product[:500])
+        item.setValue(document_product,n_product,True)
         item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass