luojiehua 1 year ago
Parent
Commit c907fb5599

+ 106 - 7
BaseDataMaintenance/maintenance/dataflow.py

@@ -2187,6 +2187,8 @@ class Dataflow_dumplicate(Dataflow):
         self.fix_doc_docid = None
         self.bdm = BaseDataMonitor()
 
+        self.check_rule = 1
+
         if start_delete_listener:
             self.delete_comsumer_counts = 2
 
@@ -2325,7 +2327,10 @@ class Dataflow_dumplicate(Dataflow):
         if web_source_no_less==web_source_no_greater=="17397-3":
             hard_level=2
 
-        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
+        if self.check_rule==1:
+            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
+        else:
+            _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
 
         pagetime_stamp_less = getTimeStamp(page_time_less)
         pagetime_stamp_greater = getTimeStamp(page_time_greater)
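
The new check_rule flag makes the dedup rule switchable per instance; a minimal sketch of the toggle (this mirrors exactly what compare_dumplicate_check does further down):

    # Minimal sketch: flipping check_rule routes dumplicate_check between the two rules.
    df_dump = Dataflow_dumplicate(start_delete_listener=False)
    df_dump.check_rule = 1   # default: dumplicate_check calls check_dumplicate_rule
    # ... first comparison pass ...
    df_dump.check_rule = 2   # any other value routes to check_dumplicate_rule_test
    # ... second comparison pass ...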
@@ -4090,13 +4095,16 @@ class Dataflow_dumplicate(Dataflow):
                 if table_name=="document_tmp":
                     self.changeSaveStatus(remove_list)
                     self.changeSaveStatus(list_merge_dump)
+            else:
+                return list_docids
+
 
 
-            log("dumplicate end on:%s"%(str(item.get(document_tmp_docid))))
         except Exception as e:
             traceback.print_exc()
             log("dumplicate error on:%s"%(str(item.get(document_tmp_docid))))
         finally:
+            log("dumplicate end on:%s"%(str(item.get(document_tmp_docid))))
             self.queue_dumplicate_processed.put(item.get(document_tmp_docid))
 
 
@@ -4281,9 +4289,99 @@ class Dataflow_dumplicate(Dataflow):
             return best_docid
         return None
 
+def compare_dumplicate_check():
+
+    import pandas as pd
+    df_dump = Dataflow_dumplicate(start_delete_listener=False)
+    test_count = 1000
+
+    # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
+    columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]
+    bool_query = BoolQuery(must_queries=[
+        RangeQuery("docid",400453395,400463395)
+    ])
+    rows,next_token,total_count,is_all_succeed = df_dump.ots_client.search("document","document_index",
+                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=10,get_total_count=True),
+                                                                        ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+    log("flow_dumplicate producer total_count:%d"%total_count)
+
+    list_dict = getRow_ots(rows)
+    while 1:
+        if not next_token or len(list_dict)>=test_count:
+            break
+        # pass next_token so each iteration fetches the next page instead of
+        # re-reading the first one (the sort is implied by the token and is omitted)
+        rows,next_token,total_count,is_all_succeed = df_dump.ots_client.search("document","document_index",
+                                                                               SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                               ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_dict.extend(getRow_ots(rows))
+
+
+    def _handle1(_item,result_queue):
+        try:
+
+            list_docid = df_dump.dumplicate_comsumer_handle(_item,None,df_dump.ots_client,get_all=True,upgrade=False)
+            _item["before"] = list_docid
+        except Exception as e:
+            # swallow per-document failures so one bad record does not abort the run
+            pass
+
+    dump_result = {}
+    for item in list_dict:
+        dump_result[item["docid"]] = {}
+    task_queue = Queue()
+    list_item = []
+    for item in list_dict:
+        _item = {}
+        _item.update(item)
+        list_item.append(_item)
+        task_queue.put(_item)
+
+    mt = MultiThreadHandler(task_queue,_handle1,None,30)
+    mt.run()
+
+    for item in list_item:
+        dump_result[item["docid"]]["before"] = item.get("before")
+
+
+    df_dump.check_rule = 2
+    def _handle2(_item,result_queue):
+        try:
+            list_docid1 = df_dump.dumplicate_comsumer_handle(_item,None,df_dump.ots_client,get_all=True,upgrade=False)
+            _item["after"] = list_docid1
+        except Exception as e:
+            # swallow per-document failures so one bad record does not abort the run
+            pass
+
+    task_queue = Queue()
+    list_item = []
+    for item in list_dict:
+        _item = {}
+        _item.update(item)
+        list_item.append(_item)
+        task_queue.put(_item)
+
+    mt = MultiThreadHandler(task_queue,_handle2,None,30)
+    mt.run()
+
+    for item in list_item:
+        dump_result[item["docid"]]["after"] = item.get("after")
+
+    df_data = {"docid":[],
+               "before":[],
+               "after":[],
+               "before-after":[],
+               "after-before":[]}
+    for docid,_d in dump_result.items():
+        df_data["docid"].append(docid)
+        before = _d.get("before",[])
+        after = _d.get("after",[])
+        df_data["before"].append(str(before))
+        df_data["after"].append(str(after))
+        df_data["before-after"].append(str(set(before)-set(after)))
+        df_data["after-before"].append(str(set(after)-set(before)))
+    df = pd.DataFrame(df_data,columns=["docid","before","after","before-after","after-before"])
+    df.to_excel("compare_dump.xlsx")
 
 
 if __name__ == '__main__':
+    a = time.time()
     # df = Dataflow()
     # df.flow_init()
     # df.flow_test()
@@ -4296,11 +4394,12 @@ if __name__ == '__main__':
 
     # download_attachment()
     # test_attachment_interface()
-    df_dump = Dataflow_dumplicate(start_delete_listener=False)
-    # df_dump.start_flow_dumplicate()
-    a = time.time()
-    df_dump.test_dumplicate(400929607
-                            )
+    # df_dump = Dataflow_dumplicate(start_delete_listener=False)
+    # # df_dump.start_flow_dumplicate()
+
+    # df_dump.test_dumplicate(400929607
+    #                         )
+    compare_dumplicate_check()
     # df_dump.test_merge([242672995,235300429,240009762
     #                     ],[243240169,])
     # df_dump.flow_remove_project_tmp()
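
Running this module now executes compare_dumplicate_check, which records the docid lists produced by the old and new rules for each document and writes them to compare_dump.xlsx. A short sketch of inspecting that output (column names come from the df_data dict above; note that str() of an empty Python set renders as "set()"):

    import pandas as pd

    df = pd.read_excel("compare_dump.xlsx")
    # keep only the rows where the two rule versions disagreed
    diff_rows = df[(df["before-after"] != "set()") | (df["after-before"] != "set()")]
    print(diff_rows[["docid", "before-after", "after-before"]])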

+ 187 - 1
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1036,7 +1036,193 @@ def check_time(json_time_less,json_time_greater):
                         return False
     return True
 
-def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1):
+def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater=""):
+    if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
+        return 1
+    if isinstance(project_codes_less,str):
+        project_codes_less = [a for a in project_codes_less.split(",") if a!=""]
+    elif project_codes_less is None:
+        project_codes_less = []
+
+    if isinstance(project_codes_greater,str):
+        project_codes_greater = [a for a in project_codes_greater.split(",") if a!=""]
+    elif project_codes_greater is None:
+        project_codes_greater = []
+
+
+    same_count = 0
+    all_count = 8
+    if len(set(project_codes_less) & set(project_codes_greater))>0:
+        same_count += 1
+    if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
+        same_count += 1
+    if getLength(agency_less)>0 and agency_less==agency_greater:
+        same_count += 1
+    if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
+        same_count += 1
+    if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
+        same_count += 1
+    if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
+        same_count += 1
+    if getLength(project_name_less)>0 and project_name_less==project_name_greater:
+        same_count += 1
+    if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
+        same_count += 1
+    base_prob = 0
+    if min_counts<3:
+        base_prob = 0.9
+    elif min_counts<5:
+        base_prob = 0.8
+    elif min_counts<8:
+        base_prob = 0.7
+    else:
+        base_prob = 0.6
+    _prob = base_prob*same_count/all_count
+    if min(extract_count_less,extract_count_greater)<=3:
+        if _prob<0.1:
+            _prob = 0.15
+        if province_less!=province_greater:
+            return 0
+    if _prob<0.1:
+        return _prob
+
+    check_result = {"pass":1}
+    if docchannel_less in (51,102,103,104,115,116,117):
+        if doctitle_refine_less!=doctitle_refine_greater:
+            if page_time_less!=page_time_greater:
+                check_result["docchannel"] = 0
+                check_result["pass"] = 0
+            else:
+                check_result["docchannel"] = 2
+    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+        check_result["doctitle"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
+    else:
+        check_result["doctitle"] = 2
+
+    #added check
+    if not check_codes(project_codes_less,project_codes_greater):
+        check_result["code"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
+    else:
+        if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
+            check_result["code"] = 2
+        else:
+            check_result["code"] = 1
+
+
+    if not check_product(product_less,product_greater):
+        check_result["product"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
+    else:
+        if getLength(product_less)>0 and getLength(product_greater)>0:
+            check_result["product"] = 2
+        else:
+            check_result["product"] = 1
+
+    if not check_demand():
+        check_result["pass"] = 0
+
+    if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
+                        tenderee_less,tenderee_greater,
+                        agency_less,agency_greater,
+                        win_tenderer_less,win_tenderer_greater):
+        check_result["entity"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
+    else:
+        if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
+            check_result["entity"] = 2
+        elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
+            check_result["entity"] = 2
+        else:
+            check_result["entity"] = 1
+
+    if not check_money(bidding_budget_less,bidding_budget_greater,
+                       win_bid_price_less,win_bid_price_greater):
+        if b_log:
+            logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
+        check_result["money"] = 0
+        check_result["pass"] = 0
+    else:
+        if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
+            check_result["money"] = 2
+        elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
+            check_result["money"] = 2
+        else:
+            check_result["money"] = 1
+
+    #added check
+    if not check_package(package_less,package_greater):
+        if b_log:
+            logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
+        check_result["package"] = 0
+        check_result["pass"] = 0
+    else:
+        if getLength(package_less)>0 and getLength(package_greater)>0:
+            check_result["package"] = 2
+        else:
+            check_result["package"] = 1
+
+    #added check
+    if not check_time(json_time_less,json_time_greater):
+        if b_log:
+            logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
+            if isinstance(json_time_less,dict):
+                time_less = json_time_less
+            else:
+                time_less = json.loads(json_time_less)
+            if isinstance(json_time_greater,dict):
+                time_greater = json_time_greater
+            else:
+                time_greater = json.loads(json_time_greater)
+            for k,v in time_less.items():
+                if getLength(v)>0:
+                    v1 = time_greater.get(k,"")
+                    if getLength(v1)>0:
+                        if v!=v1:
+                            logging.info("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
+
+        check_result["time"] = 0
+        check_result["pass"] = 0
+    else:
+        if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
+            check_result["time"] = 2
+        else:
+            check_result["time"] = 1
+
+    if hard_level==2 and check_result["product"]<=1:
+        return 0
+    if check_result.get("pass",0)==0:
+        if b_log:
+            logging.info(str(check_result))
+
+        if check_result.get("money",1)==0:
+            return 0
+
+        # the time check must run before the branch below, whose two arms both return
+        if check_result.get("time",1)==0:
+            return 0
+
+        if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
+            return _prob
+        else:
+            return 0
+    return _prob
+
+def check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater=""):
+
+    if web_source_no_less==web_source_no_greater:
+        if fingerprint_less==fingerprint_greater:
+            return 1
+        else:
+            return 0
+
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
         return 1
     if isinstance(project_codes_less,str):

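For intuition, a worked sketch (hypothetical values) of the scoring that opens both rule functions: the base probability is tiered on min_counts, then scaled by the fraction of the eight compared fields that match.

    # Hypothetical walk-through of the _prob computation in check_dumplicate_rule:
    same_count, all_count = 6, 8   # six of the eight compared fields agree
    min_counts = 4                 # size of the smaller candidate group
    if min_counts < 3:
        base_prob = 0.9
    elif min_counts < 5:
        base_prob = 0.8
    elif min_counts < 8:
        base_prob = 0.7
    else:
        base_prob = 0.6
    _prob = base_prob * same_count / all_count   # 0.8 * 6 / 8 = 0.6, clears the 0.1 early-exit threshold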
+ 29 - 0
BaseDataMaintenance/model/ots/designed_project.py

@@ -56,6 +56,35 @@ class designed_project(BaseModel):
                 _list.append((_key,_v))
         return _list
 
+    def update_row(self,ots_client,retrytimes=3):
+        primary_key = self.getPrimaryKey_turple()
+        update_of_attribute_columns = {
+            'PUT' : self.getAttribute_turple()
+        }
+        row = Row(primary_key,update_of_attribute_columns)
+        condition = Condition('IGNORE')
+        for i in range(retrytimes):
+            try:
+                if self.exists_row(ots_client):
+                    self.delete_row(ots_client)
+                consumed, return_row = ots_client.update_row(self.table_name, row, condition)
+                return True
+            # Client-side exception, usually a parameter error or a network error.
+            except OTSClientError as e:
+                traceback.print_exc()
+                log("update row failed, http_status:%s, error_message:%s" % (str(e.get_http_status()), e.get_error_message()))
+                # raise e
+            # Server-side exception, usually a parameter error or a throttling error.
+            except OTSServiceError as e:
+                traceback.print_exc()
+                log("update row failed, http_status:%s, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+                # raise e
+            except Exception as e:
+                traceback.print_exc()
+                pass
+                # raise e
+        return False
+        # log ('update succeed, consume %s write cu.' % consumed.write)
 
     def update_project(self,ots_client):
         docids = self.__dict__.get("docids","")
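
A minimal usage sketch of the new update_row (the constructor argument and client setup below are assumptions, configured as elsewhere in this repo). Because any existing row is deleted first and the write uses Condition('IGNORE'), the call behaves as a full replace of the row rather than a partial merge:

    # Hedged sketch: assumes an initialized ots_client and a dict of column values.
    _project = designed_project(project_dict)   # project_dict is hypothetical
    if not _project.update_row(ots_client, retrytimes=3):
        log("update_row gave up after %d retries" % 3)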

File diff suppressed because it is too large
+ 20 - 14
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py


Some files were not shown because too many files changed in this diff