Merge remote-tracking branch 'origin/master'

fangjiasheng 1 year ago
parent
commit
69680f6355

+ 9 - 5
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -524,7 +524,8 @@ class BaseDataMonitor():
         last_date = timeAdd(current_date,-1,"%Y-%m-%d")
         check_count = 20000
         query = BoolQuery(must_queries=[
-            RangeQuery("status",201,301)
+            RangeQuery("status",201,301),
+            RangeQuery("docchannel",0,300)
         ])
         list_doc = []
         queue_docid = Queue()
@@ -566,14 +567,16 @@ class BaseDataMonitor():
             if len(_doc.get("projects",[]))>=2:
                 multi_count += 1
                 list_multi.append(str(_doc.get("docid")))
-            if _doc.get("docchannel") in (101,118,119,120):
+            if _doc.get("docchannel") in (101,118,119,120,121,122):
                 zhongbiao_count += 1
                 if len(_doc.get("projects",[]))==1:
                     _project = _doc.get("projects")[0]
                     if _project.get("zhao_biao_page_time","")!="":
                         zhongbiao_find_zhaobiao += 1
 
-        if not_find_count>0 or multi_count>0 or zhongbiao_find_zhaobiao/zhongbiao_count<0.8:
+        _ratio = zhongbiao_find_zhaobiao/zhongbiao_count if zhongbiao_count>0 else 0
+
+        if not_find_count>0 or multi_count>0 or _ratio<0.8:
             if not_find_count>0 or multi_count>0:
                 current_time = getCurrent_date(format="%Y-%m-%d_%H%M%S")
                 logname = os.path.join(self.current_path,"log","%s.log"%current_time)
@@ -581,7 +584,7 @@ class BaseDataMonitor():
                     f.write(",".join(list_notfind))
                     f.write("\n")
                     f.write(",".join(list_multi))
-                _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
+                _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,_ratio)
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
             else:
@@ -616,10 +619,11 @@ class BaseDataMonitor():
         # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
         scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
         scheduler.add_job(self.monitor_preproject,"cron",hour="8")
-        scheduler.add_job(self.monitor_merge,"cron",hour="*/1")
+        scheduler.add_job(self.monitor_merge,"cron",minute="*/30")
         scheduler.add_job(self.monitor_init,"cron",hour="*/3")
         scheduler.start()
 
+
     def start_attach_monitor(self):
         #附件监控
         scheduler = BlockingScheduler()
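
Review note: the guarded ratio above avoids a ZeroDivisionError when the sampled window contains no 中标 announcements. A minimal sketch of the pattern (names mirror the diff; the helper function itself is hypothetical):

    def merge_alert_ratio(zhongbiao_count, zhongbiao_find_zhaobiao, threshold=0.8):
        # A zero count now yields ratio 0 instead of raising; ratio 0 still
        # falls below the threshold, so the alert branch is still entered.
        _ratio = zhongbiao_find_zhaobiao / zhongbiao_count if zhongbiao_count > 0 else 0
        return _ratio < threshold, _ratio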

+ 130 - 83
BaseDataMaintenance/maintenance/dataflow.py

@@ -363,9 +363,12 @@ class Dataflow():
                     list_must_queries.append(BoolQuery(should_queries=l_s))
             elif k in set_nested:
                 _v = v
-                if k!="" and k=="bidding_budget" or k=="win_bid_price":
-                    _v = float(_v)
-                    list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
+                if k!="":
+                    if k=="bidding_budget" or k=="win_bid_price":
+                        _v = float(_v)
+                        list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
+                    else:
+                        list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
             elif k in set_term:
                 list_must_queries.append(TermQuery(k,v))
             elif k in set_phrase:
@@ -384,9 +387,12 @@ class Dataflow():
                     list_must_no_queries.append(BoolQuery(should_queries=l_s))
             elif k in set_nested:
                 _v = v
-                if k!="" and k=="bidding_budget" or k=="win_bid_price":
-                    _v = float(_v)
-                    list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
+                if k!="":
+                    if k=="bidding_budget" or k=="win_bid_price":
+                        _v = float(_v)
+                        list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
+                    else:
+                        list_must_no_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
             elif k in set_term:
                 list_must_no_queries.append(TermQuery(k,v))
             elif k in set_range:
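
Review note: Python binds "and" tighter than "or", so the old one-line guard evaluated as (k!="" and k=="bidding_budget") or k=="win_bid_price", and every nested key other than the two price fields fell through without generating a query. A quick illustration:

    k = "win_tenderer"   # a nested sub_docs_json key that is not a price field
    (k != "" and k == "bidding_budget") or (k == "win_bid_price")   # False -> key silently dropped
    # the rewritten branch queries price keys as float and all other nested keys verbatim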
@@ -1593,7 +1599,7 @@ class Dataflow():
 
         def producer():
             current_date = getCurrent_date("%Y-%m-%d")
-            tmp_date = timeAdd(current_date,-4)
+            tmp_date = timeAdd(current_date,-10)
             bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True),
                                                  RangeQuery(document_tmp_crtime,range_to="%s 00:00:00"%(tmp_date))])
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
@@ -2244,8 +2250,7 @@ class Dataflow_dumplicate(Dataflow):
                 continue
             for _j in range(min(_i,10)):
                 _dict2 = base_list[_j]
-                _prob = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
-                print("_prob:",_prob)
+                _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
                 if _prob<=0.1:
                     _pass = False
                     break
@@ -2320,7 +2325,19 @@ class Dataflow_dumplicate(Dataflow):
         if web_source_no_less==web_source_no_greater=="17397-3":
             hard_level=2
 
-        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
+        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level)
+
+        pagetime_stamp_less = getTimeStamp(page_time_less)
+        pagetime_stamp_greater = getTimeStamp(page_time_greater)
+        
+        day_dis = abs(pagetime_stamp_greater-pagetime_stamp_less)//86400
+        if day_dis>7:
+            _prob = 0
+        elif day_dis>3:
+            if _prob<0.4:
+                _prob = 0
+
+        return _prob,day_dis
 
 
     def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
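
Review note: dumplicate_check now returns a (probability, day-distance) pair and zeroes out weak matches published far apart. A standalone sketch of the added penalty, assuming getTimeStamp returns epoch seconds as its use above suggests:

    def apply_page_time_penalty(_prob, page_time_less, page_time_greater):
        # pairs more than 7 days apart are never duplicates; pairs 4-7 days
        # apart must clear a 0.4 probability floor to survive
        day_dis = abs(getTimeStamp(page_time_greater) - getTimeStamp(page_time_less)) // 86400
        if day_dis > 7:
            _prob = 0
        elif day_dis > 3 and _prob < 0.4:
            _prob = 0
        return _prob, day_dis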
@@ -2525,7 +2542,7 @@ class Dataflow_dumplicate(Dataflow):
                 return 0
         return _prob
 
-    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
+    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
 
         for _ in range(retry_times):
             try:
@@ -2548,14 +2565,13 @@ class Dataflow_dumplicate(Dataflow):
                     else:
                         if _docid!=item.get(document_tmp_docid):
                             _time1 = time.time()
-                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=False)
+                            confidence,day_dis = self.dumplicate_check(item,_dict,total_count,b_log=b_log)
                             check_time+= time.time()-_time1
 
                             _dict["confidence"] = confidence
                             _dict["min_counts"] = total_count
 
-                            if not confidence<0.1:
-                                list_data.append(_dict)
+                            list_data.append(_dict)
                 all_time = time.time()-_time
                 # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
                 return list_data
@@ -2563,12 +2579,15 @@ class Dataflow_dumplicate(Dataflow):
                 traceback.print_exc()
         return []
 
-    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
-        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
+    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count],b_log=False):
+        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns,b_log=b_log)
         for _dict in list_dict:
             _docid = _dict.get(document_tmp_docid)
             confidence = _dict["confidence"]
-            print("confidence",_docid,confidence)
+
+            if b_log:
+                log("confidence %d %.3f"%(_docid,confidence))
+
             if confidence>0.1:
                 if _docid not in set_docid:
                     base_list.append(_dict)
@@ -2581,32 +2600,36 @@ class Dataflow_dumplicate(Dataflow):
                 return
         _dict.update(base_dict)
         if b_log:
-            log(str(_dict))
+            log("rule dict:"+str(_dict))
         _query = self.generate_dumplicate_query(_dict,must_not_dict)
         _rule = {"confidence":confidence,
                  "item":item,
                  "query":_query,
                  "singleNum_keys":[],
                  "contain_keys":[],
-                 "multiNum_keys":[]}
+                 "multiNum_keys":[],
+                 "_dict":_dict}
         list_rules.append(_rule)
 
-    def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
+    def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False,day_dis=7):
         docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
         current_date = getCurrent_date("%Y-%m-%d")
         if page_time=='':
             page_time = current_date
 
+        two_day_dict = {"page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]}
+
         if page_time>=timeAdd(current_date,-2):
             table_name = "document_tmp"
             table_index = "document_tmp_index"
             base_dict = {
                 "docchannel":item.get("docchannel",52),
                 "status":[status_from[0]],
-                "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
+                "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
             }
             must_not_dict = {"save":0,"docid":item.get("docid")}
             doctitle_refine_name = "doctitle_refine"
+
         else:
             table_name = "document"
             table_index = "document_index"
@@ -2617,7 +2640,7 @@ class Dataflow_dumplicate(Dataflow):
             base_dict = {
                 "docchannel":item["docchannel"],
                 "status":_status,
-                "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
+                "page_time":[timeAdd(page_time,-day_dis),timeAdd(page_time,day_dis)]
             }
             must_not_dict = {"docid":item.get("docid")}
             doctitle_refine_name = "doctitle"
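
Review note: the candidate time window is now parameterized. High-confidence rules search ±day_dis days (7 by default) around the document's page_time, while the lower tiers appended after base_dict.update(two_day_dict) in the next hunk fall back to the original ±2 days. In outline, with timeAdd as used above:

    base_dict["page_time"] = [timeAdd(page_time, -day_dis), timeAdd(page_time, day_dis)]  # strong rules
    two_day_dict = {"page_time": [timeAdd(page_time, -2), timeAdd(page_time, 2)]}
    base_dict.update(two_day_dict)   # narrows the window before weaker rules are appended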
@@ -2673,82 +2696,95 @@ class Dataflow_dumplicate(Dataflow):
                  "bidding_budget":bidding_budget}
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        confidence=85
-        _dict = {"tenderee":tenderee,
-                 "agency":agency
-                 }
-        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
         _dict = {"tenderee":tenderee,
                  "project_codes":project_code
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
         _dict = {"tenderee":tenderee,
-                 "project_name":project_name
+                 "win_bid_price":win_bid_price
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        if getLength(product)>0:
-            l_p = product.split(",")
-            _dict = {"tenderee":tenderee,
-                     "product":l_p[0]
-                     }
-            self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+        _dict = {"agency":agency,
+                 "project_codes":project_code
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
-                 "win_tenderer":win_tenderer
+        _dict = {"win_tenderer":win_tenderer,
+                 "bidding_budget":bidding_budget
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
+        _dict = {"project_codes":project_code,
                  "win_bid_price":win_bid_price
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
+        _dict = {"project_codes":project_code,
                  "bidding_budget":bidding_budget
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"tenderee":tenderee,
+        _dict = {"project_codes":project_code,
                  doctitle_refine_name:doctitle_refine
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"agency":agency,
-                 "project_codes":project_code
+        _dict = {"tenderee":tenderee,
+                 "bidding_budget":bidding_budget
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"agency":agency,
-                 "project_name":project_name
+        _dict = {"project_codes":project_code,
+                 "win_tenderer":win_tenderer
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
+        base_dict.update(two_day_dict)
+
+        confidence=85
+        _dict = {"tenderee":tenderee,
+                 "agency":agency
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
+        _dict = {"tenderee":tenderee,
                  "project_name":project_name
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
+        if getLength(product)>0:
+            l_p = product.split(",")
+            _dict = {"tenderee":tenderee,
+                     "product":l_p[0]
+                     }
+            self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
+        _dict = {"tenderee":tenderee,
                  "win_tenderer":win_tenderer
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
-                 "win_bid_price":win_bid_price
+
+        _dict = {"tenderee":tenderee,
+                 doctitle_refine_name:doctitle_refine
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"project_codes":project_code,
-                 "bidding_budget":bidding_budget
+
+        _dict = {"agency":agency,
+                 "project_name":project_name
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
         _dict = {"project_codes":project_code,
-                 doctitle_refine_name:doctitle_refine
+                 "project_name":project_name
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
+
+
         _dict = {"project_name":project_name,
                  "win_tenderer":win_tenderer
                  }
@@ -2774,10 +2810,6 @@ class Dataflow_dumplicate(Dataflow):
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
-        _dict = {"win_tenderer":win_tenderer,
-                 "bidding_budget":bidding_budget
-                 }
-        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
         _dict = {"win_tenderer":win_tenderer,
                  doctitle_refine_name:doctitle_refine
@@ -2789,6 +2821,11 @@ class Dataflow_dumplicate(Dataflow):
                  }
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
+        confidence=80
+        _dict = {"project_codes":project_code}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
+
         _dict = {"win_bid_price":win_bid_price,
                  doctitle_refine_name:doctitle_refine
                  }
@@ -2802,13 +2839,13 @@ class Dataflow_dumplicate(Dataflow):
         confidence=80
         _dict = {doctitle_refine_name:doctitle_refine}
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
-        _dict = {"project_codes":project_code}
-        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
+
 
         confidence=70
         _dict = {"project_name":project_name}
         self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item,b_log=to_log)
 
+
         return list_rules,table_name,table_index
 
     def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]):
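
Review note: every rule above flows through appendRule, which merges the field pair with base_dict, compiles it into a query, and records the rule; the diff additionally stashes the raw _dict for debug logging. Schematically, per the appendRule hunk above:

    _rule = {
        "confidence": confidence,    # the 88/85/80/70-style tiers set above
        "item": item,                # the document being deduplicated
        "query": self.generate_dumplicate_query(_dict, must_not_dict),
        "singleNum_keys": [], "contain_keys": [], "multiNum_keys": [],
        "_dict": _dict,              # kept for the "rule dict:" log line
    }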
@@ -3427,7 +3464,7 @@ class Dataflow_dumplicate(Dataflow):
         :param list_docids:
         :return:
         '''
-        print("==",list_docids)
+        log("search_projects_with_document %s"%str(list_docids))
         list_should_q = []
         for _docid in list_docids:
             list_should_q.append(TermQuery("docids",_docid))
@@ -3842,10 +3879,14 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = []
             _docid = item.get(document_tmp_docid)
             list_docids.append(_docid)
+            if save==0:
+                dup_docid.insert(0,_docid)
             if isinstance(dup_docid,list):
                 list_docids.extend(dup_docid)
             list_docids = [a for a in list_docids if a is not None]
 
+
+
             _time = time.time()
             list_projects = self.search_projects_with_document(list_docids)
             # log("search projects takes:%.3f"%(time.time()-_time))
@@ -3872,6 +3913,9 @@ class Dataflow_dumplicate(Dataflow):
             list_merge_dump = dumplicate_document_in_merge(list_projects,dup_docid[:-1])
             # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
 
+            if list_merge_dump is None:
+                list_projects = []
+
             _time = time.time()
             project_json = to_project_json(list_projects)
             # log("json projects takes:%.3f"%(time.time()-_time))
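
Review note, a hedged reading of the save==0 branch: when the current document itself lost deduplication, its docid is prepended to dup_docid so that the dup_docid[:-1] slice passed to dumplicate_document_in_merge includes it, and the merged projects drop it like any other duplicate. Sketch with hypothetical ids:

    _docid, dup_docid, save = 101, [102, 103], 0
    if save == 0:
        dup_docid.insert(0, _docid)      # dup_docid == [101, 102, 103]
    # dup_docid[:-1] == [101, 102] is later stripped from merged projects;
    # the slice spares only the last entry (103)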
@@ -3924,6 +3968,7 @@ class Dataflow_dumplicate(Dataflow):
     def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
         try:
             start_time = time.time()
+            b_log = False if upgrade else True
             self.post_extract(item)
 
 
@@ -3931,14 +3976,15 @@ class Dataflow_dumplicate(Dataflow):
             base_list = []
             set_docid = set()
 
-            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=False)
+            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=b_log)
             # print("len_rules",len(list_rules),table_name,table_index)
             list_rules.sort(key=lambda x:x["confidence"],reverse=True)
+
+            log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
             list_rules = list_rules[:30]
             _i = 0
             step = 5
 
-            print("here 1")
 
             item["confidence"] = 999
             if item.get(document_tmp_docid) not in set_docid:
@@ -3951,17 +3997,17 @@ class Dataflow_dumplicate(Dataflow):
                     must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
                 _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
                                    must_not_queries=must_not_q)
+
                 _rule = list_rules[_i]
                 confidence = _rule["confidence"]
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle])
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle],b_log=b_log)
                 _i += step
 
 
-            print("here 2")
-            b_log = False if upgrade else True
+
             _time = time.time()
             # log("%d start final check with length:%d"%(item["docid"],len(base_list)))
             final_list = self.dumplicate_fianl_check(base_list,b_log)
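
Review note: candidate retrieval walks the sorted rules five at a time, OR-ing each batch into one query and excluding up to 100 already-collected docids. A condensed sketch of the loop above:

    list_rules.sort(key=lambda x: x["confidence"], reverse=True)
    list_rules = list_rules[:30]
    step = 5
    for _i in range(0, len(list_rules), step):
        _query = BoolQuery(
            should_queries=[_rule["query"] for _rule in list_rules[_i:_i + step]],
            must_not_queries=[TermQuery("docid", a) for a in list(set_docid)[-100:]],
        )
        # confidence and key settings are taken from the first rule of the batch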
@@ -4125,26 +4171,27 @@ class Dataflow_dumplicate(Dataflow):
         schedule.start()
 
     def changeSaveStatus(self,list_dict):
-        for _dict in list_dict:
-            if isinstance(_dict,dict):
-                if _dict.get(document_tmp_save,1)==1:
-                    _d = {"partitionkey":_dict["partitionkey"],
-                          "docid":_dict["docid"],
+        if list_dict is not None:
+            for _dict in list_dict:
+                if isinstance(_dict,dict):
+                    if _dict.get(document_tmp_save,1)==1:
+                        _d = {"partitionkey":_dict["partitionkey"],
+                              "docid":_dict["docid"],
+                              document_tmp_save:0
+                              }
+                        _d_tmp = Document_tmp(_d)
+                        if _d_tmp.exists_row(self.ots_client):
+                            _d_tmp.update_row(self.ots_client)
+                elif isinstance(_dict,int):
+                    _d = {"partitionkey":_dict%500+1,
+                          "docid":_dict,
                           document_tmp_save:0
                           }
                     _d_tmp = Document_tmp(_d)
-                    if _d_tmp.exists_row(self.ots_client):
-                        _d_tmp.update_row(self.ots_client)
-            elif isinstance(_dict,int):
-                _d = {"partitionkey":_dict%500+1,
-                      "docid":_dict,
-                      document_tmp_save:0
-                      }
-                _d_tmp = Document_tmp(_d)
-                if _d_tmp.fix_columns(self.ots_client,["status"],True):
-                    if _d_tmp.getProperties().get("status")==1:
-                        _d_tmp.setValue("status",0,True)
-                        _d_tmp.update_row(self.ots_client)
+                    if _d_tmp.fix_columns(self.ots_client,["status"],True):
+                        if _d_tmp.getProperties().get("status")==1:
+                            _d_tmp.setValue("status",0,True)
+                            _d_tmp.update_row(self.ots_client)
 
 
 
@@ -4165,7 +4212,7 @@ class Dataflow_dumplicate(Dataflow):
         list_dict = getRow_ots(rows)
 
         for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -4252,10 +4299,10 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(397656324
+    df_dump.test_dumplicate(400929607
                             )
-    # df_dump.test_merge([385521167
-    #                     ],[385521113])
+    # df_dump.test_merge([242672995,235300429,240009762
+    #                     ],[243240169,])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()

+ 9 - 0
BaseDataMaintenance/maintenance/product/htmlparser.py

@@ -1080,6 +1080,15 @@ def getBestProductText(list_result,_product,products):
         if _check:
             return _result
 
+def format_text(_result):
+    list_result = re.split("\r|\n",_result)
+    _result = ""
+    for _r in list_result:
+        if len(_r)>0:
+            _result+="%s\n"%(_r)
+    _result = '<div style="white-space:pre">%s</div>'%(_result)
+    return _result
+
 def extract_product_parameters(list_data,_product):
 
     list_result = []
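
Review note: format_text collapses blank lines and wraps the parameter text in a pre-formatted div; the BeautifulSoup call added in product_parameter.py then strips that markup back out for the plain-text column. A hypothetical round trip:

    raw = "参数A:100\r\n\r\n参数B:200\n"
    format_text(raw)
    # -> '<div style="white-space:pre">参数A:100\n参数B:200\n</div>'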

+ 139 - 67
BaseDataMaintenance/maintenance/product/product_parameter.py

@@ -9,6 +9,7 @@ from multiprocessing import Queue as PQueue
 from multiprocessing import Process
 from BaseDataMaintenance.model.ots.document_product import *
 from BaseDataMaintenance.model.ots.attachment import *
+from BaseDataMaintenance.model.ots.document import *
 from BaseDataMaintenance.common.Utils import *
 from BaseDataMaintenance.common.ossUtils import *
 from BaseDataMaintenance.maintenance.product.htmlparser import *
@@ -23,6 +24,9 @@ parameter_status_process_failed = 2
 parameter_status_process_jump = 3
 parameter_status_not_found = 4
 
+parameter_status_to_process_his = 100
+
+
 import redis
 
 from BaseDataMaintenance.java.MQInfo import getAllQueueSize,getQueueSize
@@ -32,6 +36,7 @@ class Product_Attachment_Processor():
     def __init__(self,):
         self.ots_client = getConnect_ots()
         self.product_attachment_queue = PQueue()
+        self.product_attachment_processed_queue = PQueue()
         self.product_attachment_queue_size = 50
         self.set_product_attachment = set()
         self.attachment_hub_url = "https://attachment-hub.oss-cn-hangzhou.aliyuncs.com/"
@@ -49,21 +54,55 @@ class Product_Attachment_Processor():
         self.download_path = "%s/%s"%(self.current_path,"download")
         self.test_url="http://192.168.2.102:15011/convert"
 
+
+    def getAttachments(self,docid):
+
+        list_atta = []
+        partitionkey = docid%500+1
+        doc = Document({document_partitionkey:partitionkey,
+                        document_docid:docid})
+        doc.fix_columns(self.ots_client,[document_attachment_path],True)
+
+        page_attachments = doc.getProperties().get(document_attachment_path)
+        if page_attachments is not None and page_attachments!="":
+            attachments = json.loads(page_attachments)
+            for _a in attachments:
+                _filemd5 = _a.get(document_attachment_path_filemd5)
+                _da = {attachment_filemd5:_filemd5}
+                _attach = attachment(_da)
+                if _attach.fix_columns(self.ots_client,[attachment_classification,attachment_filetype],True):
+                    _da[attachment_classification] = _attach.getProperties().get(attachment_classification)
+                    _da[attachment_filetype] = _attach.getProperties().get(attachment_filetype)
+                    list_atta.append(_da)
+        return json.dumps(list_atta,ensure_ascii=False)
+
+
     def process_parameters_producer(self,):
         attachment_size = getQueueSize("dataflow_attachment")
         if attachment_size<100:
+            while 1:
+                try:
+                    _id = self.product_attachment_processed_queue.get(False)
+                    if _id in self.set_product_attachment:
+                        self.set_product_attachment.remove(_id)
+                except Exception as e:
+                    break
 
             _qsize = self.product_attachment_queue.qsize()
             log("product_attachment_queue %d"%(_qsize))
             if _qsize>self.product_attachment_queue_size/3:
                 return
-            bool_query = BoolQuery(must_queries=[
-                TermQuery("parameter_status",parameter_status_to_process)
+            bool_query = BoolQuery(should_queries=[
+                # TermQuery(DOCUMENT_PRODUCT_DOCID,305253400)
+                TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process),
+                TermQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_to_process_his),
+                BoolQuery(must_not_queries=[ExistsQuery(DOCUMENT_PRODUCT_PARAMETER_STATUS)])
+
             ])
             list_id = []
             dict_docid_list = {}
             rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
-                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",sort_order=SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                                 ColumnsToGet([DOCUMENT_PRODUCT_ATTACHMENTS,DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
 
             list_data = getRow_ots(rows)
@@ -73,6 +112,7 @@ class Product_Attachment_Processor():
                 list_id.append(_id)
                 if _id in self.set_product_attachment:
                     continue
+                self.set_product_attachment.add(_id)
                 docid = data.get(DOCUMENT_PRODUCT_DOCID)
                 if docid not in dict_docid_list:
                     dict_docid_list[docid] = []
@@ -91,6 +131,7 @@ class Product_Attachment_Processor():
                     list_id.append(_id)
                     if _id in self.set_product_attachment:
                         continue
+                    self.set_product_attachment.add(_id)
                     docid = data.get(DOCUMENT_PRODUCT_DOCID)
                     if docid not in dict_docid_list:
                         dict_docid_list[docid] = []
@@ -98,10 +139,14 @@ class Product_Attachment_Processor():
 
                     _count += 1
             for k,v in dict_docid_list.items():
+                if v[0].get(DOCUMENT_PRODUCT_ATTACHMENTS) is None:
+                    _attachments = self.getAttachments(v[0].get(DOCUMENT_PRODUCT_DOCID))
+                    for _v in v:
+                        _v[DOCUMENT_PRODUCT_ATTACHMENTS] = _attachments
                 self.product_attachment_queue.put(v)
             _qsize = self.product_attachment_queue.qsize()
             log("after product_attachment_queue %d"%(_qsize))
-            self.set_product_attachment = set(list_id)
+            # self.set_product_attachment = set(list_id[-20000:])
 
     def get_whole_html(self,_filemd5):
         atta = attachment({attachment_filemd5:_filemd5})
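
Review note: in-flight tracking changed from rebuilding set_product_attachment on every producer pass to an explicit acknowledge channel; workers push finished ids into product_attachment_processed_queue and the producer drains it before enqueueing. Condensed sketch of both sides:

    # producer side: drain acknowledgements, then skip ids still in flight
    while 1:
        try:
            _id = self.product_attachment_processed_queue.get(False)
            self.set_product_attachment.discard(_id)
        except Exception:
            break          # queue.Empty: nothing left to acknowledge

    # worker side (the finally block below): always acknowledge, even on failure
    self.product_attachment_processed_queue.put(item.get(DOCUMENT_PRODUCT_ID))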
@@ -185,70 +230,90 @@ class Product_Attachment_Processor():
 
     def process_parameters_handler(self,list_item,result_queue):
         for item in list_item:
-            attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
-            product_name = item.get(DOCUMENT_PRODUCT_NAME)
-            product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
-            list_product = []
-            log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
-            if product_original_name is not None:
-                _l = product_original_name.split("_")
-                _l.reverse()
-                list_product.extend(_l)
-            if product_name is not None:
-                list_product.append(product_name)
-            list_product = list(set(list_product))
-            dp = Document_product(item)
-            if attachments is None or attachments=="" or len(list_product)==0:
-                dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
-                dp.update_row(self.ots_client)
-                return
-            list_attachment = json.loads(attachments)
-            list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
-            list_filemd5 = [a.get("filemd5","") for a in list_attachment]
-            _find = False
-            _success = False
-            list_text = []
-            for _filemd5 in list_filemd5:
-                _html = self.get_whole_html(_filemd5)
-                if len(_html)>5:
-
-                    pd = ParseDocument(_html,True)
-                    for _product in list_product:
-                        pd.fix_tree(_product)
-                        list_data = pd.tree
-                        _text,_count = extract_product_parameters(list_data,_product)
-                        if _count>0:
-                            _find = True
-                        if _text is not None:
-                            list_text.append(_text)
-
-                    pd = ParseDocument(_html,False)
-                    for _product in list_product:
-                        pd.fix_tree(_product)
-                        list_data = pd.tree
-                        _text,_count = extract_product_parameters(list_data,_product)
-                        if _count>0:
-                            _find = True
-                        if _text is not None:
-                            list_text.append(_text)
-                else:
-                    log("product attachment process filemd5 %s has no content"%(_filemd5))
-                if len(list_text)>0:
-                    _text = getBestProductText(list_text,'',[])
-                    logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
-                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
-                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
-                    dp.update_row(self.ots_client)
-                    _success = True
-                    break
-
-            if not _success:
-                if not _find:
-                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
+            try:
+                attachments = item.get(DOCUMENT_PRODUCT_ATTACHMENTS)
+                product_name = item.get(DOCUMENT_PRODUCT_NAME)
+                product_original_name = item.get(DOCUMENT_PRODUCT_ORIGINAL_NAME)
+                list_product = []
+                log("processing name:%s original_name:%s attachments:%s"%(product_name,product_original_name,attachments))
+                if product_original_name is not None:
+                    _l = product_original_name.split("_")
+                    _l.reverse()
+                    list_product.extend(_l)
+                if product_name is not None:
+                    list_product.append(product_name)
+                list_product = list(set(list_product))
+                dp = Document_product(item)
+                if attachments is None or attachments=="" or len(list_product)==0:
+                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
                     dp.update_row(self.ots_client)
+                    return
+                if isinstance(attachments,str):
+                    list_attachment = json.loads(attachments)
+                elif isinstance(attachments,list):
+                    list_attachment = attachments
                 else:
-                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
+                    log("attachment type error")
+                    dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_no_bidfile,True)
                     dp.update_row(self.ots_client)
+                    return
+                list_attachment.sort(key=lambda x:0 if x.get("classification")=="招标文件" else 1 if x.get("classification")=="采购清单" else 2)
+                list_filemd5 = [a.get("filemd5","") for a in list_attachment]
+                _find = False
+                _success = False
+                list_text = []
+                for _filemd5 in list_filemd5:
+                    _html = self.get_whole_html(_filemd5)
+                    if len(_html)>5:
+
+                        pd = ParseDocument(_html,True)
+                        for _product in list_product:
+                            pd.fix_tree(_product)
+                            list_data = pd.tree
+                            _text,_count = extract_product_parameters(list_data,_product)
+                            if _count>0:
+                                _find = True
+                            if _text is not None:
+                                list_text.append(_text)
+
+                        pd = ParseDocument(_html,False)
+                        for _product in list_product:
+                            pd.fix_tree(_product)
+                            list_data = pd.tree
+                            _text,_count = extract_product_parameters(list_data,_product)
+                            if _count>0:
+                                _find = True
+                            if _text is not None:
+                                list_text.append(_text)
+                    else:
+                        log("product attachment process filemd5 %s has no content"%(_filemd5))
+                    if len(list_text)>0:
+                        _html = getBestProductText(list_text,'',[])
+                        _html = format_text(_html)
+                        _soup = BeautifulSoup(_html,"lxml")
+                        _text = _soup.get_text()
+
+                        logger.info("extract_parameter_text bid_filemd5s:%s name:%s original_name:%s parameter_text:%s"%(str(list_filemd5),product_name,product_original_name,_text))
+                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER,_text,True)
+                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_HTML,_html,True)
+                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_succeed,True)
+                        dp.setValue(DOCUMENT_PRODUCT_IS_PARAMETER,1,True)
+                        dp.update_row(self.ots_client)
+                        _success = True
+                        break
+
+                if not _success:
+                    if not _find:
+                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_not_found,True)
+                        dp.update_row(self.ots_client)
+                    else:
+                        dp.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,parameter_status_process_failed,True)
+                        dp.update_row(self.ots_client)
+            except Exception as e:
+                traceback.print_exc()
+            finally:
+                self.product_attachment_processed_queue.put(item.get(DOCUMENT_PRODUCT_ID))
+
 
     def start_process(self):
         self.process_parameters_producer()
@@ -318,5 +383,12 @@ def change_parameters_status():
     mt.run()
 
 if __name__ == '__main__':
-    start_process_parameters()
-    # change_parameters_status()
+    # start_process_parameters()
+    # change_parameters_status()
+    ots_client = getConnect_ots()
+    a = Document_product({DOCUMENT_PRODUCT_ID:"00000d8f94ba32d840c21fc9343ce4fb"})
+    a.fix_columns(ots_client,[DOCUMENT_PRODUCT_PARAMETER,DOCUMENT_PRODUCT_IS_PARAMETER],True)
+    with open("a.html","w",encoding="utf8") as f:
+        f.write(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
+    print(a.getProperties().get(DOCUMENT_PRODUCT_PARAMETER))
+

+ 1 - 1
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -1000,7 +1000,7 @@ def check_product(product_less,product_greater,split_char=","):
                 if getSimilarityOfString(_l,_g)>=0.8:
                     same_count += 1
                     break
-        if same_count/len(_product_l)>0.5:
+        if same_count/len(_product_l)>=0.5:
             return True
         return False
     return True
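
Review note: the comparison change admits the exact-half boundary case. For example:

    _product_l = ["打印机", "电脑"]   # shorter product list, 2 items
    same_count = 1                    # only 打印机 matched (similarity >= 0.8)
    same_count / len(_product_l)      # 0.5: False under ">", True under ">="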

+ 6 - 5
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2131,7 +2131,7 @@ def dumplicate_projects(list_projects,b_log=False):
     list_projects.sort(key=lambda x:x.get("keyvaluecount",0),reverse=True)
     cluster_projects = list_projects[:10]
     _count = 10
-    print("dumplicate projects rest",len(cluster_projects))
+    log("dumplicate projects rest %d"%len(cluster_projects))
     while _count>0:
         _count -= 1
         _update = False
@@ -2162,7 +2162,7 @@ def dumplicate_projects(list_projects,b_log=False):
             break
         cluster_projects = list_p
 
-    print("dumplicate projects rest",len(cluster_projects))
+    log("dumplicate projects rest %d"%len(cluster_projects))
     return cluster_projects
 
 def update_projects_by_project(project_dict,projects):
@@ -2971,7 +2971,7 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
     :return:
     '''
 
-    dup_docid = set(dup_docid)
+    dup_docid = set([str(a) for a in dup_docid])
     set_dup_total = set()
     for _proj in list_projects:
         try:
@@ -2989,7 +2989,7 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
                 if str(docid) not in set_docids:
                     continue
 
-                if docid in dup_docid:
+                if str(docid) in dup_docid:
                     continue
                 _status = _d.get(document_status,201)
                 is_multipack = _d.get("is_multipack",True)
@@ -3041,10 +3041,11 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
                     else:
                         dict_channel_proj[docchannel] = _d
 
-            set_docids = set_docids-set_dup_docid
+            set_docids = set_docids-set_dup_docid-dup_docid
             set_dup_total |= set_dup_docid
             if len(set_docids)==0:
                 log("projects set_docids length is zero %s"%(docids))
+                return None
             else:
                 _proj[project_docids] = ",".join(list(set_docids))
             _proj[project_project_dynamics] = json.dumps(list_dynamics,ensure_ascii=False)
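
Review note: docids reach this function both as ints (from the dedup pass) and as strings (parsed out of the project's docids and dynamics JSON), so membership tests could silently fail across types. Normalizing both sides to str fixes that, e.g.:

    dup_docid = {242672995}            # int, as passed in
    "242672995" in dup_docid           # False: a str never matches the int entry
    dup_docid = {str(a) for a in dup_docid}
    str(242672995) in dup_docid        # True once both sides are normalized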

+ 4 - 0
BaseDataMaintenance/model/ots/document_product.py

@@ -55,8 +55,12 @@ DOCUMENT_PRODUCT_HAS_BIDFILE = "has_bidfile"
 DOCUMENT_PRODUCT_HAS_ATTACHMENTS = "has_attachments"
 
 DOCUMENT_PRODUCT_PARAMETER_STATUS = "parameter_status"
+DOCUMENT_PRODUCT_IS_PARAMETER = "is_parameter"
+
+DOCUMENT_PRODUCT_PARAMETER_HTML = "parameter_html"
 
 Document_product_table_name = "document_product2"
+Document_product_table_name = "document_product"
 
 class Document_product(BaseModel):