
Adjustments to announcement deduplication and merging; add a timeout to element extraction

luojiehua 2 years ago
parent
commit
adab5b785c

+ 139 - 103
BaseDataMaintenance/maintenance/dataflow.py

@@ -2436,6 +2436,7 @@ class Dataflow_dumplicate(Dataflow):
 
                             _dict["confidence"] = confidence
                             _dict["min_counts"] = total_count
+
                             list_data.append(_dict)
                 all_time = time.time()-_time
                 # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
@@ -3494,116 +3495,120 @@ class Dataflow_dumplicate(Dataflow):
         :return:
         '''
 
-        whole_time_start = time.time()
-        set_uuid = set()
-        for _proj in list_projects:
-            _uuid = _proj.get("uuid")
-            if _uuid is not None:
-                set_uuid = set_uuid | set(_uuid.split(","))
-        must_not_q = []
-        for _uuid in list(set_uuid):
-            must_not_q.append(TermQuery("uuid",_uuid))
-
-
-        projects_merge_count = 0
-        projects_check_rule_time = 0
-        projects_update_time = 0
-        projects_query_time = 0
-        projects_prepare_time = 0
-        for _proj in list_projects:
-
-            page_time = _proj.get(project_page_time,"")
-            project_codes = _proj.get(project_project_codes,"")
-            project_name = _proj.get(project_project_name,"")
-            tenderee = _proj.get(project_tenderee,"")
-            agency = _proj.get(project_agency,"")
-            product = _proj.get(project_product,"")
-            sub_project_name = _proj.get(project_sub_project_name,"")
-            bidding_budget = _proj.get(project_bidding_budget,-1)
-            win_tenderer = _proj.get(project_win_tenderer,"")
-            win_bid_price = _proj.get(project_win_bid_price,-1)
-
-            province = _proj.get(project_province,"")
-            city = _proj.get(project_city,"")
-            district = _proj.get(project_district,"")
-
-            page_time_less = timeAdd(page_time,-150)
-            page_time_greater = timeAdd(page_time,120)
-            sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
-            _time = time.time()
-            list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
+        try:
+            whole_time_start = time.time()
+            set_uuid = set()
+            for _proj in list_projects:
+                _uuid = _proj.get("uuid")
+                if _uuid is not None:
+                    set_uuid = set_uuid | set(_uuid.split(","))
+            must_not_q = []
+            for _uuid in list(set_uuid):
+                must_not_q.append(TermQuery("uuid",_uuid))
+
+
+            projects_merge_count = 0
+            projects_check_rule_time = 0
+            projects_update_time = 0
+            projects_query_time = 0
+            projects_prepare_time = 0
+            for _proj in list_projects[:30]:
+
+                page_time = _proj.get(project_page_time,"")
+                project_codes = _proj.get(project_project_codes,"")
+                project_name = _proj.get(project_project_name,"")
+                tenderee = _proj.get(project_tenderee,"")
+                agency = _proj.get(project_agency,"")
+                product = _proj.get(project_product,"")
+                sub_project_name = _proj.get(project_sub_project_name,"")
+                bidding_budget = _proj.get(project_bidding_budget,-1)
+                win_tenderer = _proj.get(project_win_tenderer,"")
+                win_bid_price = _proj.get(project_win_bid_price,-1)
+
+                province = _proj.get(project_province,"")
+                city = _proj.get(project_city,"")
+                district = _proj.get(project_district,"")
+
+                page_time_less = timeAdd(page_time,-150)
+                page_time_greater = timeAdd(page_time,120)
+                sub_project_q = TermQuery(project_sub_project_name,sub_project_name) if sub_project_name.replace("Project","")!="" else None
+                _time = time.time()
+                list_must_query = self.getMerge_rules(page_time,project_codes,project_name,tenderee,agency,product,sub_project_name,bidding_budget,win_tenderer,win_bid_price,province,city,district)
 
 
-            list_merge_data = []
+                list_merge_data = []
 
-            _step = 4
-            _begin = 0
-            must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
-                            ]
+                _step = 4
+                _begin = 0
+                must_queries = [RangeQuery(project_page_time,page_time_less,page_time_greater,True,True),
+                                ]
 
-            #sub_project_name is not a required condition
-            # if sub_project_q is not None:
-            #     must_queries.append(sub_project_q)
+                #sub_project_name is not a required condition
+                # if sub_project_q is not None:
+                #     must_queries.append(sub_project_q)
 
-            projects_prepare_time += time.time()-_time
-            _time = time.time()
-            while _begin<len(list_must_query):
-                list_should_q = []
-                _limit = 20
-                for must_q,_count in list_must_query[_begin:_begin+_step]:
-                    must_q1 = list(must_q)
-                    must_q1.extend(must_queries)
-                    list_should_q.append(BoolQuery(must_queries=must_q1))
-
-                    # _limit += _count*5
-                _query = BoolQuery(
-                                   should_queries=list_should_q,
-                                   must_not_queries=must_not_q[:100]
-                )
-                # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
-                #                                                                     SearchQuery(_query,limit=_limit),
-                #                                                                     columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
-
-                rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
-                                                                                          SearchQuery(_query,limit=_limit),
-                                                                                          columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
-                list_data = getRow_ots(rows)
-
-                list_merge_data.extend(list_data)
-
-                # print(list_data)
-                for _data in list_data:
-                    must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
-
-                _begin += _step
-            projects_query_time += time.time()-_time
-            #match candidates with a similar bidding budget first
-            projects_merge_count = len(list_merge_data)
-            list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
-            list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
-            # log(page_time_less+"=="+page_time_greater)
-            # log("list_merge_data:%s"%(str(list_merge_data)))
-            for _data in list_merge_data:
+                projects_prepare_time += time.time()-_time
                 _time = time.time()
-                _check = check_merge_rule(_proj,_data,b_log=b_log)
-                if b_log:
-                    log(str(_check))
-                projects_check_rule_time += time.time()-_time
-                if _check:
+                while _begin<len(list_must_query):
+                    list_should_q = []
+                    _limit = 20
+                    for must_q,_count in list_must_query[_begin:_begin+_step]:
+                        must_q1 = list(must_q)
+                        must_q1.extend(must_queries)
+                        list_should_q.append(BoolQuery(must_queries=must_q1))
+
+                        # _limit += _count*5
+                    _query = BoolQuery(
+                                       should_queries=list_should_q,
+                                       must_not_queries=must_not_q[:100]
+                    )
+                    # rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
+                    #                                                                     SearchQuery(_query,limit=_limit),
+                    #                                                                     columns_to_get=ColumnsToGet(column_names=[project_uuid,project_docids,project_zhao_biao_page_time,project_zhong_biao_page_time,project_page_time,project_area,project_province,project_city,project_district,project_info_type,project_industry,project_qcodes,project_project_name,project_project_code,project_project_codes,project_project_addr,project_tenderee,project_tenderee_addr,project_tenderee_phone,project_tenderee_contact,project_agency,project_agency_phone,project_agency_contact,project_sub_project_name,project_sub_project_code,project_bidding_budget,project_win_tenderer,project_win_bid_price,project_win_tenderer_manager,project_win_tenderer_phone,project_second_tenderer,project_second_bid_price,project_second_tenderer_manager,project_second_tenderer_phone,project_third_tenderer,project_third_bid_price,project_third_tenderer_manager,project_third_tenderer_phone,project_procurement_system,project_bidway,project_dup_data,project_docid_number,project_project_dynamics,project_product,project_moneysource,project_service_time,project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start,project_time_release,project_dup_docid,project_info_source,project_nlp_enterprise,project_nlp_enterprise_attachment],return_type=ColumnReturnType.SPECIFIED))
+
+                    rows,next_token,total_count,is_all_succeed = self.ots_client_merge.search("project2","project2_index_formerge",
+                                                                                              SearchQuery(_query,limit=_limit),
+                                                                                              columns_to_get=ColumnsToGet(column_names=check_columns,return_type=ColumnReturnType.SPECIFIED))
+                    list_data = getRow_ots(rows)
+
+                    list_merge_data.extend(list_data)
+
+                    # print(list_data)
+                    for _data in list_data:
+                        must_not_q.append(TermQuery(project_uuid,_data.get(project_uuid)))
+
+                    _begin += _step
+                projects_query_time += time.time()-_time
+                #match candidates with a similar bidding budget first
+                projects_merge_count = len(list_merge_data)
+                list_merge_data.sort(key=lambda x:x.get(project_page_time,""))
+                list_merge_data.sort(key=lambda x:x.get(project_bidding_budget,-1))
+                # log(page_time_less+"=="+page_time_greater)
+                # log("list_merge_data:%s"%(str(list_merge_data)))
+                for _data in list_merge_data:
                     _time = time.time()
+                    _check = check_merge_rule(_proj,_data,b_log=b_log)
+                    if b_log:
+                        log(str(_check))
+                    projects_check_rule_time += time.time()-_time
+                    if _check:
+                        _time = time.time()
 
-                    o_proj = Project(_data)
-                    o_proj.fix_columns(self.ots_client,fix_columns,True)
-                    for k in fix_columns:
-                        _data[k] = o_proj.getProperties().get(k)
+                        o_proj = Project(_data)
+                        o_proj.fix_columns(self.ots_client,fix_columns,True)
+                        for k in fix_columns:
+                            _data[k] = o_proj.getProperties().get(k)
 
-                    update_projects_by_project(_data,[_proj])
-                    projects_update_time += time.time()-_time
+                        update_projects_by_project(_data,[_proj])
+                        projects_update_time += time.time()-_time
 
-        whole_time = time.time()-whole_time_start
-        log("merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
+            whole_time = time.time()-whole_time_start
+            log("merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
 
-        return list_projects
+            return list_projects
+        except Exception as e:
+            traceback.print_exc()
+            assert 1==2
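
Note: the wrapped loop above walks the merge rules in batches of four (_step = 4), ORs each batch into a single BoolQuery, and keeps extending must_not_q with uuids that were already collected. A minimal sketch of that batch-and-exclude pattern, with search_fn, rules and excluded standing in for the real OTS query objects (hypothetical names, not part of this commit):

def query_in_batches(rules, search_fn, step=4, limit=20):
    # rules: list of must-query groups; search_fn(should, exclude, limit) returns matching rows
    merged, excluded, begin = [], set(), 0
    while begin < len(rules):
        batch = rules[begin:begin + step]                    # OR together up to `step` rule groups
        rows = search_fn(should=batch, exclude=excluded, limit=limit)
        merged.extend(rows)
        excluded.update(r.get("uuid") for r in rows)         # never fetch the same project twice
        begin += step
    return merged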
 
 
 
@@ -3659,6 +3664,30 @@ class Dataflow_dumplicate(Dataflow):
             log("project_json:%s"%project_json)
         return project_json
 
+    def is_exist_fingerprint(self,final_list,_docid,_fingerprint,table_name):
+        set_fingerprint = set()
+        for _i in range(1,len(final_list)):
+            _dict = final_list[_i]
+            b_docid = _dict[document_tmp_docid]
+            _save = _dict.get(document_tmp_save,0)
+            _status = _dict.get(document_tmp_status,0)
+            if table_name=="document":
+                if _status>=201 and _status<=300:
+                    _save = 1
+            fingerprint_less = _dict.get(document_tmp_fingerprint,"")
+            if b_docid==_docid:
+                pass
+            else:
+                if _save==1:
+                    set_fingerprint.add(fingerprint_less)
+        print("_fingerprint",_fingerprint)
+        print(set_fingerprint)
+        if _fingerprint in set_fingerprint:
+            return True
+        return False
+
+
+
     def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
         try:
             start_time = time.time()
@@ -3697,6 +3726,8 @@ class Dataflow_dumplicate(Dataflow):
             _time = time.time()
             log("%d start final check with length:%d"%(item["docid"],len(base_list)))
             final_list = self.dumplicate_fianl_check(base_list)
+
+            exist_finterprint = self.is_exist_fingerprint(final_list,item.get(document_tmp_docid),item.get(document_tmp_fingerprint),table_name)
             log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
             best_docid = self.get_best_docid(final_list)
 
@@ -3744,7 +3775,12 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = list(dup_docid)
             list_docids.append(best_docid)
             b_log = False if upgrade else True
-            dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log),True)
+
+            if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
+                log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
+                dtmp.setValue(document_tmp_projects,"[]",True)
+            else:
+                dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log),True)
 
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
             if upgrade:
@@ -3818,8 +3854,8 @@ class Dataflow_dumplicate(Dataflow):
             for _data in list_data:
                 task_queue.put(_data)
 
-        mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
-        mt.run()
+            mt = MultiThreadHandler(task_queue,fix_doc_handle,None,30)
+            mt.run()
 
 
 
@@ -3930,7 +3966,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(288272156)
+    df_dump.test_dumplicate(275752337)
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()
     # df_dump.delete_projects_by_document(16288036)
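
Note: the is_exist_fingerprint helper added above, together with the exist_finterprint branch in dumplicate_comsumer_handle, skips project merging for a document whose fingerprint is already carried by another saved duplicate. A simplified stand-alone sketch of that check, using plain dicts with shortened keys (docid/save/status/fingerprint) instead of the document_tmp_* column constants:

def has_saved_duplicate_fingerprint(candidates, docid, fingerprint, table_name="document"):
    # gather fingerprints of every other candidate that is (or counts as) saved
    kept = set()
    for doc in candidates:
        save = doc.get("save", 0)
        # rows from the "document" table with status 201..300 are treated as saved
        if table_name == "document" and 201 <= doc.get("status", 0) <= 300:
            save = 1
        if doc.get("docid") != docid and save == 1:
            kept.add(doc.get("fingerprint", ""))
    return fingerprint in kept

When this returns True and the current document itself is not saved, the handler writes an empty project list ("[]") instead of calling merge_document_real.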

+ 1 - 1
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -600,7 +600,7 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
         # _url = self.extract_interfaces[_i]
         _url = self.getExtract_url()
         log("extract_url:%s"%(str(_url)))
-        resp = requests.post(_url,json=json,headers=headers)
+        resp = requests.post(_url,json=json,headers=headers,timeout=5*60)
         return resp
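
Note: the added timeout=5*60 caps each extraction request at five minutes instead of letting it hang indefinitely; requests then raises requests.exceptions.Timeout, which the caller can treat as an ordinary failure. A minimal sketch of a caller that tolerates the timeout (URL and payload are placeholders):

import requests

def post_extract(url, payload, headers=None, timeout=5 * 60):
    # returns the parsed JSON result, or None if the service timed out
    try:
        resp = requests.post(url, json=payload, headers=headers, timeout=timeout)
        resp.raise_for_status()
        return resp.json()
    except requests.exceptions.Timeout:
        return None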
 
 

+ 49 - 0
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -329,6 +329,55 @@ class f_decode_sub_docs_json(BaseUDTF):
                                 columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
         self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
 
+@annotate('string,string -> string,string,string,string,string')
+class f_decode_for_dumplicate(BaseUDTF):
+
+    def __init__(self):
+        import logging
+        import json
+        global json,logging
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+
+    def process(self,sub_docs_json,extractjson):
+        if extractjson is None or extractjson=="":
+            extractjson = "{}"
+        try:
+            _extract = json.loads(extractjson)
+        except Exception as e:
+            _extract = {}
+
+        product = ",".join(_extract.get("product",[]))
+        list_product =  product.split(",")
+
+        project_codes = ",".join(_extract.get("code",[]))
+        list_code = project_codes.split(",")
+
+        if sub_docs_json is not None:
+            list_sub_docs = json.loads(sub_docs_json)
+        else:
+            list_sub_docs = [{}]
+        max_len = max([len(list_product),len(list_code),len(list_sub_docs)])
+
+        for _i in range(max_len):
+            _product = list_product[_i%len(list_product)]
+            _code = list_code[_i%len(list_code)]
+            _subdoc = list_sub_docs[_i%len(list_sub_docs)]
+            win_tenderer = _subdoc.get("win_tenderer","")
+            bidding_budget = _subdoc.get("bidding_budget","0")
+            if float(bidding_budget)==0:
+                bidding_budget = ""
+            else:
+                bidding_budget = str(float(bidding_budget))
+            win_bid_price = _subdoc.get("win_bid_price","0")
+            if float(win_bid_price)==0:
+                win_bid_price = ""
+            else:
+                win_bid_price = str(float(win_bid_price))
+            self.forward(_product,_code,win_tenderer,bidding_budget,win_bid_price)
+
+
+
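
Note: the new f_decode_for_dumplicate UDTF fans one document out into several rows by cycling through its products, project codes and sub_docs in parallel, emitting max(len) rows. A rough plain-Python rendering of that expansion for a sample record (sample values are made up; the real UDTF runs inside MaxCompute):

import json

def decode_for_dumplicate(sub_docs_json, extractjson):
    extract = json.loads(extractjson or "{}")
    products = ",".join(extract.get("product", [])).split(",")
    codes = ",".join(extract.get("code", [])).split(",")
    sub_docs = json.loads(sub_docs_json) if sub_docs_json else [{}]
    for i in range(max(len(products), len(codes), len(sub_docs))):
        sub = sub_docs[i % len(sub_docs)]
        budget = float(sub.get("bidding_budget", "0"))
        price = float(sub.get("win_bid_price", "0"))
        yield (products[i % len(products)], codes[i % len(codes)],
               sub.get("win_tenderer", ""),
               str(budget) if budget != 0 else "",
               str(price) if price != 0 else "")

for row in decode_for_dumplicate('[{"win_tenderer":"A","bidding_budget":"100"}]',
                                 '{"product":["desk","chair"],"code":["PC-1"]}'):
    print(row)  # ('desk', 'PC-1', 'A', '100.0', '') then ('chair', 'PC-1', 'A', '100.0', '')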
 @annotate("string->bigint")
 class totimestamp(object):
 

+ 3 - 4
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2540,11 +2540,10 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
 
     _project_name_check = check_project_name_merge(project_name,project_name_to_merge,b_log)
 
-    if _project_name_check==-1:
-        _project_name_check = check_dynamics_title_merge(project_dynamics,project_dynamics_to_merge,b_log)
+    _title_check = check_dynamics_title_merge(project_dynamics,project_dynamics_to_merge,b_log)
 
-    #event check -- only one of product, project name and title needs to match
-    if _product_check==-1 and _project_name_check==-1:
+    #event check -- at least two of product, project name and title must be satisfied
+    if _project_name_check+_product_check+_title_check<2:
         if return_prob:
             return False,0
         return False
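
Note: the changed condition above now runs the title check unconditionally and rejects the merge when the three scores sum to less than 2. Assuming each check returns 1 for a match, 0 when inconclusive and -1 on a conflict (the return convention is not visible in this hunk), that amounts to requiring at least two positive signals with no conflicting one; a tiny illustration under that assumption:

def merge_signals_ok(name_check, product_check, title_check):
    # assumed convention: 1 = match, 0 = inconclusive, -1 = conflict
    return (name_check + product_check + title_check) >= 2

print(merge_signals_ok(1, 1, 0))   # True:  two signals agree, none conflicts
print(merge_signals_ok(1, 1, -1))  # False: the title check conflicts, so the merge is rejected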

+ 16 - 13
BaseDataMaintenance/model/ots/document_tmp.py

@@ -258,7 +258,8 @@ def turn_document_tmp_status():
 
         bool_query = BoolQuery(
             must_queries=[
-                RangeQuery("status",66,71),
+                TermQuery("fingerprint","md5=2cc044b81ec13acddcc970b71b780365")
+                # RangeQuery("status",66,71),
                 # BoolQuery(should_queries=[
                 #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                 #                           # MatchPhraseQuery("doctitle","中国电信"),
@@ -271,15 +272,16 @@ def turn_document_tmp_status():
                 #                                  ]
                 # )
             ],
-            # must_not_queries=[ExistsQuery("status"),
-            #                   ExistsQuery("page_time"),
-            #
-            #                   ]
+            must_not_queries=[
+                TermQuery("docid",288599518)
+                # ExistsQuery("status"),
+                # ExistsQuery("page_time"),
+                              ]
         )
 
         rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
+                                                                       columns_to_get=ColumnsToGet(["doctitle"],return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         print(total_count)
         # print(list_data)
@@ -290,7 +292,7 @@ def turn_document_tmp_status():
         while next_token:
             rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["extract_json"],return_type=ColumnReturnType.SPECIFIED))
+                                                                           columns_to_get=ColumnsToGet(["doctitle"],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             _count += len(list_data)
             print("%d/%d"%(_count,total_count))
@@ -334,15 +336,16 @@ def turn_document_tmp_status():
 
         #change status
         # item.setValue(document_tmp_docchannel,item.getProperties().get(document_tmp_original_docchannel),True)
-        _extract_json = item.getProperties().get(document_tmp_extract_json,"")
-        _extract_json = _extract_json.replace("\x06", "").replace("\x05", "").replace("\x07", "").replace('\\', '')
-        item.setValue(document_tmp_extract_json,_extract_json,True)
-        json.loads(_extract_json)
+        # _extract_json = item.getProperties().get(document_tmp_extract_json,"")
+        # _extract_json = _extract_json.replace("\x06", "").replace("\x05", "").replace("\x07", "").replace('\\', '')
+        # item.setValue(document_tmp_extract_json,_extract_json,True)
+        # json.loads(_extract_json)
         # item.setValue(document_tmp_status,71,True)
         # item.setValue(document_tmp_save,1,True)
-        item.update_row(ots_client)
+        print(item.getProperties())
+        # item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
-        # item.delete_row(ots_client)
+        item.delete_row(ots_client)
         pass
 
 

+ 1 - 1
BaseDataMaintenance/start_dataflow_dumplicate.py

@@ -7,5 +7,5 @@ from BaseDataMaintenance.maintenance.dataflow import *
 
 if __name__ == '__main__':
     # flow = Dataflow()
-    flow = Dataflow_dumplicate(start_delete_listener=True)
+    flow = Dataflow_dumplicate(start_delete_listener=False)
     flow.start_flow_dumplicate()