
Merge remote-tracking branch 'origin/master'

fangjiasheng, 1 year ago
parent commit b4119b3f7c

+ 19 - 6
BaseDataMaintenance/common/Utils.py

@@ -26,7 +26,7 @@ USE_PAI_EAS = False
 
 Lazy_load = False
 
-ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]')
+ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]|\x00')
 
 import smtplib
 from email.mime.application import MIMEApplication
@@ -164,11 +164,23 @@ def article_limit(soup,limit_words=30000):
                 attachment_skip = False
                 for part in attachment_part.find_all(recursive=False):
                     if not attachment_skip:
-                        last_attachment_text_nums = attachment_text_nums
-                        attachment_text_nums = attachment_text_nums + len(re.sub(sub_space, "", part.get_text()))
-                        if attachment_text_nums>=limit_words:
-                            part.string = str(part.get_text())[:limit_words-last_attachment_text_nums]
-                            attachment_skip = True
+                        if part.name == 'div' and 'filemd5' in part.attrs:
+                            for p_part in part.find_all(recursive=False):
+                                last_attachment_text_nums = attachment_text_nums
+                                attachment_text_nums = attachment_text_nums + len(
+                                    re.sub(sub_space, "", p_part.get_text()))
+                                if not attachment_skip:
+                                    if attachment_text_nums >= limit_words:
+                                        p_part.string = str(p_part.get_text())[:limit_words - last_attachment_text_nums]
+                                        attachment_skip = True
+                                else:
+                                    p_part.decompose()
+                        else:
+                            last_attachment_text_nums = attachment_text_nums
+                            attachment_text_nums = attachment_text_nums + len(re.sub(sub_space, "", part.get_text()))
+                            if attachment_text_nums >= limit_words and not attachment_skip:
+                                part.string = str(part.get_text())[:limit_words - last_attachment_text_nums]
+                                attachment_skip = True
                     else:
                         part.decompose()
     soup = str(soup).replace("##attachment##","")
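A hedged, standalone condensation of the per-attachment truncation the revised loop above now applies inside divs carrying a `filemd5` attribute; `truncate_children`, the sample HTML, and the `sub_space` definition are illustrative stand-ins, while the counting and cut mirror the diff.

```python
import re
from bs4 import BeautifulSoup

sub_space = re.compile(r"\s+")  # assumed definition of Utils.py's sub_space

def truncate_children(container, used_chars, limit_words):
    skip = False
    for part in container.find_all(recursive=False):
        if skip:
            part.decompose()                      # everything after the cut is dropped
            continue
        last = used_chars
        used_chars += len(re.sub(sub_space, "", part.get_text()))
        if used_chars >= limit_words:
            part.string = str(part.get_text())[:limit_words - last]  # cut to the remaining budget
            skip = True
    return used_chars

html = '<div filemd5="abc"><p>第一段text</p><p>第二段text</p></div>'
div = BeautifulSoup(html, "lxml").div
truncate_children(div, used_chars=0, limit_words=6)
print(div)  # first <p> cut to the 6-character budget, second <p> removed
```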
@@ -245,6 +257,7 @@ def cut_str(text_list, only_text_list, max_bytes_length=2000000):
 def getLegal_str(_str):
     if _str is not None:
         return ILLEGAL_CHARACTERS_RE.sub("",str(_str))
+    return ""
 
 def getRow_ots_primary(row):
     _dict = dict()
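The `getLegal_str` change above adds an explicit empty-string return for `None`; a minimal sketch of the resulting behaviour (sample inputs are illustrative):

```python
import re

# pattern as updated in this commit
ILLEGAL_CHARACTERS_RE = re.compile(r'[\000-\010]|[\013-\014]|[\016-\037]|\x00')

def getLegal_str(_str):
    if _str is not None:
        return ILLEGAL_CHARACTERS_RE.sub("", str(_str))
    return ""

print(getLegal_str("bid\x07notice"))  # 'bidnotice' -- control characters stripped
print(getLegal_str(None))             # ''          -- previously returned None
```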

+ 3 - 3
BaseDataMaintenance/common/documentFingerprint.py

@@ -13,9 +13,9 @@ def getHtmlText(sourceHtml):
         _href = _a.attrs.get("href","")
         if _href.find("www.bidizhaobiao.com")>0:
             _a.decompose()
-    richText = _soup.find("div",attrs={"class":"richTextFetch"})
-    if richText is not None:
-        richText.decompose()
+    # richText = _soup.find("div",attrs={"class":"richTextFetch"})
+    # if richText is not None:
+    #     richText.decompose()
     _text = _soup.get_text()
 
     _text = re.sub("\s*",'',_text)
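With the `richTextFetch` removal commented out above, attachment text now flows into the text that feeds the document fingerprint. A hedged illustration; the md5 step is an assumption about `getFingerprint`, which is not shown in this diff.

```python
import hashlib
import re
from bs4 import BeautifulSoup

sourceHtml = '<div>公告正文</div><div class="richTextFetch">附件正文</div>'
_soup = BeautifulSoup(sourceHtml, "lxml")
_text = re.sub(r"\s*", '', _soup.get_text())   # now '公告正文附件正文': attachment text is kept
print(hashlib.md5(_text.encode("utf-8")).hexdigest())  # assumed hashing step
```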

+ 21 - 8
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -25,7 +25,7 @@ flow_init_log_dir = "/data/python/flow_init_log"
 flow_init_check_dir = "/data/python/flow_init_check"
 
 
-flow_dumplicate_log_path = "/python_log/flow_dumplicate.log"
+flow_dumplicate_log_path = "/home/appuser/python/flow_dumplicate.log"
 
 
 class BaseDataMonitor():
@@ -204,7 +204,7 @@ class BaseDataMonitor():
             #                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             total_count_todeal = getQueueSize("dataflow_attachment")
 
-            if total_count_todeal>100:
+            if total_count_todeal>1000:
                 # query = BoolQuery(must_queries=[
                 #     RangeQuery("crtime",self.get_last_tenmin_time(16))
                 # ])
@@ -243,7 +243,7 @@ class BaseDataMonitor():
 
                 #通过读取文件获取日志情况
                 dict_type = {}
-                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*recognize takes (?P<costtime>\d+)s"%(re.escape(self.get_last_tenmin_time()))
+                _pattern = "%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s"%(re.escape(self.get_last_tenmin_time()))
                 with open(flow_attachment_log_path,"r",encoding="utf8") as f:
                     while True:
                         line = f.readline()
@@ -253,15 +253,25 @@ class BaseDataMonitor():
                         if _match is not None:
                             _type = _match.groupdict().get("type")
                             _result = _match.groupdict().get("result")
+                            _downtime = _match.groupdict().get("downloadtime")
                             _costtime = _match.groupdict().get("costtime")
+
+                            _uploadtime = _match.groupdict().get("uploadtime")
                             if _type not in dict_type:
-                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0}
+                                dict_type[_type] = {"success":0,"fail":0,"success_costtime":0,"fail_costtime":0,"downtime":0,"downcount":0,"uploadtime":0,"uploadcount":0}
                             if _result=="True":
                                 dict_type[_type]["success"] += 1
                                 dict_type[_type]["success_costtime"] += int(_costtime)
+
                             else:
                                 dict_type[_type]["fail"] += 1
                                 dict_type[_type]["fail_costtime"] += int(_costtime)
+                            if float(_downtime)>0:
+                                dict_type[_type]["downcount"] += 1
+                                dict_type[_type]["downtime"] += float(_downtime)
+                            if float(_uploadtime)>0:
+                                dict_type[_type]["uploadcount"] += 1
+                                dict_type[_type]["uploadtime"] += float(_uploadtime)
 
                 process_count = 0
                 process_succeed_count = 0
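A hedged sketch of the extended log-parsing pattern above; the sample line mirrors the updated format string in dataflow_mq.py, while the timestamp prefix returned by `get_last_tenmin_time()` is an assumption.

```python
import re

_last_tenmin = "2024-06-01 10:1"  # assumed shape of get_last_tenmin_time(); illustrative only
_pattern = r"%s.*process filemd5\:[^\s]* (?P<result>(True|False)) of type\:(?P<type>[^\s]*).*download:(?P<downloadtime>\d+\.\d+)s recognize takes (?P<costtime>\d+)s upload takes (?P<uploadtime>\d+\.\d+)s" % (re.escape(_last_tenmin))

line = ("2024-06-01 10:12:03 process filemd5:9af3c2 True of type:pdf with size:0.812M "
        "download:1.20s recognize takes 3s upload takes 0.45s _ots_exists False,ret_size:2048")
_match = re.search(_pattern, line)
print(_match.groupdict())
# {'result': 'True', 'type': 'pdf', 'downloadtime': '1.20', 'costtime': '3', 'uploadtime': '0.45'}
```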
@@ -269,9 +279,11 @@ class BaseDataMonitor():
                 for k,v in dict_type.items():
                     process_count += v.get("success",0)+v.get("fail",0)
                     process_succeed_count += v.get("success",0)
-                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")))
+                    _msg_type += "\n类型%s\n\t成功%s,消耗%s秒,%.2f秒/个,\n\t失败%s,消耗%s秒,%.2f秒/个,\n\t下载%s,消耗%s秒,%.2f秒/个,\n\t上传%s,消耗%s秒,%.2f秒/个"%(k,str(v.get("success")),str(v.get("success_costtime")),v.get("success_costtime")/max(1,v.get("success")),str(v.get("fail")),str(v.get("fail_costtime")),v.get("fail_costtime")/max(1,v.get("fail")),str(v.get("downcount")),str(v.get("downtime")),v.get("downtime")/max(1,v.get("downcount")),str(v.get("uploadcount")),str(v.get("uploadtime")),v.get("uploadtime")/max(1,v.get("uploadcount")))
 
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
+                log(_msg)
+                log(_msg_type)
                 sentMsgToDD(_msg+_msg_type,ACCESS_TOKEN_DATAWORKS)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
@@ -296,7 +308,7 @@ class BaseDataMonitor():
 
             total_count_todeal = getQueueSize("dataflow_extract")
 
-            if total_count_todeal>100:
+            if total_count_todeal>500:
                 _cmd = 'cat %s | grep "%s" | grep -c "process.*docid"'%(flow_extract_log_path,self.get_last_tenmin_time())
                 log(_cmd)
                 process_count = self.cmd_execute(_cmd)
@@ -331,6 +343,7 @@ class BaseDataMonitor():
                 #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
                 _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
+                log(_msg)
                 atAll=False
                 if success_count==0:
                     atAll=True
@@ -610,11 +623,11 @@ class BaseDataMonitor():
 
         # scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
         scheduler.add_job(self.monitor_extract,"cron",minute="*/10")
-        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/3")
+        scheduler.add_job(self.monitor_proposedBuilding,"cron",hour="*/11")
         # scheduler.add_job(self.monitor_dumplicate,"cron",minute="*/10")
         scheduler.add_job(self.monitor_sychr,"cron",minute="*/10")
         scheduler.add_job(self.monitor_preproject,"cron",hour="8")
-        scheduler.add_job(self.monitor_merge,"cron",minute="*/30")
+        scheduler.add_job(self.monitor_merge,"cron",minute="*/60")
         scheduler.add_job(self.monitor_init,"cron",hour="*/3")
         scheduler.start()
 

+ 2 - 2
BaseDataMaintenance/dataSource/interface.py

@@ -39,9 +39,9 @@ def getAttachDealInterface(_data,_type,path="",restry=1,kwargs={},url=interface_
                 _json.update(kwargs)
 
 
-            _json["timeout"] = 10000
+            _json["timeout"] = timeout
             with requests.Session() as sess:
-                _resp = sess.post(url,data=_json,timeout=timeout)
+                _resp = sess.post(url,data=_json,timeout=timeout+100)
 
             if _resp.status_code==200:
                 _result = json.loads(_resp.content.decode())
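The change above passes the caller's timeout through to the attachment service and gives the HTTP call a 100-second margin on top of it, so the remote side times out first. A hedged sketch of that shape; `post_attach_job` and the default value are illustrative, not the module's API.

```python
import requests

def post_attach_job(url, _json, timeout=300):
    _json["timeout"] = timeout                                     # budget given to the service itself
    with requests.Session() as sess:
        return sess.post(url, data=_json, timeout=timeout + 100)   # client waits slightly longer
```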

+ 6 - 5
BaseDataMaintenance/dataSource/setttings.py

@@ -45,10 +45,10 @@ oracle_host = "192.168.0.150"
 oracle_port = 1522
 # oracle_user = "bxkc_data_readonly"
 # oracle_pass = "P7WUrgcz0@#j8pjg"
-# oracle_user = "bxkc_write"
-# oracle_pass = "aBrTKNl9SaPk@Yy3"
-oracle_user = "bxkc_db"
-oracle_pass = "xb9F#24Hd#5rStr9"
+oracle_user = "BXKC_WRITE"
+oracle_pass = "PHNhX3%rVy4@fDB&"
+# oracle_user = "bxkc_db"
+# oracle_pass = "xb9F#24Hd#5rStr9"
 oracle_db = "yanphone"
 
 ots_AccessKeyId = 'LTAI5tFuoxHm8Uxrr5nT8wTZ'
@@ -66,7 +66,8 @@ activateMQ_port = 61613
 activateMQ_user = "admin"
 activateMQ_pswd = "admin"
 
-activateMQ_ali_host = "172.16.147.13"
+activateMQ_ali_host = "172.16.160.72"
+# activateMQ_ali_host = "172.16.147.13"
 # activateMQ_ali_host = "116.62.167.43"
 activateMQ_ali_port = 61613
 activateMQ_ali_user = "admin"

+ 3 - 3
BaseDataMaintenance/maintenance/check_log.py

@@ -6,7 +6,7 @@ def test_speed(logfile):
     a = open(logfile,"r",encoding="utf8").read()
     set_a = set()
     _c = 0
-    for a in re.split("\n",s):
+    for a in re.split("\n",a):
         a = a.strip()
         if a=="":
             continue
@@ -41,5 +41,5 @@ def check_start_end(logfile):
 
 if __name__ == '__main__':
     logfile = "log.txt"
-    # test_speed(logfile)
-    check_start_end(logfile)
+    test_speed(logfile)
+    # check_start_end(logfile)

+ 64 - 48
BaseDataMaintenance/maintenance/dataflow.py

@@ -1351,7 +1351,7 @@ class Dataflow():
 
 
     def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
-        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json]):
+        def producer(columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_product,document_tmp_fingerprint,document_tmp_tenderee,document_tmp_agency,document_tmp_project_code,document_tmp_project_name,document_tmp_doctitle_refine,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_web_source_name]):
             bool_query = BoolQuery(must_queries=[RangeQuery(document_tmp_status,*status_from,True,True)])
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
                                                                                 SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
@@ -2227,8 +2227,10 @@ class Dataflow_dumplicate(Dataflow):
         _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
         if _dict["doctitle_refine"]=="":
             _dict["doctitle_refine"] = _dict.get("doctitle")
-        _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
-                                       "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
+        _dict["moneys"] = set(_extract.get("moneys",[]))
+        _dict["moneys_attachment"] = set(_extract.get("moneys_attachment",[]))
+        _dict["nlp_enterprise"] = json.dumps({"indoctextcon":_extract.get("nlp_enterprise",[]),
+                                       "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])},ensure_ascii=False)
         _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
         _dict["package"] = self.c_f_get_package.evaluate(extract_json)
         _dict["project_name"] = _extract.get("name","")
@@ -2243,38 +2245,29 @@ class Dataflow_dumplicate(Dataflow):
 
         if len(base_list)>0:
             base_fingerprint = base_list[0]["fingerprint"]
-        for _i in range(1,len(base_list)):
+
+        final_group = []
+        for _i in range(len(base_list)):
             _dict1 = base_list[_i]
             fingerprint_less = _dict1["fingerprint"]
             _pass = True
             if fingerprint_less==base_fingerprint:
                 _index = _i
+                final_group.append(_dict1)
                 continue
-            for _j in range(min(_i,10)):
-                _dict2 = base_list[_j]
+            for _dict2 in final_group:
                 _prob,day_dis = self.dumplicate_check(_dict1,_dict2,_dict1.get("min_counts",10),b_log=b_log)
                 if _prob<=0.1:
                     _pass = False
                     break
             log("checking index:%d %s %.2f"%(_i,str(_pass),_prob))
             _index = _i
-            if not _pass:
-                _index -= 1
+            if _pass:
+                final_group.append(_dict1)
+            else:
                 break
-        if _index>=1:
-            # #对重复入库的进行去重
-            # _l = the_group[:_index+1]
-            # set_fingerprint = set()
-            # final_l = []
-            # for _dict in _l:
-            #     fingerprint_less = _dict["fingerprint"]
-            #     if fingerprint_less in set_fingerprint:
-            #         continue
-            #     else:
-            #         final_l.append(_dict)
-            #         set_fingerprint.add(fingerprint_less)
-            return the_group[:_index+1]
-        return []
+
+        return final_group
 
     def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
         document_less = _dict1
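A hedged condensation of the regrouping logic introduced above: instead of comparing each candidate against a window of earlier rows, a candidate joins the group only if it clears the duplicate check against every document already accepted, and the scan stops at the first failure. `check` is a stand-in for `self.dumplicate_check` returning a probability.

```python
def build_final_group(base_list, check):
    final_group = []
    base_fingerprint = base_list[0]["fingerprint"] if base_list else None
    for _dict1 in base_list:
        if _dict1["fingerprint"] == base_fingerprint:
            final_group.append(_dict1)        # same fingerprint as the base: always kept
            continue
        if all(check(_dict1, _dict2) > 0.1 for _dict2 in final_group):
            final_group.append(_dict1)        # compatible with every accepted document
        else:
            break                             # first incompatible candidate ends the group
    return final_group
```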
@@ -2299,6 +2292,10 @@ class Dataflow_dumplicate(Dataflow):
         province_less = document_less.get("province")
         city_less = document_less.get("city")
         district_less = document_less.get("district")
+        moneys_less = document_less.get("moneys")
+        moneys_attachment_less = document_less.get("moneys_attachment")
+        page_attachments_less = document_less.get(document_tmp_attachment_path,"[]")
+
 
         document_greater = _dict2
         docid_greater = _dict2["docid"]
@@ -2323,12 +2320,16 @@ class Dataflow_dumplicate(Dataflow):
         city_greater = document_greater.get("city")
         district_greater = document_greater.get("district")
 
+        moneys_greater = document_greater.get("moneys")
+        moneys_attachment_greater = document_greater.get("moneys_attachment")
+        page_attachments_greater = document_greater.get(document_tmp_attachment_path,"[]")
+
         hard_level=1
         if web_source_no_less==web_source_no_greater=="17397-3":
             hard_level=2
 
         if self.check_rule==1:
-            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
+            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
         else:
             _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
 
@@ -2449,7 +2450,7 @@ class Dataflow_dumplicate(Dataflow):
                 check_result["code"] = 1
 
 
-        if not check_product(product_less,product_greater):
+        if not check_product(product_less,product_greater,doctitle_refine_less,doctitle_refine_greater):
             check_result["product"] = 0
             check_result["pass"] = 0
             if b_log:
@@ -2591,7 +2592,7 @@ class Dataflow_dumplicate(Dataflow):
             confidence = _dict["confidence"]
 
             if b_log:
-                log("confidence %d %.3f"%(_docid,confidence))
+                log("confidence %d %.3f total_count %d"%(_docid,confidence,_dict.get('min_counts',0)))
 
             if confidence>0.1:
                 if _docid not in set_docid:
@@ -2622,9 +2623,9 @@ class Dataflow_dumplicate(Dataflow):
         if page_time=='':
             page_time = current_date
 
-        two_day_dict = {"page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]}
+        two_day_dict = {"page_time":[timeAdd(page_time,-7),timeAdd(page_time,7)]}
 
-        if page_time>=timeAdd(current_date,-2):
+        if page_time>=timeAdd(current_date,-7):
             table_name = "document_tmp"
             table_index = "document_tmp_index"
             base_dict = {
@@ -2853,7 +2854,7 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]):
         q_size = self.queue_dumplicate.qsize()
         log("dumplicate queue size %d"%(q_size))
 
@@ -2912,7 +2913,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumpcate_comsumer(self):
         from multiprocessing import Process
-        process_count = 3
+        process_count = 6
         thread_count = 12
         list_process = []
         def start_thread():
@@ -3955,6 +3956,7 @@ class Dataflow_dumplicate(Dataflow):
         page_time = item.get(document_page_time,"")
         has_before = False
         has_after = False
+
         if len(page_time)>0:
             l_page_time = timeAdd(page_time,days=-90)
             dict_time = item.get("dict_time",{})
@@ -3964,9 +3966,23 @@ class Dataflow_dumplicate(Dataflow):
                         has_before = True
                     if v>page_time:
                         has_after = True
-        if not has_after and has_before:
-            log("check page_time false %s==%s-%s"%(l_page_time,k,v))
-            return False
+        log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
+        if has_before:
+            _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
+                               must_not_queries=[TermQuery(document_docid,item.get(document_docid,0))])
+            if not has_after:
+                log("check page_time false %s==%s-%s"%(l_page_time,k,v))
+
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                       SearchQuery(_query,get_total_count=True,limit=1))
+                if total_count>0:
+                    return False
+            if item.get(document_web_source_name,"")=="中国政府采购网":
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(_query,get_total_count=True,limit=1))
+                if total_count>0:
+                    return False
+
         return True
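A hedged condensation of the strengthened page_time check above: when earlier dates appear in `dict_time`, a same-title lookup is added, and documents from 中国政府采购网 get that lookup regardless of later dates. `count_same_title` stands in for the OTS query by `document_doctitle`.

```python
def page_time_ok(has_before, has_after, web_source_name, count_same_title):
    if has_before:
        if not has_after and count_same_title() > 0:
            return False                      # an older document with the same title exists
        if web_source_name == "中国政府采购网" and count_same_title() > 0:
            return False
    return True
```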
 
 
@@ -3988,7 +4004,7 @@ class Dataflow_dumplicate(Dataflow):
             log("dumplicate %s rules:%d"%(str(item.get(document_tmp_docid)),len(list_rules)))
             list_rules = list_rules[:30]
             _i = 0
-            step = 5
+            step = 2
 
 
             item["confidence"] = 999
@@ -4008,7 +4024,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path],b_log=b_log)
                 _i += step
 
 
@@ -4070,11 +4086,13 @@ class Dataflow_dumplicate(Dataflow):
                 dtmp.setValue(document_tmp_save,1,True)
 
             list_merge_dump = []
-            if exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0:
+            if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
                 log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                 dtmp.setValue(document_tmp_projects,"[]",True)
             else:
                 project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
+                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump:
+                    dtmp.setValue(document_tmp_save,0,True)
                 dtmp.setValue(document_tmp_projects,project_json,True)
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
 
@@ -4092,9 +4110,8 @@ class Dataflow_dumplicate(Dataflow):
                             dtmp.setValue(document_tmp_projects,json.dumps(list_proj[:len(list_proj)//2]),True)
                             if dtmp.update_row(self.ots_client):
                                 break
-                if table_name=="document_tmp":
-                    self.changeSaveStatus(remove_list)
-                    self.changeSaveStatus(list_merge_dump)
+                self.changeSaveStatus(remove_list)
+                self.changeSaveStatus(list_merge_dump)
             else:
                 return list_docids
 
@@ -4205,7 +4222,7 @@ class Dataflow_dumplicate(Dataflow):
 
     def test_dumplicate(self,docid):
         # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district]
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
         ])
@@ -4220,7 +4237,7 @@ class Dataflow_dumplicate(Dataflow):
         list_dict = getRow_ots(rows)
 
         for item in list_dict:
-            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=False,upgrade=False)
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
             return
 
     def test_merge(self,list_docid_less,list_docid_greater):
@@ -4394,14 +4411,13 @@ if __name__ == '__main__':
 
     # download_attachment()
     # test_attachment_interface()
-    # df_dump = Dataflow_dumplicate(start_delete_listener=False)
-    # # df_dump.start_flow_dumplicate()
-
-    # df_dump.test_dumplicate(400929607
-    #                         )
-    compare_dumplicate_check()
-    # df_dump.test_merge([242672995,235300429,240009762
-    #                     ],[243240169,])
+    df_dump = Dataflow_dumplicate(start_delete_listener=False)
+    # df_dump.start_flow_dumplicate()
+    df_dump.test_dumplicate(455485514
+                            )
+    # compare_dumplicate_check()
+    # df_dump.test_merge([391898061
+    #                     ],[371551361,])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()

+ 119 - 24
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -14,6 +14,7 @@ from BaseDataMaintenance.model.ots.document import Document
 from BaseDataMaintenance.common.Utils import article_limit
 from BaseDataMaintenance.common.documentFingerprint import getFingerprint
 from BaseDataMaintenance.model.postgres.document_extract import *
+from BaseDataMaintenance.model.oracle.T_SHEN_PI_XIANG_MU import *
 
 import sys
 sys.setrecursionlimit(1000000)
@@ -57,7 +58,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 log("get message of idx:%s"%(str(self._idx)))
                 message_id = headers.headers["message-id"]
                 body = headers.body
-                _dict = {"frame":headers,"conn":self.conn}
+
+                _dict = {"frame":headers,"conn":self.conn,"idx":self._idx}
                 self._func(_dict=_dict)
             except Exception as e:
                 traceback.print_exc()
@@ -77,7 +79,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.queue_attachment_ocr = Queue()
         self.queue_attachment_not_ocr = Queue()
-        self.comsumer_count = 90
+        self.comsumer_count = 20
         self.comsumer_process_count = 5
         self.retry_comsumer_count = 10
         self.retry_times = 5
@@ -95,12 +97,12 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
         self.session = None
 
-        # for _ in range(self.comsumer_process_count):
-        #     listener_p = Process(target=self.start_attachment_listener)
-        #     listener_p.start()
+        for _ in range(self.comsumer_process_count):
+            listener_p = Process(target=self.start_attachment_listener)
+            listener_p.start()
 
-        listener_p = Process(target=self.start_attachment_listener)
-        listener_p.start()
+        # listener_p = Process(target=self.start_attachment_listener)
+        # listener_p.start()
 
 
 
@@ -115,7 +117,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 if self.list_attachment_comsumer[i].conn.is_connected():
                     continue
                 else:
-                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,_i)
+                    listener = self.AttachmentMQListener(getConnect_activateMQ(),self.attachment_listener_handler,i)
                     createComsumer(listener,self.mq_attachment)
                     self.list_attachment_comsumer[i] = listener
             time.sleep(5)
@@ -155,9 +157,16 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
             conn = _dict["conn"]
             message_id = frame.headers["message-id"]
             item = json.loads(frame.body)
+            _idx = _dict.get("idx",1)
             page_attachments = json.loads(item.get(document_tmp_attachment_path,"[]"))
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
+            if random.random()<0.2:
+                log("jump by random")
+                if send_msg_toacmq(self.pool_mq,frame.body,self.mq_attachment):
+                    ackMsg(conn,message_id)
+                    return
+
             if len(page_attachments)==0:
                 newitem ={"item":item,"list_attach":[],"message_id":message_id,"conn":conn}
             else:
@@ -218,7 +227,14 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                                           "html":_html})
 
                 if _attach.getProperties().get(attachment_filetype)=="swf":
-                    swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
+                    # swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
+                    _swf_urls = _attach.getProperties().get(attachment_swfUrls, "[]")
+                    if _swf_urls:
+                        _swf_urls = _swf_urls.replace('\\', '')
+                    else:
+                        _swf_urls = '[]'
+                    _swf_urls = json.loads(_swf_urls)
+                    swf_urls.extend(_swf_urls)
 
             if not _not_failed:
                 return False,list_html,swf_urls
@@ -316,6 +332,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
         objectPath = attach.getProperties().get(attachment_path)
         docids = attach.getProperties().get(attachment_docids)
 
+        _ots_exists = attach.getProperties().get("ots_exists")
+
         if objectPath is None:
             relative_path = "%s/%s"%(_uuid.hex[:4],_uuid.hex)
         else:
@@ -392,8 +410,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
 
                     if local_exists:
-                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
-                    os.remove(localpath)
+                        if not _ots_exists:
+                            upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
+                        os.remove(localpath)
 
                     return True
 
@@ -406,7 +425,9 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 # _data_base64 = base64.b64encode(open(localpath,"rb").read())
                 # _success,_html,swf_images = getAttachDealInterface(_data_base64,_filetype)
                 _success,_html,swf_images,classification = getAttachDealInterface(None,_filetype,path=localpath,session=self.session)
-                log("process filemd5:%s %s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
+
+                _reg_time = time.time()-start_time
+
                 if _success:
                     if len(_html)<5:
                         _html = ""
@@ -419,10 +440,19 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
 
                         return False
 
+
+                # 重跑swf时,删除原来的swf_urls中的"\"
+                if attach.getProperties().get(attachment_filetype) == "swf":
+                    swf_urls = attach.getProperties().get(attachment_swfUrls, "[]")
+                    swf_urls = swf_urls.replace('\\', '') if swf_urls else '[]'
+                    swf_urls = json.loads(swf_urls)
+                    attach.setValue(attachment_swfUrls, json.dumps(swf_urls, ensure_ascii=False), True)
+
                 swf_images = eval(swf_images)
                 if attach.getProperties().get(attachment_filetype)=="swf" and len(swf_images)>0:
 
                     swf_urls = json.loads(attach.getProperties().get(attachment_swfUrls,"[]"))
+
                     if len(swf_urls)==0:
                         objectPath = attach.getProperties().get(attachment_path,"")
                         swf_dir = os.path.join(self.current_path,"swf_images",uuid4().hex)
@@ -440,6 +470,8 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                         if os.path.exists(swf_dir):
                             os.rmdir(swf_dir)
                         attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
+                    else:
+                        attach.setValue(attachment_swfUrls,json.dumps(swf_urls,ensure_ascii=False),True)
 
                 if re.search("<td",_html) is not None:
                     attach.setValue(attachment_has_table,1,True)
@@ -471,14 +503,17 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                 self.putAttach_json_toRedis(filemd5,attach.getProperties())
 
 
+                start_time = time.time()
                 if local_exists:
-                    upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
+                    if not _ots_exists:
+                        upload_status = uploadFileByPath(self.bucket,localpath,objectPath)
                 try:
                     if upload_status and os.exists(localpath):
                         os.remove(localpath)
                 except Exception as e:
                     pass
-
+                _upload_time = time.time()-start_time
+                log("process filemd5:%s %s of type:%s with size:%.3fM download:%.2fs recognize takes %ds upload takes %.2fs _ots_exists %s,ret_size:%d"%(filemd5,str(_success),_filetype,round(_size/1024/1024,4),time_download,_reg_time,_upload_time,str(_ots_exists),len(_html)))
 
                 return True
             else:
@@ -598,7 +633,10 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     if _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True):
                         if _attach_ots.getProperties().get(attachment_status) is not None:
                             log("getAttachments find in ots:%s"%(_filemd5))
-                            list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
+                            _attach_pg = Attachment_postgres(_attach_ots.getProperties())
+                            _attach_pg.setValue("ots_exists",True,True)
+                            list_attachment.append(_attach_pg)
+
                     else:
                         log("getAttachments search in path:%s"%(_filemd5))
                         if _path:
@@ -782,6 +820,8 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
     def start_extract_listener(self):
 
+        self.list_extract_comsumer = []
+
         for _i in range(self.comsumer_count):
             listener_extract = self.ExtractListener(getConnect_activateMQ(),self.comsumer_handle,_i)
             createComsumer(listener_extract,self.mq_extract)
@@ -953,18 +993,25 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
 
             _dochtmlcon = item.get(document_tmp_dochtmlcon,"")
 
-            html_len = len(_dochtmlcon)
-            if html_len>50000:
+            html_len = len(_dochtmlcon) # html 文本长度
+            limit_text_len = 50000 # 内容(或附件)正文限制文本长度
+            if html_len > limit_text_len:
                 log("docid %s dochtmlcon too long len %d "%(str(item.get("docid")),html_len))
                 try:
                     _dochtmlcon = re.sub("<html>|</html>|<body>|</body>", "", _dochtmlcon)
-                    _soup = BeautifulSoup(_dochtmlcon,"lxml")
-                    if len(_dochtmlcon)>200000:
-                        _find = _soup.find("div",attrs={"class":"richTextFetch"})
-                        if _find is not None:
-                            _find.decompose()
-                    else:
-                        _soup = article_limit(_soup,50000)
+                    _soup = BeautifulSoup(_dochtmlcon,"html5lib")
+                    all_len = len(_soup.get_text()) # 全公告内容text长度
+                    _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
+                    attachment_len = len(_attachment.get_text()) if _attachment else 0 # 附件内容text长度
+                    main_text_len = all_len - attachment_len # 正文内容text长度
+
+                    if attachment_len>150000: # 附件内容过长删除(处理超时)
+                        if _attachment is not None:
+                            _attachment.decompose()
+                            attachment_len = 0
+                    # 正文或附件内容text长度大于limit_text_len才执行article_limit
+                    if main_text_len>limit_text_len or attachment_len>limit_text_len:
+                        _soup = article_limit(_soup,limit_text_len)
                     _dochtmlcon = str(_soup)
                 except Exception as e:
                     traceback.print_exc()
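A hedged sketch of the new length gate above: attachment text beyond 150k characters is dropped outright, and `article_limit` only runs when the body or the remaining attachment text still exceeds `limit_text_len`. `trim_dochtmlcon` is an illustrative wrapper, not the consumer's actual method.

```python
from bs4 import BeautifulSoup
from BaseDataMaintenance.common.Utils import article_limit

def trim_dochtmlcon(_dochtmlcon, limit_text_len=50000):
    _soup = BeautifulSoup(_dochtmlcon, "html5lib")
    _attachment = _soup.find("div", attrs={"class": "richTextFetch"})
    attachment_len = len(_attachment.get_text()) if _attachment else 0
    main_text_len = len(_soup.get_text()) - attachment_len
    if attachment_len > 150000 and _attachment is not None:
        _attachment.decompose()               # attachment too long to process in time
        attachment_len = 0
    if main_text_len > limit_text_len or attachment_len > limit_text_len:
        _soup = article_limit(_soup, limit_text_len)
    return str(_soup)
```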
@@ -1279,6 +1326,8 @@ class Dataflow_init(Dataflow):
 
     def __init__(self):
         Dataflow.__init__(self)
+        self.max_shenpi_id = None
+        self.base_shenpi_id = 400000000000
         self.mq_init = "/queue/dataflow_init"
 
         self.mq_attachment = "/queue/dataflow_attachment"
@@ -1330,6 +1379,52 @@ class Dataflow_init(Dataflow):
             traceback.print_exc()
             self.pool_oracle.decrease()
 
+    def shengpi2mq(self):
+
+        conn_oracle = self.pool_oracle.getConnector()
+
+        try:
+            if self.max_shenpi_id is None:
+                # get the max_shenpi_id
+                _query = BoolQuery(must_queries=[ExistsQuery("id")])
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                                                                                    SearchQuery(_query,sort=Sort(sorters=[FieldSort("id",SortOrder.DESC)]),limit=1))
+                list_data = getRow_ots(rows)
+                if len(list_data)>0:
+                    max_shenpi_id = list_data[0].get("id")
+                    if max_shenpi_id>self.base_shenpi_id:
+                        max_shenpi_id -= self.base_shenpi_id
+                    self.max_shenpi_id = max_shenpi_id
+            if self.max_shenpi_id is not None:
+                # select data in order
+                list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,self.max_shenpi_id,)
+
+                # send data to mq one by one with max_shenpi_id updated
+                for _data in list_data:
+                    _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
+
+                    ots_dict = _data.getProperties_ots()
+                    if ots_dict["docid"]<self.base_shenpi_id:
+                        ots_dict["docid"] += self.base_shenpi_id
+
+                    if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
+                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
+                            self.max_shenpi_id = _id
+                        else:
+                            log("sent shenpi message to mq failed %s"%(_id))
+                            break
+                    else:
+                        if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_extract):
+                            self.max_shenpi_id = _id
+                        else:
+                            log("sent shenpi message to mq failed %s"%(_id))
+                            break
+
+        except Exception as e:
+            traceback.print_exc()
+            self.pool_oracle.decrease()
+
+
 
     def ots2mq(self):
         try:
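A hedged sketch of the routing rule inside the new `shengpi2mq` above: approval-project rows get their docid lifted into the 400000000000 range and go to the attachment queue only when they carry attachments. The `page_attachments` key and `send` callable are illustrative stand-ins for the `T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS` constant and `send_msg_toacmq`.

```python
BASE_SHENPI_ID = 400000000000

def route_shenpi_record(ots_dict, send):
    if ots_dict["docid"] < BASE_SHENPI_ID:
        ots_dict["docid"] += BASE_SHENPI_ID            # keep shenpi docids in their own range
    if ots_dict.get("page_attachments", "") != "[]":
        return send("/queue/dataflow_attachment", ots_dict)
    return send("/queue/dataflow_extract", ots_dict)
```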

+ 11 - 8
BaseDataMaintenance/maintenance/document/download_attachment_and_set_status_rerun.py

@@ -161,14 +161,17 @@ bucket = flow.bucket
 def download_attachment_mp(args, queue):
     md5, obj_path = args
     # 设置路径
-    relative_path = obj_path[5:].replace("//","/")
-    localpath = "/FileInfo/%s"%(relative_path)
-    if not os.path.exists(localpath):
-        if not os.path.exists(os.path.dirname(localpath)):
-            os.makedirs(os.path.dirname(localpath))
-    else:
-        logging.info('md5 continue ' + md5 + ' ' + obj_path)
-        return
+    relative_path = obj_path[5:].replace("//", "/")
+    localpath = "/FileInfo/%s" % (relative_path)
+    try:
+        if not os.path.exists(localpath):
+            if not os.path.exists(os.path.dirname(localpath)):
+                os.makedirs(os.path.dirname(localpath))
+        else:
+            logging.info('md5 continue ' + md5 + ' ' + obj_path)
+            return
+    except:
+        pass
 
     # 下载
     try:

+ 146 - 0
BaseDataMaintenance/maintenance/product/extract_data.py

@@ -0,0 +1,146 @@
+
+
+from BaseDataMaintenance.maintenance.product.htmlparser import *
+from BaseDataMaintenance.common.Utils import getLegal_str
+
+
+requirement_pattern = "(采购需求|需求分析|项目说明|合同内容|招标项目技术要求|采购内容及其它要求|服务要求|招标内容[及与和]要求|服务需求|采购内容|项目概况|项目目标|项目服务内容|项目服务范围|需求内容如下)([::]|$)"
+policy_pattern = "《.+?》"
+not_policy_pattern = "(表|函|书|证)》$|采购合同|响应方须知|响应文件格式|营业执照|开标一览|采购需求"
+aptitude_pattern = "(资格要求|资质要求)([::]|$)"
+def extract_parameters(_html):
+
+
+    pd = ParseDocument(_html,True)
+
+    list_data = pd.tree
+    list_result = []
+
+
+
+    requirement_text = ''
+    list_policy = []
+    aptitude_text = ''
+
+    _find_count = 0
+    _data_i = -1
+    while _data_i<len(list_data)-1:
+        _data_i += 1
+        _data = list_data[_data_i]
+        _type = _data["type"]
+        _text = _data["text"].strip()
+        # print(_data.keys())
+        if _type=="sentence":
+            if _data["sentence_title"] is not None:
+                if re.search(requirement_pattern,_text) is not None:
+
+                    childs = get_childs([_data])
+                    for c in childs:
+                        requirement_text += c["text"]+"\n"
+                    _data_i += len(childs)
+    _data_i = -1
+    while _data_i<len(list_data)-1:
+        _data_i += 1
+        _data = list_data[_data_i]
+        _type = _data["type"]
+        _text = _data["text"].strip()
+        # print(_data.keys())
+        if _type=="sentence":
+            if _data["sentence_title"] is not None:
+                print("aptitude_pattern",_text)
+                if re.search(aptitude_pattern,_text) is not None:
+                    childs = get_childs([_data])
+
+                    for c in childs:
+                        aptitude_text += c["text"]+"\n"
+                    _data_i += len(childs)
+
+        if _type=="table":
+            list_table = _data["list_table"]
+            parent_title = _data["parent_title"]
+            if list_table is not None:
+                for line in list_table[:2]:
+                    for cell_i in range(len(line)):
+                        cell = line[cell_i]
+                        cell_text = cell[0]
+                        if len(cell_text)>120 and re.search(aptitude_pattern,cell_text) is not None:
+                            aptitude_text += cell_text+"\n"
+
+    _data_i = -1
+    while _data_i<len(list_data)-1:
+        _data_i += 1
+        _data = list_data[_data_i]
+        _type = _data["type"]
+        _text = _data["text"].strip()
+        # print(_data.keys())
+        if _type=="sentence":
+            if re.search(policy_pattern,_text) is not None:
+                for t in re.findall(policy_pattern,_text):
+                    if len(t)>0:
+                        list_policy.append(t)
+
+
+    list_policy = list(set(list_policy))
+    new_list_policy = []
+    for _policy in list_policy:
+        if re.search(not_policy_pattern,_policy) is None:
+            new_list_policy.append(_policy)
+    return requirement_text,new_list_policy,aptitude_text
+
+
+def valid_xml_char_ordinal(c):
+    codepoint = ord(c)
+    # conditions ordered by presumed frequency
+    return (
+            0x20 <= codepoint <= 0xD7FF or
+            codepoint in (0x9, 0xA, 0xD) or
+            0xE000 <= codepoint <= 0xFFFD or
+            0x10000 <= codepoint <= 0x10FFFF
+    )
+
+
+def wash_data(text):
+    cleaned_string = ''.join(c for c in text if valid_xml_char_ordinal(c))
+    return cleaned_string
+
+def extract_datas():
+    import glob
+    import pandas as pd
+
+    list_data = []
+    for file in glob.glob(r"C:\Users\Administrator\Desktop\html_output\*.html"):
+        _content = open(file,"r",encoding="utf-8").read()
+        filename = file.split("\\")[-1]
+        # if filename!='2023年泗门镇公共视频图像通信链路技术服务采购项目.html':
+        #     continue
+        requirement_text,list_policy,aptitude_text = extract_parameters(_content)
+        print("requirement_text",requirement_text)
+        print("list_policy",list_policy)
+        print("aptitude_text",aptitude_text)
+        list_data.append([wash_data(filename),wash_data(requirement_text),wash_data(",".join(list_policy)),wash_data(aptitude_text)])
+    df = pd.DataFrame(list_data,columns=["文件名","采购需求","采购政策","资质要求"])
+    df.to_excel("提取结果.xlsx",columns=["文件名","采购需求","采购政策","资质要求"])
+
+def extract_datas_1():
+    import glob
+    import pandas as pd
+
+    list_data = []
+    for file in glob.glob(r"C:\Users\Administrator\Desktop\html_output1\*.html"):
+        _content = open(file,"r",encoding="utf-8").read()
+        filename = file.split("\\")[-1].split(".")[0]
+        # if filename!='2023年泗门镇公共视频图像通信链路技术服务采购项目.html':
+        #     continue
+        requirement_text,list_policy,aptitude_text = extract_parameters(_content)
+        print("requirement_text",requirement_text)
+        print("list_policy",list_policy)
+        print("aptitude_text",aptitude_text)
+        # list_data.append([wash_data(filename),wash_data(requirement_text),wash_data(",".join(list_policy)),wash_data(aptitude_text)])
+        list_data.append([wash_data(filename),wash_data(",".join(list_policy))])
+    df = pd.DataFrame(list_data,columns=["文件名","采购政策"])
+    df.to_excel("提取结果1.xlsx",columns=["文件名","采购政策"])
+
+if __name__ == '__main__':
+    # extract_datas()
+    extract_datas_1()
+    # print(re.search(aptitude_pattern,'二、申请人的资格要求:'))

+ 5 - 5
BaseDataMaintenance/maintenance/product/htmlparser.py

@@ -139,6 +139,7 @@ class ParseDocument():
         # #识别目录树
         # if self.parseTree:
         #     self.parseTree.printParseTree()
+        # self.print_tree(self.tree,"-|")
 
     def get_soup_objs(self,soup,list_obj=None):
         if list_obj is None:
@@ -158,18 +159,17 @@ class ParseDocument():
             self.tree = self.buildParsetree(self.list_obj,products,self.auto_merge_table)
 
     def print_tree(self,tree,append=""):
+        self.set_tree_id = set()
         if append=="":
-            self.set_tree_id = set()
-
-            # for t in tree:
-            #     logger.debug("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%("==>",t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
+            for t in tree:
+                logger.debug("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%("==>",t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
 
         for t in tree:
             _id = id(t)
             if _id in self.set_tree_id:
                 continue
             self.set_tree_id.add(_id)
-            logger.debug("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%(append,t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
+            logger.info("%s text:%s title:%s title_text:%s before:%s after%s product:%s"%(append,t["text"][:50],t["sentence_title"],t["sentence_title_text"],t["title_before"],t["title_after"],t["has_product"]))
             childs = t["child_title"]
             self.print_tree(childs,append=append+"-|")
 

+ 158 - 44
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -479,7 +479,7 @@ class f_set_docid(BaseUDAF):
             defind_count = list_docs[0]["defind_count"]
         print(defind_count)
         for i in range(len(list_docs)-1):
-            if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*2:
+            if abs(list_docs[i]["page_time_stamp"]-list_docs[i+1]["page_time_stamp"])<=86400*7:
                 continue
             else:
                 _group = []
@@ -590,10 +590,10 @@ class f_group_fingerprint(BaseUDAF):
         buffer[0].append(docid)
 
     def merge(self, buffer, pbuffer):
-        buffer[0].extend(pbuffer[0])
+        buffer[0].extend(pbuffer[0][:100000])
 
     def terminate(self, buffer):
-        list_docid = buffer[0]
+        list_docid = buffer[0][:100000]
         list_docid.sort(key=lambda x:x)
         return ",".join([str(a) for a in list_docid])
 
@@ -635,7 +635,7 @@ class f_dump_probability(BaseUDAF):
             list_data.append(_dict)
             if len(list_data)>10000:
                 break
-        list_group = split_with_time(list_data,sort_key="page_time_stamp",timedelta=86400*2)
+        list_group = split_with_time(list_data,sort_key="page_time_stamp",timedelta=86400*7)
         return json.dumps(list_group)
 
 
@@ -779,7 +779,9 @@ def getLength(_str):
     return len(_str if _str is not None else "")
 
 def check_money(bidding_budget_less,bidding_budget_greater,
-                win_bid_price_less,win_bid_price_greater):
+                win_bid_price_less,win_bid_price_greater,
+                moneys_less,moneys_greater,
+                moneys_attachment_less,moneys_attachment_greater):
 
     #只判断最高前六位
     if getLength(bidding_budget_less)>0:
@@ -811,6 +813,10 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                     budget_is_same = True
             if budget_less>10000 and budget_greater>10000 and round(budget_less/10000,2)==round(budget_greater/10000,2):
                 budget_is_same = True
+            if budget_less in moneys_greater or budget_less in moneys_attachment_greater:
+                budget_is_same = True
+            if budget_greater in moneys_less or budget_greater in moneys_attachment_less:
+                budget_is_same = True
             if budget_is_same=="":
                 return False
 
@@ -824,6 +830,10 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                     price_is_same = True
             if price_less>10000 and price_greater>10000 and round(price_less/10000,2)==round(price_greater/10000,2):
                 price_is_same = True
+            if price_less in moneys_greater or price_less in moneys_attachment_greater:
+                price_is_same = True
+            if price_greater in moneys_less or price_greater in moneys_attachment_less:
+                price_is_same = True
             if price_is_same=="":
                 return False
     return True
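A hedged reduction of the new money cross-check above: a budget (or winning price) that differs between the two records is still accepted as matching when it appears in the other record's extracted money set, body or attachment. The `moneys_*` arguments mirror the new set-typed fields added in dataflow.py.

```python
def budget_is_same(budget_less, budget_greater,
                   moneys_less, moneys_greater,
                   moneys_attachment_less, moneys_attachment_greater):
    if round(budget_less, 2) == round(budget_greater, 2):
        return True
    if budget_less in moneys_greater or budget_less in moneys_attachment_greater:
        return True          # the smaller doc's budget shows up among the other doc's moneys
    if budget_greater in moneys_less or budget_greater in moneys_attachment_less:
        return True
    return False

print(budget_is_same(1250000.0, 1300000.0, set(), {1250000.0}, set(), set()))  # True
```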
@@ -889,7 +899,7 @@ code_pattern = re.compile("[A-Za-z0-9\-\(\)()【】\.-]+")
 num_pattern = re.compile("^\d+(?:\.\d+)?$")
 num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
-building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
+building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
 def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
     if code_greater is None:
@@ -985,7 +995,7 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                     return False
     return True
 
-def check_product(product_less,product_greater,split_char=","):
+def check_product(product_less,product_greater,split_char=",",doctitle_refine_less='',doctitle_refine_greater=''):
     if getLength(product_less)>0 and getLength(product_greater)>0:
 
         _product_l = product_less.split(split_char)
@@ -997,7 +1007,7 @@ def check_product(product_less,product_greater,split_char=","):
             _product_l = a
         for _l in _product_l:
             for _g in _product_g:
-                if getSimilarityOfString(_l,_g)>=0.8:
+                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
                     same_count += 1
                     break
         if same_count/len(_product_l)>=0.5:
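The product comparison now also accepts a term that appears verbatim in the other document's refined title. A standalone sketch under that assumption, with a trivial equality stand-in for getSimilarityOfString and invented sample strings:

# Sketch of the relaxed product match: a product term also counts as matched
# when it literally appears in the other document's refined title.
def similar(a, b):
    return 1.0 if a == b else 0.0  # stand-in for getSimilarityOfString

def products_overlap(product_less, product_greater,
                     doctitle_refine_less="", doctitle_refine_greater="",
                     split_char=","):
    _l = [p for p in product_less.split(split_char) if p]
    _g = [p for p in product_greater.split(split_char) if p]
    if not _l or not _g:
        return True
    if len(_l) > len(_g):            # iterate over the shorter list, as above
        _l, _g = _g, _l
    same = 0
    for a in _l:
        for b in _g:
            if similar(a, b) >= 0.8 or doctitle_refine_greater.find(a) >= 0 \
                    or doctitle_refine_less.find(b) >= 0:
                same += 1
                break
    return same / len(_l) >= 0.5

print(products_overlap("路灯", "照明设备,电缆",
                       doctitle_refine_greater="某区路灯改造采购项目"))  # -> True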
@@ -1019,6 +1029,8 @@ def check_package(package_less,package_greater,split_char=","):
     return True
 
 def check_time(json_time_less,json_time_greater):
+    has_same = False
+    has_diff = False
     if getLength(json_time_less)>0 and getLength(json_time_greater)>0:
         if isinstance(json_time_less,dict):
             time_less = json_time_less
@@ -1033,12 +1045,62 @@ def check_time(json_time_less,json_time_greater):
                 v1 = time_greater.get(k,"")
                 if getLength(v1)>0:
                     if v[:10]!=v1[:10]:
-                        return False
-    return True
+                        has_diff = True
+                    else:
+                        has_same = True
+    if has_same:
+        if has_diff:
+            return 1
+        return 2
+    if has_diff:
+        return 0
+    return 1
 
-def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater=""):
+def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater="",moneys_less=set(),moneys_greater=set(),moneys_attachment_less=set(),moneys_attachment_greater=set(),page_attachments_less="[]",page_attachments_greater="[]"):
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
         return 1
+
+
+    #one document's extracted fields are all in the attachment, and the two documents' attachment md5s overlap
+    set_md5_less = set()
+    set_md5_greater = set()
+    list_md5_less = []
+    if page_attachments_less:
+        try:
+            list_md5_less = json.loads(page_attachments_less)
+        except Exception as e:
+            pass
+    list_md5_greater = []
+    if page_attachments_greater:
+        try:
+            list_md5_greater = json.loads(page_attachments_greater)
+        except Exception as e:
+            pass
+    for _l in list_md5_less:
+        _md5 = _l.get("fileMd5")
+        if _md5 is not None:
+            set_md5_less.add(_md5)
+    for _l in list_md5_greater:
+        _md5 = _l.get("fileMd5")
+        if _md5 is not None:
+            set_md5_greater.add(_md5)
+    if len(set_md5_less&set_md5_greater)>0 and len(set_md5_less&set_md5_greater)==len(set_md5_less):
+        one_in_attach = False
+        dict_enterprise_less = json.loads(nlp_enterprise_less)
+        dict_enterprise_greater = json.loads(nlp_enterprise_greater)
+        indoctextcon_less = dict_enterprise_less.get("indoctextcon",[])
+        notindoctextcon_less = dict_enterprise_less.get("notindoctextcon",[])
+        indoctextcon_greater = dict_enterprise_greater.get("indoctextcon",[])
+        notindoctextcon_greater = dict_enterprise_greater.get("notindoctextcon",[])
+        if len(indoctextcon_less)<=1 and len(notindoctextcon_less)>=2:
+            one_in_attach = True
+        if len(indoctextcon_greater)<=1 and len(notindoctextcon_greater)>=2:
+            one_in_attach = True
+        if one_in_attach:
+            if check_product(product_less,product_greater,doctitle_refine_less=doctitle_refine_less,doctitle_refine_greater=doctitle_refine_greater):
+                return 1
+
+
     if isinstance(project_codes_less,str):
         project_codes_less = [a for a in project_codes_less.split(",") if a!=""]
     elif project_codes_less is None:
@@ -1081,7 +1143,7 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
     if min(extract_count_less,extract_count_greater)<=3:
         if _prob<0.1:
             _prob = 0.15
-        if province_less!=province_greater:
+        if getLength(province_less)>0 and getLength(province_greater)>0 and province_less not in ("全国","未知") and province_greater not in ("全国","未知") and province_less!=province_greater:
             return 0
     if _prob<0.1:
         return _prob
@@ -1115,7 +1177,7 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             check_result["code"] = 1
 
 
-    if not check_product(product_less,product_greater):
+    if not check_product(product_less,product_greater,doctitle_refine_less=doctitle_refine_less,doctitle_refine_greater=doctitle_refine_greater):
         check_result["product"] = 0
         check_result["pass"] = 0
         if b_log:
@@ -1145,8 +1207,12 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         else:
             check_result["entity"] = 1
 
+    logging.info("moneys_less"+str(moneys_less)+"---"+str(moneys_attachment_less))
+    logging.info("moneys_less"+str(moneys_greater)+"---"+str(moneys_attachment_greater))
     if not check_money(bidding_budget_less,bidding_budget_greater,
-                       win_bid_price_less,win_bid_price_greater):
+                       win_bid_price_less,win_bid_price_greater,
+                       moneys_less,moneys_greater,
+                       moneys_attachment_less,moneys_attachment_greater):
         if b_log:
             logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
         check_result["money"] = 0
@@ -1172,7 +1238,8 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             check_result["package"] = 1
 
     #added check
-    if not check_time(json_time_less,json_time_greater):
+    _time_check = check_time(json_time_less,json_time_greater)
+    if not _time_check or (_time_check==1 and docchannel_less in (51,103)):
         if b_log:
             logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
             if isinstance(json_time_less,dict):
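check_time now returns a tri-state value instead of a boolean: 0 means only conflicting dates, 2 means every shared date agrees, and 1 means mixed or no comparable dates. The caller above rejects a pair on 0, or on 1 for the stricter channels. A small stand-in for that branch, with 51/103 being the channel ids singled out in the diff:

# Sketch of how the caller interprets the tri-state check_time result.
def time_gate(time_check, docchannel_less):
    if not time_check:                      # 0: hard conflict
        return False
    if time_check == 1 and docchannel_less in (51, 103):
        return False                        # mixed evidence is not enough here
    return True

print(time_gate(2, 52))   # -> True
print(time_gate(1, 51))   # -> False
print(time_gate(0, 52))   # -> False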
@@ -1211,8 +1278,6 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             return _prob
         else:
             return 0
-        if check_result.get("time",1)==0:
-            return 0
     return _prob
 
 def check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater=""):
@@ -1401,7 +1466,7 @@ def check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerp
             return 0
     return _prob
 
-@annotate("bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string->double")
+@annotate("bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->double")
 class f_dumplicate_check(BaseUDTF):
     def __init__(self):
         import logging
@@ -1414,18 +1479,34 @@ class f_dumplicate_check(BaseUDTF):
                 project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,
                 extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,
                 page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,
-                package_less,package_greater,json_time_less,json_time_greater,json_context):
-        _context = json.loads(json_context)
+                package_less,package_greater,json_time_less,json_time_greater,json_context,
+                province_less,province_greater,city_less,city_greater,district_less,district_greater,
+                web_source_no_less,web_source_no_greater,
+                extract_json_less,extract_json_greater,page_attachments_less,page_attachments_greater):
 
         min_counts = 100
-
-
-
-        for item in _context:
-            if item["counts"]<min_counts:
-                min_counts = item["counts"]
-
-        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=False)
+        if json_context is not None:
+            _context = json.loads(json_context)
+
+            for item in _context:
+                if item.get("counts",0)>0 and item.get("counts",0)<min_counts:
+                    min_counts = item["counts"]
+        _extract_less = {}
+        if extract_json_less is not None:
+            _extract_less = json.loads(extract_json_less)
+        _extract_greater = {}
+        if extract_json_greater is not None:
+            _extract_greater = json.loads(extract_json_greater)
+        moneys_less = set(_extract_less.get("moneys",[]))
+        moneys_attachment_less = set(_extract_less.get("moneys_attachment",[]))
+        moneys_greater = set(_extract_greater.get("moneys",[]))
+        moneys_attachment_greater = set(_extract_greater.get("moneys_attachment",[]))
+
+        if page_attachments_less is None:
+            page_attachments_less = '[]'
+        if page_attachments_greater is None:
+            page_attachments_greater = '[]'
+        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
         self.forward(_prob)
 
 @annotate("string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
@@ -1472,7 +1553,7 @@ class f_dumplicate_featureMatrix(BaseUDTF):
                 _error += str(a)
             self.forward("[6-%s]"%_error,0)
             return
-        if not check_product(product_less,product_greater):
+        if not check_product(product_less,product_greater,doctitle_refine_less=doctitle_refine_less,doctitle_refine_greater=doctitle_refine_greater):
             _error = "%s=%s"%(str(product_less),str(product_greater))
             self.forward("7-%s"%_error,0)
             return
@@ -1546,7 +1627,7 @@ class f_dumplicate_featureMatrix(BaseUDTF):
         self.forward(json_matrix,_prob)
         return
 
-@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double->string')
+@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double,string,string,string,string,string,string->string')
 class f_redump_probability_final_check(BaseUDAF):
     '''
     Re-check after dedup merging; when the group has more than 5 members, doctitle, tenderee, win_tenderer and bidding_budget may each take only one value within the group
@@ -1561,10 +1642,12 @@ class f_redump_probability_final_check(BaseUDAF):
     def new_buffer(self):
         return [list()]
 
-    def iterate(self, buffer,main_docid,docid,newly,docchannel,nlp_enterprise,product,package,json_dicttime,page_time,project_codes,project_name,doctitle_refine,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,confidence):
+    def iterate(self, buffer,main_docid,docid,newly,docchannel,nlp_enterprise,product,package,json_dicttime,page_time,project_codes,project_name,doctitle_refine,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,confidence,
+                province,city,district,web_source_no,extract_json,page_attachments):
         buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"nlp_enterprise":nlp_enterprise,"product":product,"package":package,"json_dicttime":json_dicttime,"page_time":page_time,
                           "project_codes":project_codes,"project_name":project_name,"doctitle_refine":doctitle_refine,"tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,
-                          "win_bid_price":win_bid_price,"extract_count":extract_count,"confidence":confidence})
+                          "win_bid_price":win_bid_price,"extract_count":extract_count,"confidence":confidence,
+                          "province":province,"city":city,"district":district,"web_source_no":web_source_no,"extract_json":extract_json,"page_attachments":page_attachments})
 
     def merge(self, buffer, pbuffer):
         buffer[0].extend(pbuffer[0])
@@ -1574,8 +1657,10 @@ class f_redump_probability_final_check(BaseUDAF):
         the_group = buffer[0]
         the_group.sort(key=lambda x:x["confidence"],reverse=True)
         _index = 0
+
+        final_group = []
         if len(the_group)>0:
-            _index = 1
+            _index = 0
             while _index<len(the_group):
                 document_greater = the_group[_index]
                 docid_greater = document_greater["docid"]
@@ -1595,10 +1680,16 @@ class f_redump_probability_final_check(BaseUDAF):
                 fingerprint_greater = document_greater.get("fingerprint","")
                 project_name_greater = document_greater["project_name"]
                 extract_count_greater = document_greater["extract_count"]
-                _less_index = 0
-                while _less_index<_index:
+                province_greater = document_greater["province"]
+                city_greater = document_greater["city"]
+                district_greater = document_greater["district"]
+                web_source_no_greater = document_greater["web_source_no"]
+                extract_json_greater = document_greater["extract_json"]
+                page_attachments_greater = document_greater["page_attachments"]
+                _pass = True
+
+                for document_less in final_group:
 
-                    document_less = the_group[_less_index]
                     docid_less = document_less["docid"]
                     docchannel_less = document_less["docchannel"]
                     page_time_less = document_less["page_time"]
@@ -1616,21 +1707,44 @@ class f_redump_probability_final_check(BaseUDAF):
                     fingerprint_less = document_less.get("fingerprint","")
                     project_name_less = document_less["project_name"]
                     extract_count_less = document_less["extract_count"]
-
-                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,len(the_group),b_log=False)
+                    province_less = document_less["province"]
+                    city_less = document_less["city"]
+                    district_less = document_less["district"]
+                    web_source_no_less = document_less["web_source_no"]
+                    extract_json_less = document_less["extract_json"]
+                    page_attachments_less = document_less["page_attachments"]
+
+                    _extract_less = {}
+                    if extract_json_less is not None:
+                        _extract_less = json.loads(extract_json_less)
+                    _extract_greater = {}
+                    if extract_json_greater is not None:
+                        _extract_greater = json.loads(extract_json_greater)
+                    moneys_less = set(_extract_less.get("moneys",[]))
+                    moneys_attachment_less = set(_extract_less.get("moneys_attachment",[]))
+                    moneys_greater = set(_extract_greater.get("moneys",[]))
+                    moneys_attachment_greater = set(_extract_greater.get("moneys_attachment",[]))
+
+                    if page_attachments_less is None:
+                        page_attachments_less = '[]'
+                    if page_attachments_greater is None:
+                        page_attachments_greater = '[]'
+
+                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,len(the_group),b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
 
                     if _prob<0.1:
+                        _pass = False
                         break
 
-                    _less_index += 1
-                if _less_index!=_index:
+                if _pass:
+                    final_group.append(document_greater)
+                else:
                     break
                 _index += 1
 
         dumplicates = ""
         if _index>1:
             logging.info("index/whole:%d/%d"%(_index,len(the_group)))
-            final_group = the_group[:_index]
             final_group.sort(key=lambda x:x["docid"])
             final_group.sort(key=lambda x:x["extract_count"],reverse=True)
             _set = set()
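The rewritten terminate() keeps a running final_group and admits each candidate, visited in descending confidence, only while check_dumplicate_rule stays at or above the 0.1 threshold against every document already kept, stopping at the first rejection. A toy standalone version of that greedy filter, with pair_score standing in for check_dumplicate_rule:

# Standalone sketch of the greedy filtering now used in terminate().
def greedy_dedup_group(docs, pair_score, threshold=0.1):
    docs = sorted(docs, key=lambda d: d["confidence"], reverse=True)
    final_group = []
    for candidate in docs:
        if all(pair_score(kept, candidate) >= threshold for kept in final_group):
            final_group.append(candidate)
        else:
            break   # as in the diff, the scan stops at the first rejection
    return final_group

toy_docs = [{"docid": 1, "confidence": 0.9}, {"docid": 2, "confidence": 0.8},
            {"docid": 3, "confidence": 0.2}]
score = lambda a, b: 1.0 if abs(a["docid"] - b["docid"]) <= 1 else 0.0
print([d["docid"] for d in greedy_dedup_group(toy_docs, score)])  # -> [1, 2]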
@@ -1855,7 +1969,7 @@ class f_set_docid_binaryChart(BaseUDAF):
 
     def terminate(self, buffer):
         list_docs = buffer[0]
-        list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*2)
+        list_timeGroups = split_with_time(list_docs,"page_time_stamp",86400*7)
 
         list_group = []
 
@@ -1898,7 +2012,7 @@ class f_set_docid_binaryChart(BaseUDAF):
 
 
 
-def split_with_time(list_dict,sort_key,timedelta=86400*2):
+def split_with_time(list_dict,sort_key,timedelta=86400*7):
     if len(list_dict)>0:
         if sort_key in list_dict[0]:
             list_dict.sort(key=lambda x:x[sort_key])
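split_with_time sorts by the timestamp key and cuts the list wherever two neighbours are further apart than timedelta, so widening the default from 86400*2 to 86400*7 lets documents up to a week apart share a candidate group. A simplified stand-in showing that behaviour (not the repository implementation):

# Simplified stand-in for split_with_time.
def split_by_gap(list_dict, sort_key="page_time_stamp", timedelta=86400 * 7):
    if not list_dict or sort_key not in list_dict[0]:
        return [list_dict] if list_dict else []
    list_dict = sorted(list_dict, key=lambda x: x[sort_key])
    groups, current = [], [list_dict[0]]
    for prev, cur in zip(list_dict, list_dict[1:]):
        if cur[sort_key] - prev[sort_key] < timedelta:
            current.append(cur)
        else:
            groups.append(current)
            current = [cur]
    groups.append(current)
    return groups

docs = [{"page_time_stamp": 0}, {"page_time_stamp": 86400 * 3}, {"page_time_stamp": 86400 * 20}]
print([len(g) for g in split_by_gap(docs)])  # -> [2, 1]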
@@ -2013,7 +2127,7 @@ class f_stamp_squence(BaseUDAF):
         list_stamp.sort(key=lambda x:x)
         list_stamp_final = []
         _begin = 0
-        _time_decase = 86400*2
+        _time_decase = 86400*7
         logging.info(str(list_stamp))
         for _index in range(len(list_stamp)-1):
             if list_stamp[_index+1]-list_stamp[_index]<_time_decase:

+ 1 - 1
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2287,7 +2287,7 @@ def timeAdd(_time,days,format="%Y-%m-%d",minutes=0):
 #     except Exception as e:
 #         return None
 
-def check_time_merge(json_time_less,json_time_greater,b_log,set_time_key=set([project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_publicity_end,project_time_publicity_start,project_time_registration_end,project_time_registration_start])):
+def check_time_merge(json_time_less,json_time_greater,b_log,set_time_key=set([project_time_bidclose,project_time_bidopen,project_time_bidstart,project_time_commencement,project_time_completion,project_time_earnest_money_start,project_time_earnest_money_end,project_time_get_file_end,project_time_get_file_start,project_time_registration_end,project_time_registration_start])):
 
     same_count = 0
     if getLength(json_time_less)>0 and getLength(json_time_greater)>0:

+ 104 - 0
BaseDataMaintenance/model/oracle/T_SHEN_PI_XIANG_MU.py

@@ -0,0 +1,104 @@
+
+
+from BaseDataMaintenance.model.oracle.BaseModel import BaseModel
+from datetime import datetime
+from BaseDataMaintenance.common.Utils import getCurrent_date,log
+
+T_SHEN_PI_XIANG_MU_ID = "id"
+T_SHEN_PI_XIANG_MU_WEB_SOURCE_NO = "web_source_no"
+T_SHEN_PI_XIANG_MU_AREA = "area"
+T_SHEN_PI_XIANG_MU_PROVINCE = "province"
+T_SHEN_PI_XIANG_MU_CITY = "city"
+T_SHEN_PI_XIANG_MU_DISTRICT = "district"
+T_SHEN_PI_XIANG_MU_WEB_SOURCE_NAME = "web_source_name"
+T_SHEN_PI_XIANG_MU_SP_TYPE = "sp_type"
+T_SHEN_PI_XIANG_MU_DETAILLINK = "detaillink"
+T_SHEN_PI_XIANG_MU_RECORD_ID = "record_id"
+T_SHEN_PI_XIANG_MU_PAGE_TITLE = "page_title"
+T_SHEN_PI_XIANG_MU_PAGE_TIME = "page_time"
+T_SHEN_PI_XIANG_MU_PAGE_CONTENT = "page_content"
+T_SHEN_PI_XIANG_MU_PAGE_CODE = "page_code"
+T_SHEN_PI_XIANG_MU_CREATE_TIME = "create_time"
+T_SHEN_PI_XIANG_MU_SOURCE_TYPE = "source_type"
+T_SHEN_PI_XIANG_MU_SOURCE_STAGE = "source_stage"
+T_SHEN_PI_XIANG_MU_ATTACHMENT_PATH = "attachment_path"
+T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS = "page_attachments"
+
+
+class T_SHEN_PI_XIANG_MU(BaseModel):
+
+
+    def __init__(self,_dict):
+        self.all_columns = []
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+
+
+
+    def getProperties(self):
+        return self.__dict__
+
+    def getProperties_ots(self):
+        new_dict = {}
+        for k,v in self.__dict__.items():
+            if v is not None:
+                if isinstance(v,(str,int,float)):
+                    pass
+                elif isinstance(v,(datetime)):
+                    v = v.strftime("%Y-%m-%d %H:%M:%S")
+                else:
+                    v = str(v)
+                new_dict[k] = v
+        docid = int(new_dict.get("id",0))
+        partition_key = docid%500+1
+
+        new_dict["partition_key"] = partition_key
+        new_dict["docid"] = docid
+        new_dict["original_id"] = str(new_dict.get(T_SHEN_PI_XIANG_MU_ID))
+        new_dict.pop(T_SHEN_PI_XIANG_MU_ID)
+
+        new_dict["uuid"] = str(new_dict.get(T_SHEN_PI_XIANG_MU_ID))
+
+        new_dict["crtime"] = new_dict.get(T_SHEN_PI_XIANG_MU_CREATE_TIME)
+        new_dict["docchannel"] = 302
+
+        new_dict["doctitle"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_TITLE,"")
+        new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_TITLE)
+
+        new_dict["dochtmlcon"] = new_dict.get(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
+        new_dict.pop(T_SHEN_PI_XIANG_MU_PAGE_CONTENT)
+
+        new_dict["detail_link"] = new_dict.get(T_SHEN_PI_XIANG_MU_DETAILLINK)
+        new_dict.pop(T_SHEN_PI_XIANG_MU_DETAILLINK)
+
+        new_dict[T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS] = new_dict.get(T_SHEN_PI_XIANG_MU_ATTACHMENT_PATH,"[]")
+
+        opertime = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+        publishtime = "%s %s"%(new_dict.get("page_time",""),opertime.split(" ")[1])
+        new_dict["opertime"] = opertime
+        new_dict["publishtime"] = publishtime
+        if "docchannel" in new_dict:
+            new_dict["original_docchannel"] = new_dict["docchannel"]
+        return new_dict
+
+    def select_rows(conn,max_shenpi_id,limit=500):
+        list_result = []
+        s_limit = ""
+        if limit is not None:
+            s_limit = "limit %d"%limit
+        s_where = " where id>%d "%(max_shenpi_id)
+
+        cursor = conn.cursor()
+        sql = "select %s from %s %s %s order by id asc"%("*","t_shen_pi_xiang_mu_new",s_where,s_limit)
+        log("select rows:%s"%(sql))
+        cursor.execute(sql)
+
+        vol = cursor.description
+        rows = cursor.fetchall()
+        for row in rows:
+            _dict = {}
+            for _vol,_val in zip(vol,row):
+                _name = _vol[0]
+                _dict[_name] = _val
+            list_result.append(T_SHEN_PI_XIANG_MU(_dict))
+        return list_result
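A rough standalone mirror of part of the getProperties_ots mapping above (partition key, docid, the fixed docchannel 302, and the title/content/link renames); the real logic lives on T_SHEN_PI_XIANG_MU, and the sample row below is invented:

from datetime import datetime

# Partial, standalone mirror of getProperties_ots; not repository code.
def shenpi_row_to_ots(row):
    new_dict = {}
    for k, v in row.items():
        if v is None:
            continue
        if isinstance(v, datetime):
            v = v.strftime("%Y-%m-%d %H:%M:%S")
        elif not isinstance(v, (str, int, float)):
            v = str(v)
        new_dict[k] = v
    docid = int(new_dict.get("id", 0))
    new_dict.update({
        "partition_key": docid % 500 + 1,   # 500-way partitioning, as in the diff
        "docid": docid,
        "original_id": str(docid),
        "uuid": str(docid),
        "docchannel": 302,                  # fixed channel for approval projects
        "doctitle": new_dict.pop("page_title", ""),
        "dochtmlcon": new_dict.pop("page_content", None),
        "detail_link": new_dict.pop("detaillink", None),
        "page_attachments": new_dict.get("attachment_path", "[]"),
    })
    new_dict.pop("id", None)
    return new_dict

sample = {"id": 1234, "page_title": "某项目审批", "page_time": "2024-04-25",
          "create_time": datetime(2024, 4, 25, 9, 0, 0), "attachment_path": "[]"}
print(shenpi_row_to_ots(sample)["partition_key"], shenpi_row_to_ots(sample)["docid"])  # -> 235 1234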

+ 19 - 10
BaseDataMaintenance/model/ots/document.py

@@ -20,6 +20,7 @@ document_status = "status"
 document_page_time = "page_time"
 document_attachment_extract_status = "attachment_extract_status"
 document_web_source_no = "web_source_no"
+document_web_source_name = "web_source_name"
 document_fingerprint = "fingerprint"
 document_opertime = "opertime"
 document_docchannel = "docchannel"
@@ -301,13 +302,16 @@ def turn_document_status():
     from BaseDataMaintenance.model.ots.attachment import attachment_filemd5,attachment_file_title,attachment_file_link
     ots_client = getConnect_ots()
     def producer(task_queue,ots_client):
+        from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
 
 
         bool_query = BoolQuery(
             must_queries=[
                 # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
-                # WildcardQuery("web_source_no","14763*"),
-                RangeQuery("status",0,1),
+                WildcardQuery("web_source_no","03716-*"),
+                RangeQuery("page_time","2024-04-24"),
+                TermQuery("save",1)
+                # RangeQuery("status",0,1),
                 # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                 # TermQuery("docid",397656324)
                 # BoolQuery(should_queries=[
@@ -337,24 +341,24 @@ def turn_document_status():
         #
         # )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                        columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         print(total_count)
         _count = len(list_data)
         for _data in list_data:
-            _document = Document(_data)
+            _document = Document_tmp(_data)
             task_queue.put(_document)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
             _count += len(list_data)
             print("%d/%d"%(_count,total_count))
             for _data in list_data:
-                _document = Document(_data)
+                _document = Document_tmp(_data)
                 task_queue.put(_document)
 
         # docids = [223820830,224445409]
@@ -364,13 +368,18 @@ def turn_document_status():
         #              }
         #     task_queue.put(Document(_dict))
         # import pandas as pd
-        # df = pd.read_excel("G:\\20221212error.xlsx")
-        # for docid in df["docid"]:
+        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
+        # for docid in df["docid1"]:
         #     _dict = {document_docid:int(docid),
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        log("task_queue size:%d"%(task_queue.qsize()))
+        # for docid in df["docid2"]:
+        #     _dict = {document_docid:int(docid),
+        #              document_partitionkey:int(docid)%500+1,
+        #              }
+        #     task_queue.put(Document(_dict))
+        # log("task_queue size:%d"%(task_queue.qsize()))
 
     def _handle(item,result_queue,ots_client):
         #change attach value
@@ -396,7 +405,7 @@ def turn_document_status():
         # item.setValue(document_province,"广东",True)
         # item.setValue(document_city,"珠海",True)
         # item.setValue(document_district,"金湾区",True)
-        item.setValue(document_status,1,True)
+        item.setValue(document_status,66,True)
         # print(item.getProperties())
         item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))