
Merge branch 'master' of http://192.168.2.103:3000/luojiehua/BaseDataMaintenance

znj 6 months ago
parent
revision
7eeb6c31c2

+ 2 - 0
.idea/encodings.xml

@@ -2,8 +2,10 @@
 <project version="4">
   <component name="Encoding">
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/attachmentProcessTime.xlsx" charset="GBK" />
+    <file url="file://$PROJECT_DIR$/BaseDataMaintenance/chat/chatUtil.py" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/dataSource/searchPaddle.py" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/maintenance/attachment/2022-01-18_183521_export11.xlsx" charset="GBK" />
+    <file url="file://$PROJECT_DIR$/BaseDataMaintenance/maintenance/gpt_extract.py" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/maintenance/product/select_product_exclude_name_from_tw_prod.csv" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/maintenance/product/select_product_product_name_exclude_name.csv" charset="GBK" />
     <file url="file://$PROJECT_DIR$/BaseDataMaintenance/maintenance/product/update_product.csv" charset="GBK" />

+ 2 - 3
BaseDataMaintenance/common/Utils.py

@@ -720,7 +720,6 @@ def getMultipleFactor(unit):
     MultipleFactor = {"兆":Decimal(1000000000000),"亿":Decimal(100000000),"万":Decimal(10000),"仟":Decimal(1000),"千":Decimal(1000),"佰":Decimal(100),"百":Decimal(100),"拾":Decimal(10),"十":Decimal(10),"元":Decimal(1),"圆":Decimal(1),"角":round(Decimal(0.1),1),"分":round(Decimal(0.01),2)}
     return MultipleFactor.get(unit)
 
-
 def getUnifyMoney(money):
     '''
     @summary:将中文金额字符串转换为数字金额
@@ -735,9 +734,9 @@ def getUnifyMoney(money):
     money = re.sub("[,,]","",money)
     money = re.sub("[^0-9.零壹贰叁肆伍陆柒捌玖拾佰仟萬億圆十百千万亿元角分]","",money)
     result = Decimal(0)
-    chnDigits = ["零", "壹", "贰", "叁", "肆", "伍", "陆", "柒", "捌", "玖","一","二","三","四","五","六","七","八","九"]
+    chnDigits = ["零", "壹", "贰", "叁", "肆", "伍", "陆", "柒", "捌", "玖"]
     # chnFactorUnits = ["兆", "亿", "万", "仟", "佰", "拾","圆","元","角","分"]
-    chnFactorUnits = ["圆", "元","兆", "亿", "万", "仟", "佰", "拾", "角", "分", '十', '百', '千']
+    chnFactorUnits = ["兆", "亿", "万", "仟", '千', "佰", '百', "拾", '十',"圆", "元", "角", "分"]  # 20240611 修复大写提取错误 '陆拾陆亿伍千柒佰零叁万肆千叁佰陆拾伍元' Decimal('11607430365')
 
     LowMoneypattern = re.compile("^[\d,]+(\.\d+)?$")
     BigMoneypattern = re.compile("^零?(?P<BigMoney>[%s])$"%("".join(chnDigits)))

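The Utils.py hunk above reorders chnFactorUnits so that magnitude units are scanned from largest to smallest (兆, 亿, 万, then 仟/千, 佰/百, 拾/十, and only then 圆/元, 角, 分); the inline comment records the sample string 陆拾陆亿伍千柒佰零叁万肆千叁佰陆拾伍元 together with the bad value Decimal('11607430365'), whose correct amount is 6,657,034,365. Below is a minimal, self-contained sketch of the idea — a simplified re-implementation for illustration only (integer-yuan strings, not the repository's full getUnifyMoney):

from decimal import Decimal

DIGITS = {"零": 0, "壹": 1, "贰": 2, "叁": 3, "肆": 4, "伍": 5, "陆": 6, "柒": 7, "捌": 8, "玖": 9}
# Scan units from largest factor to smallest; 仟/千, 佰/百, 拾/十 must come after 万 and 亿,
# otherwise "陆拾陆亿..." is split on 拾 before 亿 and the magnitude is lost.
UNITS = [("兆", Decimal(10) ** 12), ("亿", Decimal(10) ** 8), ("万", Decimal(10) ** 4),
         ("仟", Decimal(1000)), ("千", Decimal(1000)),
         ("佰", Decimal(100)), ("百", Decimal(100)),
         ("拾", Decimal(10)), ("十", Decimal(10))]

def parse_cn_integer(text):
    """Recursively split on the largest unit present (integer yuan only)."""
    text = text.rstrip("元圆").replace("零", "")
    if not text:
        return Decimal(0)
    for unit, factor in UNITS:
        pos = text.find(unit)
        if pos >= 0:
            left = text[:pos] or "壹"              # "拾伍" means 1*10 + 5
            right = text[pos + len(unit):]
            return parse_cn_integer(left) * factor + parse_cn_integer(right)
    value = Decimal(0)
    for ch in text:                                # plain digit run, e.g. "陆" -> 6
        value = value * 10 + DIGITS.get(ch, 0)
    return value

print(parse_cn_integer("陆拾陆亿伍千柒佰零叁万肆千叁佰陆拾伍元"))   # 6657034365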
+ 2 - 2
BaseDataMaintenance/common/ossUtils.py

@@ -108,7 +108,7 @@ def test_download(filemd5):
 
 
 if __name__=="__main__":
-    # print(getMDFFromFile('8a9c96a68803c2ad01881d0ee93618e5.pdf'))
-    test_download("892bde698088f1d61b5310782550d0e1")
+    print(getMDFFromFile(r'G:\新建文件夹\WeChat Files\wxid_kluerlj8cn3b21\FileStorage\File\2024-09\中国区超低氮锅炉电锅炉招标文件与附件(1).zip'))
+    # test_download("892bde698088f1d61b5310782550d0e1")
     # print(bucket.sign_url("GET","0015//20220623/2022-06-22/WGH001018/1655926900020.png",86500*30))
     # print(time.strftime("%Y-%m-%d",time.localtime(1658655178)))

+ 50 - 0
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -20,6 +20,7 @@ from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 
 from BaseDataMaintenance.maintenance.dataflow_settings import *
 
+
 import pandas as pd
 
 
@@ -35,6 +36,48 @@ flow_init_check_dir = "/data/python/flow_init_check"
 flow_dumplicate_log_path = "/home/appuser/python/flow_dumplicate.log"
 
 
+def fixDoc_to_queue_init(filename=""):
+    import pandas as pd
+    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
+    from BaseDataMaintenance.model.oracle.TouSuTemp import dict_oracle2ots as dict_oracle2ots_tousu
+
+    from BaseDataMaintenance.dataSource.source import getConnection_oracle
+    current_path = os.path.abspath(os.path.dirname(__file__))
+    if filename=="":
+        filename = os.path.join(current_path,"check.xlsx")
+    df = pd.read_excel(filename)
+    if "docchannel" in dict_oracle2ots:
+        dict_oracle2ots.pop("docchannel")
+    row_name = ",".join(list(dict_oracle2ots.keys()))
+
+    list_tousu_keys = []
+    for k,v in dict_oracle2ots_tousu.items():
+        if str(k).isupper():
+            list_tousu_keys.append(k)
+    row_name_tousu = ",".join(list(list_tousu_keys))
+    conn = getConnection_oracle()
+    cursor = conn.cursor()
+    _count = 0
+    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
+        if _exists==0 and _toolong==0:
+            _count += 1
+            is_tousu = False
+            if tablename in ('bxkc.t_wei_fa_ji_lu_temp','bxkc.t_tou_su_chu_li_temp','bxkc.t_qi_ta_shi_xin_temp'):
+                is_tousu = True
+            _source = str(tablename).replace("_TEMP","")
+            if is_tousu:
+                _source = str(tablename).replace("_temp","")
+            _rowname = row_name_tousu if is_tousu else row_name
+
+            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,_rowname,_rowname,_source,uuid)
+            log("%d:%s"%(_count,sql))
+            cursor.execute(sql)
+
+    conn.commit()
+    conn.close()
+
+    return _count
+
 class BaseDataMonitor():
 
     def __init__(self):
@@ -200,6 +243,13 @@ class BaseDataMonitor():
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
+            _count = fixDoc_to_queue_init(check_filename)
+            if _count>0:
+                _msg = "数据遗漏检查%d条公告已重新同步"%(_count)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                df_data.to_excel("%s_bak.xlsx"%check_filename)
+
+
 
 
         except Exception as e:

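fixDoc_to_queue_init now lives in data_monitor.py and returns the number of announcements it replayed: for each spreadsheet row (columns uuid, tablename, exists, tolong) with exists==0 and tolong==0 it re-inserts the record from the source Oracle table into its *_TEMP queue table, switching to the TouSu column set for the three complaint tables. A minimal usage sketch (the xlsx path is the one used in fixDoc_to_queue_extract.py below; printing the count is purely illustrative):

from BaseDataMaintenance.dataMonitor.data_monitor import fixDoc_to_queue_init

resynced = fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2024-12-02.xlsx")
print("re-synced %d announcements" % resynced)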
+ 3 - 3
BaseDataMaintenance/dataSource/setttings.py

@@ -45,10 +45,10 @@ oracle_host = "192.168.0.150"
 oracle_port = 1522
 # oracle_user = "BXKC_DATA_READONLY"
 # oracle_pass = "nXcQG3Z8DW=Hzr!h"
-oracle_user = "BXKC_WRITE"
+# oracle_user = "BXKC_WRITE"
+# oracle_pass = "PHNhX3%rVy4@fDB&"
+oracle_user = "bxkc_db"
 oracle_pass = "PHNhX3%rVy4@fDB&"
-# oracle_user = "bxkc_db"
-# oracle_pass = "xb9F#24Hd#5rStr9"
 oracle_db = "yanphone"
 
 ots_AccessKeyId = 'LTAI5tFuoxHm8Uxrr5nT8wTZ'

+ 3 - 2
BaseDataMaintenance/fixDoc_to_queue_extract.py

@@ -3,9 +3,10 @@ import sys,os
 
 sys.path.append(os.path.dirname(__file__)+"/..")
 
-from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract,fixDoc_to_queue_init
+from BaseDataMaintenance.maintenance.dataflow_mq import fixDoc_to_queue_extract
+from BaseDataMaintenance.dataMonitor.data_monitor import fixDoc_to_queue_init
 
 
 if __name__ == '__main__':
     # fixDoc_to_queue_extract()
-    fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2023-12-28.xlsx")
+    fixDoc_to_queue_init(filename="/data/python/flow_init_check/flow_init_2024-12-02.xlsx")

+ 1 - 1
BaseDataMaintenance/maintenance/attachment/attachmentProcess.py

@@ -811,7 +811,7 @@ class AttachmentRec():
                             attach.setValue(attachment_process_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
                             attach.setValue(attachment_status,ATTACHMENT_PROCESSED_FAILED)
                             log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
-                            sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
+                            # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
 
 
                 attach.update_row(self.ots_client)

+ 206 - 122
BaseDataMaintenance/maintenance/dataflow.py

@@ -260,7 +260,7 @@ class Dataflow():
                         log("process filemd5:%s of type:%s with size:%.3fM download:%ds recognize takes %ds,ret_size:%d"%(filemd5,_filetype,round(_size/1024/1024,4),time_download,time.time()-start_time,len(_html)))
                     else:
                         log("attach interface failed of docid:%s filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
-                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
+                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                         _html = ""
                         return False
 
@@ -350,8 +350,8 @@ class Dataflow():
 
 
     def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
-                                  set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
-                                  set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
+                                  set_term=set(["doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
+                                  set_range=set(["page_time","status"]),set_phrase=set(["doctitle","project_name"])):
         list_must_queries = []
         list_must_no_queries = []
         for k,v in _dict.items():
@@ -415,7 +415,10 @@ class Dataflow():
         if agency is not None and agency!="":
             extract_count += 1
         if sub_docs_json is not None:
-            sub_docs = json.loads(sub_docs_json)
+            try:
+                sub_docs = json.loads(sub_docs_json)
+            except Exception as e:
+                sub_docs = []
             sub_docs.sort(key=lambda x:float(x.get("bidding_budget",0)),reverse=True)
             sub_docs.sort(key=lambda x:float(x.get("win_bid_price",0)),reverse=True)
             # log("==%s"%(str(sub_docs)))
@@ -2235,6 +2238,8 @@ class Dataflow_dumplicate(Dataflow):
         _dict["package"] = self.c_f_get_package.evaluate(extract_json)
         _dict["project_name"] = _extract.get("name","")
         _dict["dict_time"] = self.get_dict_time(_extract)
+        _dict["punish"] = _extract.get("punish",{})
+        _dict["approval"] = _extract.get("approval",[])
 
     def dumplicate_fianl_check(self,base_list,b_log=False):
         the_group = base_list
@@ -2272,22 +2277,22 @@ class Dataflow_dumplicate(Dataflow):
     def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
         document_less = _dict1
         docid_less = _dict1["docid"]
-        docchannel_less = document_less["docchannel"]
-        page_time_less = document_less["page_time"]
+        docchannel_less = document_less.get("docchannel",0)
+        page_time_less = document_less.get("page_time")
         doctitle_refine_less = document_less["doctitle_refine"]
-        project_codes_less = document_less["project_codes"]
+        project_codes_less = document_less.get("project_codes")
         nlp_enterprise_less = document_less["nlp_enterprise"]
-        tenderee_less = document_less["tenderee"]
-        agency_less = document_less["agency"]
+        tenderee_less = document_less.get("tenderee","")
+        agency_less = document_less.get("agency")
         win_tenderer_less = document_less["win_tenderer"]
         bidding_budget_less = document_less["bidding_budget"]
         win_bid_price_less = document_less["win_bid_price"]
-        product_less = document_less["product"]
-        package_less = document_less["package"]
-        json_time_less = document_less["dict_time"]
-        project_name_less = document_less["project_name"]
-        fingerprint_less = document_less["fingerprint"]
-        extract_count_less = document_less["extract_count"]
+        product_less = document_less.get("product")
+        package_less = document_less.get("package")
+        json_time_less = document_less.get("dict_time")
+        project_name_less = document_less.get("project_name")
+        fingerprint_less = document_less.get("fingerprint")
+        extract_count_less = document_less.get("extract_count",0)
         web_source_no_less = document_less.get("web_source_no")
         province_less = document_less.get("province")
         city_less = document_less.get("city")
@@ -2295,26 +2300,29 @@ class Dataflow_dumplicate(Dataflow):
         moneys_less = document_less.get("moneys")
         moneys_attachment_less = document_less.get("moneys_attachment")
         page_attachments_less = document_less.get(document_tmp_attachment_path,"[]")
+        punish_less = document_less.get("punish",{})
+        approval_less = document_less.get("approval",[])
+        source_type_less = document_less.get("source_type")
 
 
         document_greater = _dict2
         docid_greater = _dict2["docid"]
         page_time_greater = document_greater["page_time"]
-        docchannel_greater = document_greater["docchannel"]
-        doctitle_refine_greater = document_greater["doctitle_refine"]
+        docchannel_greater = document_greater.get("docchannel",0)
+        doctitle_refine_greater = document_greater.get("doctitle_refine","")
         project_codes_greater = document_greater["project_codes"]
         nlp_enterprise_greater = document_greater["nlp_enterprise"]
-        tenderee_greater = document_greater["tenderee"]
-        agency_greater = document_greater["agency"]
+        tenderee_greater = document_greater.get("tenderee","")
+        agency_greater = document_greater.get("agency","")
         win_tenderer_greater = document_greater["win_tenderer"]
         bidding_budget_greater = document_greater["bidding_budget"]
         win_bid_price_greater = document_greater["win_bid_price"]
-        product_greater = document_greater["product"]
-        package_greater = document_greater["package"]
+        product_greater = document_greater.get("product")
+        package_greater = document_greater.get("package")
         json_time_greater = document_greater["dict_time"]
-        project_name_greater = document_greater["project_name"]
-        fingerprint_greater = document_greater["fingerprint"]
-        extract_count_greater = document_greater["extract_count"]
+        project_name_greater = document_greater.get("project_name")
+        fingerprint_greater = document_greater.get("fingerprint")
+        extract_count_greater = document_greater.get("extract_count",0)
         web_source_no_greater = document_greater.get("web_source_no")
         province_greater = document_greater.get("province")
         city_greater = document_greater.get("city")
@@ -2324,12 +2332,16 @@ class Dataflow_dumplicate(Dataflow):
         moneys_attachment_greater = document_greater.get("moneys_attachment")
         page_attachments_greater = document_greater.get(document_tmp_attachment_path,"[]")
 
+        punish_greater = document_greater.get("punish",{})
+        approval_greater = document_greater.get("approval",[])
+        source_type_greater = document_greater.get("source_type")
+
         hard_level=1
         if web_source_no_less==web_source_no_greater=="17397-3":
             hard_level=2
 
         if self.check_rule==1:
-            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
+            _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less=punish_less,punish_greater=punish_greater,approval_less=approval_less,approval_greater=approval_greater,source_type_less=source_type_less,source_type_greater=source_type_greater)
         else:
             _prob = check_dumplicate_rule_test(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=b_log,hard_level=hard_level,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater)
 
@@ -2559,7 +2571,7 @@ class Dataflow_dumplicate(Dataflow):
                 else:
                     bool_query = _query
                 rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
-                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
+                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=60,get_total_count=True),
                                                                                     ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                 list_dict = getRow_ots(rows)
                 list_data = []
@@ -2854,7 +2866,7 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]):
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]):
         q_size = self.queue_dumplicate.qsize()
         log("dumplicate queue size %d"%(q_size))
 
@@ -2939,7 +2951,7 @@ class Dataflow_dumplicate(Dataflow):
         # mt.run()
 
 
-    def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
+    def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment,document_tenderee_code,document_agency_code,document_candidates]):
         '''
         根据docid查询公告内容,先查询document_tmp,再查询document
         :param list_docids:
@@ -3049,7 +3061,7 @@ class Dataflow_dumplicate(Dataflow):
                     continue
             if v is None or v=="" or v=="[]" or v=="未知":
                 continue
-            if k in (project_project_dynamics,project_product,project_project_codes,project_docids):
+            if k in (project_project_dynamics,project_product,project_project_codes,project_docids,project_candidates):
                 continue
             _dict[k] = v
         for _proj in projects:
@@ -3058,14 +3070,19 @@ class Dataflow_dumplicate(Dataflow):
             if _proj.get(project_page_time,"")<project_dict.get(project_page_time,""):
                 _proj[project_page_time] = project_dict.get(project_page_time,"")
 
-        #拼接属性
-        append_dict = {}
-        set_docid = set()
-        set_product = set()
-        set_code = set()
-        set_nlp_enterprise = set()
-        set_nlp_enterprise_attachment = set()
+
         for _proj in projects:
+            #拼接属性
+            append_dict = {}
+            set_docid = set()
+            set_product = set()
+            set_code = set()
+            set_nlp_enterprise = set()
+            set_nlp_enterprise_attachment = set()
+            set_candidates = set()
+
+
+
             _docids = _proj.get(project_docids,"")
             _codes = _proj.get(project_project_codes,"")
             _product = _proj.get(project_product,"")
@@ -3081,15 +3098,22 @@ class Dataflow_dumplicate(Dataflow):
             try:
                 set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
                 set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
-            except Exception as e:
-                pass
+                list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
+                for item in list_candidates:
+                    if item.get("name") is not None and item.get("name") not in set_candidates:
+                        set_candidates.add(item.get("name"))
 
-            set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
-            set_product = set_product | set(project_dict.get(project_product,"").split(","))
 
-            try:
+                set_code = set_code | set(project_dict.get(project_project_codes,"").split(","))
+                set_product = set_product | set(project_dict.get(project_product,"").split(","))
+
                 set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
                 set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
+
+                for item in json.loads(_proj.get(project_candidates,"[]")):
+                    if item.get("name") is not None and item.get("name") not in set_candidates:
+                        set_candidates.add(item.get("name"))
+                        list_candidates.append(item)
             except Exception as e:
                 pass
 
@@ -3101,6 +3125,7 @@ class Dataflow_dumplicate(Dataflow):
 
             append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
             append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+            append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
 
 
             dict_dynamic = {}
@@ -3119,6 +3144,7 @@ class Dataflow_dumplicate(Dataflow):
             list_dynamics.sort(key=lambda x:x.get(document_page_time,""))
 
             append_dict[project_project_dynamics] = json.dumps(list_dynamics[:100],ensure_ascii=False)
+
             _proj.update(append_dict)
 
 
@@ -3151,74 +3177,84 @@ class Dataflow_dumplicate(Dataflow):
 
 
         #更新私有属性
-        for _pp in list_package_properties:
-
-            flag_update = False
-            sub_project_name = _pp.get(project_sub_project_name,"")
-            if sub_project_name=="Project":
-                sub_project_name = ""
-            win_tenderer = _pp.get(project_win_tenderer,"")
-            win_bid_price = _pp.get(project_win_bid_price,0)
-            bidding_budget = _pp.get(project_bidding_budget,0)
-            if win_tenderer!="" and bidding_budget!=0:
-                _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
-                if _key in dict_package:
-                    if self.is_same_package(_pp,dict_package[_key]):
-                        ud = self.getUpdate_dict(_pp)
-                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
-                        dict_package[_key].update(ud)
-                        flag_update = True
-                        continue
-            if win_tenderer!="" and  win_bid_price!=0:
-                _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
-                if _key in dict_package:
-                    if self.is_same_package(_pp,dict_package[_key]):
-                        ud = self.getUpdate_dict(_pp)
-                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
-                        dict_package[_key].update(ud)
-                        flag_update = True
-                        continue
-            if win_tenderer!="":
-                _key = "%s-%s"%(sub_project_name,win_tenderer)
-                if _key in dict_package:
-                    if self.is_same_package(_pp,dict_package[_key]):
-                        ud = self.getUpdate_dict(_pp)
-                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
-                        dict_package[_key].update(ud)
-                        flag_update = True
-                        continue
-            if bidding_budget!=0:
-                _key = "%s-%s"%(sub_project_name,str(bidding_budget))
-                if _key in dict_package:
-                    if self.is_same_package(_pp,dict_package[_key]):
-                        ud = self.getUpdate_dict(_pp)
-                        self.set_project_uuid(ud,dict_package[_key].get("uuid"))
-                        dict_package[_key].update(ud)
-                        flag_update = True
-                        continue
-            if not flag_update:
-                _pp.update(project_dict)
-                projects.append(_pp)
+        if len(projects)==1 and len(list_package_properties)==1:
+            _pp = list_package_properties[0]
+            pp = projects[0]
+            ud = self.getUpdate_dict(_pp)
+            self.set_project_uuid(ud,pp.get("uuid"))
+            pp.update(_pp)
+        else:
 
+            for _pp in list_package_properties:
 
-                _counts = 0
+                flag_update = False
+                sub_project_name = _pp.get(project_sub_project_name,"")
+                if sub_project_name=="Project":
+                    sub_project_name = ""
+                win_tenderer = _pp.get(project_win_tenderer,"")
+                win_bid_price = _pp.get(project_win_bid_price,0)
+                bidding_budget = _pp.get(project_bidding_budget,0)
                 if win_tenderer!="" and bidding_budget!=0:
                     _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
-                    dict_package[_key] = _pp
-                _counts += 1
+                    if _key in dict_package:
+                        if self.is_same_package(_pp,dict_package[_key]):
+                            ud = self.getUpdate_dict(_pp)
+                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
+                            dict_package[_key].update(ud)
+                            flag_update = True
+                            continue
                 if win_tenderer!="" and  win_bid_price!=0:
                     _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
-                    dict_package[_key] = _pp
-                    _counts +=1
-                if _counts==0:
-                    if win_tenderer!="":
-                        _key = "%s-%s"%(sub_project_name,win_tenderer)
+                    if _key in dict_package:
+                        if self.is_same_package(_pp,dict_package[_key]):
+                            ud = self.getUpdate_dict(_pp)
+                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
+                            dict_package[_key].update(ud)
+                            flag_update = True
+                            continue
+                if win_tenderer!="":
+                    _key = "%s-%s"%(sub_project_name,win_tenderer)
+                    if _key in dict_package:
+                        if self.is_same_package(_pp,dict_package[_key]):
+                            ud = self.getUpdate_dict(_pp)
+                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
+                            dict_package[_key].update(ud)
+                            flag_update = True
+                            continue
+                if bidding_budget!=0:
+                    _key = "%s-%s"%(sub_project_name,str(bidding_budget))
+                    if _key in dict_package:
+                        if self.is_same_package(_pp,dict_package[_key]):
+                            ud = self.getUpdate_dict(_pp)
+                            self.set_project_uuid(ud,dict_package[_key].get("uuid"))
+                            dict_package[_key].update(ud)
+                            flag_update = True
+                            continue
+                if not flag_update:
+                    _pp.update(project_dict)
+                    projects.append(_pp)
+
+
+                    _counts = 0
+                    if win_tenderer!="" and bidding_budget!=0:
+                        _key = "%s-%s-%s"%(sub_project_name,str(win_tenderer),str(bidding_budget))
                         dict_package[_key] = _pp
-                        _counts += 1
-                    if bidding_budget!=0:
-                        _key = "%s-%s"%(sub_project_name,str(bidding_budget))
+                    _counts += 1
+                    if win_tenderer!="" and  win_bid_price!=0:
+                        _key = "%s-%s-%s"%(sub_project_name,win_tenderer,str(win_bid_price))
                         dict_package[_key] = _pp
-                        _counts += 1
+                        _counts +=1
+                    if _counts==0:
+                        if win_tenderer!="":
+                            _key = "%s-%s"%(sub_project_name,win_tenderer)
+                            dict_package[_key] = _pp
+                            _counts += 1
+                        if bidding_budget!=0:
+                            _key = "%s-%s"%(sub_project_name,str(bidding_budget))
+                            dict_package[_key] = _pp
+                            _counts += 1
+
+
 
 
 
@@ -3539,6 +3575,9 @@ class Dataflow_dumplicate(Dataflow):
             project_info_source,
             project_nlp_enterprise,
             project_nlp_enterprise_attachment,
+            project_tenderee_code,
+            project_agency_code,
+            project_candidates
         ],sort="page_time",table_name="project2",table_index="project2_index")
 
         return list_project_dict
@@ -3754,8 +3793,8 @@ class Dataflow_dumplicate(Dataflow):
                 district = _proj.get(project_district,"")
 
                 if is_yanshou:
-                    page_time_less = timeAdd(page_time,-750)
-                    page_time_greater = timeAdd(page_time,720)
+                    page_time_less = timeAdd(page_time,-850)
+                    page_time_greater = timeAdd(page_time,820)
                 else:
                     page_time_less = timeAdd(page_time,-450)
                     page_time_greater = timeAdd(page_time,420)
@@ -3859,8 +3898,9 @@ class Dataflow_dumplicate(Dataflow):
                         update_projects_by_project(_data,[_proj])
                         projects_update_time += time.time()-_time
 
-            whole_time = time.time()-whole_time_start
-            log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
+                whole_time = time.time()-whole_time_start
+                log("%s %s merge_project whole_time:%.3f projects_prepare_time:%.3f projects_query_time:%.3f projects_merge_count:%d rules%d projects_check_rule_time %.3f projects_update_time %.3f"%(search_table,docids,whole_time,projects_prepare_time,projects_query_time,projects_merge_count,len(list_must_query),projects_check_rule_time,projects_update_time))
+
 
             return list_projects
         except Exception as e:
@@ -3923,6 +3963,27 @@ class Dataflow_dumplicate(Dataflow):
                 list_projects = []
 
             _time = time.time()
+
+            projects = list_projects
+            for _proj in projects:
+                dup_docid = _proj.get(project_dup_docid,"")
+                list_dup_docid = dup_docid.split(",")
+                new_dup_docid = []
+                for _docid in list_dup_docid:
+                    if _docid=="":
+                        continue
+                    docid = int(_docid)
+                    _d = {"partitionkey":docid%500+1,
+                          "docid":docid,
+                          }
+                    _doc = Document(_d)
+
+                    if _doc.fix_columns(self.ots_client,[document_update_document],True):
+                        if _doc.getProperties().get(document_update_document,"")!="true":
+                            new_dup_docid.append(str(docid))
+                _proj[project_dup_docid] = ",".join(new_dup_docid)
+            list_projects = projects
+
             project_json = to_project_json(list_projects)
             # log("json projects takes:%.3f"%(time.time()-_time))
             if b_log:
@@ -3957,6 +4018,11 @@ class Dataflow_dumplicate(Dataflow):
         has_before = False
         has_after = False
 
+        bidclose_time = page_time
+        web_source_name = item.get(document_tmp_web_source_name,"")
+
+
+
         if len(page_time)>0:
             l_page_time = timeAdd(page_time,days=-90)
             dict_time = item.get("dict_time",{})
@@ -3966,6 +4032,14 @@ class Dataflow_dumplicate(Dataflow):
                         has_before = True
                     if v>page_time:
                         has_after = True
+                    if k==document_tmp_time_bidclose:
+                        bidclose_time = v
+
+        set_web_source = {"中国招标投标公共服务平台","比地招标"}
+
+        if web_source_name in set_web_source and bidclose_time<page_time:
+            return False
+
         log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
         if has_before:
             _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
@@ -4024,7 +4098,7 @@ class Dataflow_dumplicate(Dataflow):
                 singleNum_keys = _rule["singleNum_keys"]
                 contain_keys = _rule["contain_keys"]
                 multiNum_keys = _rule["multiNum_keys"]
-                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path],b_log=b_log)
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status,document_province,document_city,document_district,document_doctitle,document_tmp_attachment_path,document_tmp_source_stage,document_tmp_source_type,document_update_document],b_log=b_log)
                 _i += step
 
 
@@ -4049,7 +4123,8 @@ class Dataflow_dumplicate(Dataflow):
 
             dup_docid = set()
             for _dict in final_list:
-                dup_docid.add(_dict.get(document_tmp_docid))
+                if _dict.get("update_document","")!="true":
+                    dup_docid.add(_dict.get(document_tmp_docid))
             if item.get(document_tmp_docid) in dup_docid:
                 dup_docid.remove(item.get(document_tmp_docid))
 
@@ -4057,7 +4132,7 @@ class Dataflow_dumplicate(Dataflow):
             remove_list = []
 
 
-            if self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid)):
+            if (self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
                 dtmp.setValue(document_tmp_save,1,True)
                 # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                 dmp_docid = ",".join([str(a) for a in list(dup_docid)])
@@ -4071,6 +4146,7 @@ class Dataflow_dumplicate(Dataflow):
                     for _dict in final_list:
                         if _dict.get(document_tmp_docid) in dup_docid:
                             remove_list.append(_dict)
+
                     dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                     dmp_docid = "%d,%s"%(best_docid,dmp_docid)
                 else:
@@ -4082,16 +4158,19 @@ class Dataflow_dumplicate(Dataflow):
             list_docids = list(dup_docid)
             list_docids.append(best_docid)
 
-            if item.get(document_update_document)=="true":
-                dtmp.setValue(document_tmp_save,1,True)
+            # if item.get(document_update_document)=="true":
+            #     dtmp.setValue(document_tmp_save,1,True)
 
             list_merge_dump = []
             if (exist_finterprint and dtmp.getProperties().get(document_tmp_save)==0) or item.get(document_docchannel,0) in (301,302):
-                log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
+                if exist_finterprint:
+                    log("exist_finterprint %s"%(str(item.get(document_tmp_docid))))
                 dtmp.setValue(document_tmp_projects,"[]",True)
             else:
                 project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
-                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump:
+
+
+                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
                     dtmp.setValue(document_tmp_save,0,True)
                 dtmp.setValue(document_tmp_projects,project_json,True)
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
@@ -4145,19 +4224,23 @@ class Dataflow_dumplicate(Dataflow):
 
 
 
+        current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+        before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-20)
+        after_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
         if self.fix_doc_docid is None:
-            current_date = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
-            before_date = timeAdd(current_date,0,format="%Y-%m-%d %H:%M:%S",minutes=-5)
             bool_query = BoolQuery(must_queries=[
                 TermQuery(document_tmp_save,1),
                 RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
-                RangeQuery(document_tmp_opertime,before_date)
+                RangeQuery(document_tmp_docchannel,0,300),
+                RangeQuery(document_tmp_opertime,before_date,after_date)
             ])
         else:
             bool_query = BoolQuery(must_queries=[
                 TermQuery(document_tmp_save,1),
                 RangeQuery(document_tmp_status,flow_dumplicate_status_to[0]),
-                RangeQuery(document_tmp_docid,self.fix_doc_docid)
+                RangeQuery(document_tmp_docchannel,0,300),
+                RangeQuery(document_tmp_docid,self.fix_doc_docid),
+                RangeQuery(document_tmp_opertime,before_date,after_date)
             ])
 
         list_data = []
@@ -4192,7 +4275,7 @@ class Dataflow_dumplicate(Dataflow):
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
         schedule.add_job(self.flow_remove,"cron",hour="20")
         schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
-        # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
+        schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="*/10")
         schedule.start()
 
     def changeSaveStatus(self,list_dict):
@@ -4213,16 +4296,17 @@ class Dataflow_dumplicate(Dataflow):
                           document_tmp_save:0
                           }
                     _d_tmp = Document_tmp(_d)
-                    if _d_tmp.fix_columns(self.ots_client,["status"],True):
+                    if _d_tmp.fix_columns(self.ots_client,["status",document_update_document],True):
                         if _d_tmp.getProperties().get("status")==1:
-                            _d_tmp.setValue("status",0,True)
-                            _d_tmp.update_row(self.ots_client)
+                            if _d_tmp.getProperties().get(document_update_document,"")!="true":
+                                _d_tmp.setValue("status",0,True)
+                                _d_tmp.update_row(self.ots_client)
 
 
 
     def test_dumplicate(self,docid):
         # columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_tmp_web_source_no,document_tmp_fingerprint,document_attachment_extract_status]
-        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_name]
+        columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document,document_province,document_city,document_district,document_tmp_attachment_path,document_tmp_web_source_no,document_tmp_web_source_name,document_tmp_source_stage,document_tmp_source_type]
         bool_query = BoolQuery(must_queries=[
             TermQuery("docid",docid)
         ])
@@ -4413,7 +4497,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(497234586
+    df_dump.test_dumplicate(562889387
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

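Among the dataflow.py changes above, generate_dumplicate_query moves project_name from the exact-term set (set_term) to the phrase set (set_phrase), so duplicate candidates are now retrieved with MatchPhraseQuery instead of TermQuery on that column. A rough sketch of the difference, using the tablestore query types already imported in this module (the project name value is made up for illustration):

from tablestore import TermQuery, MatchPhraseQuery

# before: the value is matched as an exact term
q_before = TermQuery("project_name", "某某学校食堂设备采购项目")
# after: the value is matched as a phrase over the analyzed text (tokens must appear in order)
q_after = MatchPhraseQuery("project_name", "某某学校食堂设备采购项目")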
+ 113 - 48
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -453,7 +453,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     if len(_html)>1:
                         _html = "interface return error"
                     else:
-                        sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
+                        # sentMsgToDD("attach interface failed of docid:%s of filemd5:%s of type:%s size:%.3fM with result:%s"%(str(docids),filemd5,_filetype,round(_size/1024/1024,4),str(_html)))
                         _html = ""
 
                         return False
@@ -1449,6 +1449,7 @@ class Dataflow_init(Dataflow):
                             ots_dict = _data.getProperties_ots()
                             if ots_dict["docid"]<self.base_shenpi_id:
                                 ots_dict["docid"] += self.base_shenpi_id
+                                ots_dict["partitionkey"] = ots_dict["docid"]%500+1
 
                             if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
                                 if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
@@ -1469,7 +1470,87 @@ class Dataflow_init(Dataflow):
             traceback.print_exc()
             self.pool_oracle.decrease()
 
+    def fix_shenpi(self):
+
+        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
+        begin_id = 0
+        end_id = 64790010
+        thread_num = 15
+        step = (end_id-begin_id)//thread_num
+        list_items = []
+        for _i in range(thread_num):
+            _begin = _i*step
+            _end = (_i+1)*step-1
+            if _i==thread_num-1:
+                _end = end_id
+            list_items.append((_begin,_end,_i))
+        task_queue = Queue()
+        for item in list_items:
+            task_queue.put(item)
+
+        fix_count_list = []
+
+        def _handle(item,result_queue):
+            conn_oracle = pool_oracle.getConnector()
+            (begin_id,end_id,thread_id) = item
+
+            _count = 0
+            for _id_i in range(begin_id,end_id):
+                try:
+                    bool_query = BoolQuery(must_queries=[
+                        TermQuery("docchannel",302),
+                        TermQuery("original_id",_id_i)
+                    ])
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                        SearchQuery(bool_query,get_total_count=True))
+                    if total_count>0:
+                        continue
+
+                    # bool_query = BoolQuery(must_queries=[
+                    #     TermQuery("id",_id_i),
+                    # ])
+                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                    #                                                                     SearchQuery(bool_query,get_total_count=True))
+                    # if total_count>0:
+                    #     continue
+
+                    try:
+                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
+                    except Exception as e:
+                        continue
+
+                    # send data to mq one by one with max_shenpi_id updated
+                    for _data in list_data:
+
+                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
+
+                        ots_dict = _data.getProperties_ots()
+                        if ots_dict["docid"]<self.base_shenpi_id:
+                            ots_dict["docid"] += self.base_shenpi_id
+                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
+                        ots_dict["status"] = 201
+                        dict_1 = {}
+                        dict_2 = {}
+                        for k,v in ots_dict.items():
+                            if k!="dochtmlcon":
+                                dict_1[k] = v
+                            if k in ('partitionkey',"docid","dochtmlcon"):
+                                dict_2[k] = v
+                        d_1 = Document(dict_1)
+                        d_2 = Document(dict_2)
+                        d_1.update_row(self.ots_client)
+                        d_2.update_row(self.ots_capacity)
+                        _count += 1
+                except Exception as e:
+                    traceback.print_exc()
+
+                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
+            fix_count_list.append(_count)
+            pool_oracle.putConnector(conn_oracle)
 
+        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
+        mt.run()
+        print(fix_count_list,sum(fix_count_list))
 
     def ots2mq(self):
         try:
@@ -1477,13 +1558,34 @@ class Dataflow_init(Dataflow):
 
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                 SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
-                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
+                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
             list_data = getRow_ots(rows)
+            task_queue = Queue()
             for _data in list_data:
+                task_queue.put(_data)
+
+
+            while next_token:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
+                list_data = getRow_ots(rows)
+
+                for _data in list_data:
+                    task_queue.put(_data)
+
+                if task_queue.qsize()>=1000:
+                    break
+
+            def _handle(_data,result_queue):
+
                 _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                       document_tmp_docid:_data.get(document_tmp_docid),
                       document_tmp_status:0}
                 _document = Document(_d)
+                _document.fix_columns(self.ots_client,None,True)
+                _data = _document.getProperties()
+
                 page_attachments = _data.get(document_tmp_attachment_path,"[]")
 
                 _document_html = Document(_data)
@@ -1498,36 +1600,16 @@ class Dataflow_init(Dataflow):
                     _data[document_tmp_status] = status
                     send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                 if send_succeed:
+                    _document.setValue(document_tmp_status,0,True)
                     _document.update_row(self.ots_client)
                 else:
                     log("send_msg_error2222")
-            while next_token:
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
-                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
-                list_data = getRow_ots(rows)
-                for _data in list_data:
-                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
-                          document_tmp_docid:_data.get(document_tmp_docid),
-                          document_tmp_status:0}
-                    _document = Document(_d)
-                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
 
-                    _document_html = Document(_data)
-                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
+            if task_queue.qsize()>0:
+                mt = MultiThreadHandler(task_queue,_handle,None,15)
+                mt.run()
+
 
-                    if page_attachments!="[]":
-                        status = random.randint(1,10)
-                        _data[document_tmp_status] = status
-                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
-                    else:
-                        status = random.randint(11,50)
-                        _data[document_tmp_status] = status
-                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
-                    if send_succeed:
-                        _document.update_row(self.ots_client)
-                    else:
-                        log("send_msg_error2222")
         except Exception as e:
             traceback.print_exc()
 
@@ -1546,6 +1628,8 @@ class Dataflow_init(Dataflow):
                 _document = Document_tmp(_d)
                 page_attachments = _data.get(document_tmp_attachment_path,"[]")
 
+                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
+
                 _document_html = Document_html(_data)
                 _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
 
@@ -1675,6 +1759,7 @@ class Dataflow_init(Dataflow):
 
 
 
+
 def transform_attachment():
     from BaseDataMaintenance.model.ots.attachment import attachment
     from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres
@@ -1863,27 +1948,7 @@ def check_data_synchronization():
 
 current_path = os.path.abspath(os.path.dirname(__file__))
 
-def fixDoc_to_queue_init(filename=""):
-    import pandas as pd
-    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_oracle2ots
-    if filename=="":
-        filename = os.path.join(current_path,"check.xlsx")
-    df = pd.read_excel(filename)
-    if "docchannel" in dict_oracle2ots:
-        dict_oracle2ots.pop("docchannel")
-    row_name = ",".join(list(dict_oracle2ots.keys()))
-    conn = getConnection_oracle()
-    cursor = conn.cursor()
-    _count = 0
-    for uuid,tablename,_exists,_toolong in zip(df["uuid"],df["tablename"],df["exists"],df["tolong"]):
-        if _exists==0 and _toolong==0:
-            _count += 1
-            _source = str(tablename).replace("_TEMP","")
-            sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
-            cursor.execute(sql)
-            log("%d:%s"%(_count,sql))
-    conn.commit()
-    conn.close()
+
 
 if __name__ == '__main__':
     # di = Dataflow_init()

+ 213 - 10
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -777,25 +777,33 @@ def getSimLevel(str1,str2):
     return _v
 
 def getLength(_str):
-    return len(_str if _str is not None else "")
+    return len(str(_str) if _str is not None else "")
 
 def check_money(bidding_budget_less,bidding_budget_greater,
                 win_bid_price_less,win_bid_price_greater,
                 moneys_less,moneys_greater,
                 moneys_attachment_less,moneys_attachment_greater):
 
+    bidding_budget_less_source = bidding_budget_less
+    bidding_budget_greater_source = bidding_budget_greater
+    win_bid_price_less_source = win_bid_price_less
+    win_bid_price_greater_source = win_bid_price_greater
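+    # keep the unrounded source values so they can still be matched exactly
+    # against the money sets extracted from the body/attachments below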
     # only compare the six most significant digits of the amounts
     if getLength(bidding_budget_less)>0:
+        bidding_budget_less_source = float(bidding_budget_less_source)
         bidding_budget_less = round(float(bidding_budget_less))
         bidding_budget_less = str(round(bidding_budget_less,6-len(str(bidding_budget_less))))
     if getLength(bidding_budget_greater)>0:
+        bidding_budget_greater_source = float(bidding_budget_greater_source)
         bidding_budget_greater = round(float(bidding_budget_greater))
         bidding_budget_greater = str(round(bidding_budget_greater,6-len(str(bidding_budget_greater))))
 
     if getLength(win_bid_price_less)>0:
+        win_bid_price_less_source = float(win_bid_price_less_source)
         win_bid_price_less = round(float(win_bid_price_less))
         win_bid_price_less = str(round(win_bid_price_less,6-len(str(win_bid_price_less))))
     if getLength(win_bid_price_greater)>0:
+        win_bid_price_greater_source = float(win_bid_price_greater_source)
         win_bid_price_greater = round(float(win_bid_price_greater))
         win_bid_price_greater = str(round(win_bid_price_greater,6-len(str(win_bid_price_greater))))
 
@@ -816,14 +824,21 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                 budget_is_same = True
             if budget_less in moneys_greater or budget_less in moneys_attachment_greater:
                 budget_is_same = True
+            if bidding_budget_less_source in moneys_greater or bidding_budget_less_source in moneys_attachment_greater:
+                budget_is_same = True
             if budget_greater in moneys_less or budget_greater in moneys_attachment_less:
                 budget_is_same = True
+            if bidding_budget_greater_source in moneys_less or bidding_budget_greater_source in moneys_attachment_less:
+                budget_is_same = True
             if budget_is_same=="":
                 return False
 
     if getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
+
+
         price_less = float(win_bid_price_less)
         price_greater = float(win_bid_price_greater)
+
         if price_less!=price_greater:
 
             if min(price_less,price_greater)>0:
@@ -833,8 +848,12 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                 price_is_same = True
             if price_less in moneys_greater or price_less in moneys_attachment_greater:
                 price_is_same = True
+            if win_bid_price_less_source in moneys_greater or win_bid_price_less_source in moneys_attachment_greater:
+                price_is_same = True
             if price_greater in moneys_less or price_greater in moneys_attachment_less:
                 price_is_same = True
+            if win_bid_price_greater_source in moneys_less or win_bid_price_greater_source in moneys_attachment_less:
+                price_is_same = True
             if price_is_same=="":
                 return False
     return True
@@ -868,6 +887,85 @@ def check_entity(nlp_enterprise_less,nlp_enterprise_greater,
         return False
     return True
 
+
+def check_punish(punish_less,punish_greater):
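+    """
+    Field-by-field comparison of the punish extraction of two documents.
+    Returns (flag, same_count, not_same_count); flag turns False as soon as
+    punish_code or the complainants/punishPeople/institutions fields conflict.
+    """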
+    same_count = 0
+    not_same_count = 0
+    _flag = True
+    keys = list(set(list(punish_less.keys())) | set(list(punish_greater.keys())))
+    for k in keys:
+        v1 = punish_less.get(k)
+        v2 = punish_greater.get(k)
+        if getLength(v1)>0 and getLength(v2)>0:
+            if k=="punish_code":
+                if not check_codes([v1],[v2]):
+                    not_same_count += 1
+                    _flag = False
+                else:
+                    same_count += 1
+            if k=="punishDecision":
+                if getSimilarityOfString(v1,v2)>0.8:
+                    same_count += 1
+            if k in ("complainants","punishPeople","institutions"):
+                if v1==v2:
+                    same_count += 1
+                else:
+                    not_same_count += 1
+                    _flag = False
+    return _flag,same_count,not_same_count
+
+def check_source_type(source_type_less,source_type_greater):
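+    """Documents with different non-empty source_type values never match."""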
+    if getLength(source_type_less)>0 and getLength(source_type_greater)>0:
+        if source_type_less!=source_type_greater:
+            return False
+    return True
+def check_approval(approval_less,approval_greater,b_log):
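+    """
+    Pairwise comparison of approval records: a pair passes when none of the
+    listed keys conflict and at least two of them agree; if both lists are
+    non-empty and no pair passes, the documents are treated as different.
+    """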
+
+    if b_log:
+        logging.info("approval_less %s==approval_greater %s"%(approval_less,approval_greater))
+    for _less in approval_less:
+        for _greater in approval_greater:
+            same_count = 0
+            not_same_count = 0
+            flag = True
+            keys = ["source_stage","source_type","doc_num","project_code","project_name","approval_items","approval_result","approver","construct_company","construction_scale","declare_company","evaluation_agency","legal_person","compilation_unit","time_approval"]
+            for k in keys:
+                v1 = _less.get(k)
+                v2 = _greater.get(k)
+                if getLength(v1)>0 and getLength(v2)>0:
+                    if k in ("source_stage","source_type"):
+                        if v1!=v2:
+                            flag = False
+
+                    if k in ("project_code","doc_num"):
+                        if check_codes([v1],[v2]):
+                            same_count += 1
+                        else:
+                            not_same_count += 1
+                            if b_log:
+                                logging.info("check approval %s false %s-%s"%(k,v1,v2))
+                            flag = False
+                    if k in ("approval_items","approval_result","project_name"):
+                        if getSimilarityOfString(v1,v2)>0.8:
+                            same_count += 1
+                        else:
+                            not_same_count += 1
+                    if k in ("approver","construct_company","declare_company","evaluation_agency","legal_person","compilation_unit"):
+                        if v1==v2:
+                            same_count += 1
+                        else:
+                            not_same_count += 1
+                            if b_log:
+                                logging.info("check approval %s false %s-%s"%(k,v1,v2))
+                            flag = False
+            if flag and same_count>1:
+                return flag,same_count,not_same_count
+    flag = True
+    if len(approval_less)>0 and len(approval_greater)>0:
+        flag = False
+    return flag,0,0
+
+
 def check_codes(project_codes_less,project_codes_greater):
     #check the similarity
     is_same = False
@@ -876,6 +974,8 @@ def check_codes(project_codes_less,project_codes_greater):
 
     for project_code_less in project_codes_less:
         for project_code_greater in project_codes_greater:
+            project_code_less = str(project_code_less).upper()
+            project_code_greater = str(project_code_greater).upper()
             code_sim = getSimilarityOfString(project_code_less,project_code_greater)
             if project_code_less is not None and project_code_greater is not None:
                 if code_sim>0.6:
@@ -901,6 +1001,7 @@ num_pattern = re.compile("^\d+(?:\.\d+)?$")
 num1_pattern = re.compile("[一二三四五六七八九A-Za-z]+")
 location_pattern = re.compile("[^\[【\(]{1,2}[市区镇县村路]")
 building_pattern = "工程招标代理|工程设计|暂停|继续|工程造价咨询|施工图设计文件审查|咨询|环评|设计|施工监理|施工|监理|EPC|epc|总承包|水土保持|选址论证|勘界|勘察|预算编制|预算审核|结算审计|招标代理|设备类|第?[\((]?[一二三四五六七八九1-9][)\)]?[次批]"
+rebid_pattern = "再次|重新招标|[一二三四五六七八九十]+次"
 date_pattern = re.compile("\d{2,4}[\-\./年]\d{1,2}[\-\./月]\d{1,2}")
 def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[], code_greater=[]):
     if code_greater is None:
@@ -962,7 +1063,7 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                 return False
 
     #check location and keywords
-    for _p in [num1_pattern,building_pattern]:
+    for _p in [num1_pattern,building_pattern,rebid_pattern]:
         num_all_l = re.findall(_p,doctitle_refind_less)
         num_all_g = re.findall(_p,doctitle_refind_greater)
         set_num_l = set(num_all_l)
@@ -996,19 +1097,70 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                     return False
     return True
 
+
+def product_dump(list_product):
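+    """Deduplicate a product list: sort by length and drop any entry that
+    already contains a kept shorter entry as a substring."""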
+    _product_l_l = []
+    list_product.sort(key=lambda x:len(x))
+    for _l in list_product:
+        _exists = False
+        for l1 in _product_l_l:
+            if l1 in _l:
+                _exists = True
+                break
+        if not _exists:
+            _product_l_l.append(_l)
+    return _product_l_l
 def check_product(product_less,product_greater,split_char=",",doctitle_refine_less='',doctitle_refine_greater=''):
     if getLength(product_less)>0 and getLength(product_greater)>0:
 
         _product_l = product_less.split(split_char)
+        _product_l = product_dump(_product_l)
         _product_g = product_greater.split(split_char)
+        _product_g = product_dump(_product_g)
+        _title_l = doctitle_refine_less
+        _title_g = doctitle_refine_greater
         same_count = 0
         if len(_product_l)>len(_product_g):
             a = _product_g
             _product_g = _product_l
             _product_l = a
+            _title_l = doctitle_refine_greater
+            _title_g = doctitle_refine_less
+        set_product_l_in_title = set()
+        set_product_g_in_title = set()
+        for _l in _product_l:
+            if _title_l.find(_l)>=0:
+                set_product_l_in_title.add(_l)
+        for _g in _product_g:
+            if _title_g.find(_g)>=0:
+                set_product_g_in_title.add(_g)
+        # require the products that appear in the two titles to overlap
+        if len(set_product_l_in_title)>0 and len(set_product_g_in_title)>0:
+
+            _set_union = set_product_l_in_title & set_product_g_in_title
+
+            # if the differing parts overlap with each other, the pair still passes
+            diff_l = set_product_l_in_title-_set_union
+            diff_g = set_product_g_in_title-_set_union
+
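+            # after deduplicating the union of the leftover title-products,
+            # one side has to be absorbed by the other; otherwise the two
+            # documents are treated as different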
+            diff_dump = product_dump(list(diff_l.union(diff_g)))
+            if not(len(diff_dump)<=len(diff_l) or len(diff_dump)<=len(diff_g)):
+                return False
+
+            # too strict, disabled for now
+            # if len(_set_union)==0:
+            #     return False
+            # if len(_set_union)!=len(set_product_l_in_title) and len(_set_union)!=len(set_product_g_in_title):
+            #     _l1 = list(set_product_l_in_title)
+            #     _l2 = list(set_product_g_in_title)
+            #     _l1.extend(_l2)
+            #     _l1 = product_dump(_l1)
+            #     if len(_l1)!=len(_set_union):
+            #         return False
         for _l in _product_l:
             for _g in _product_g:
-                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>-0 or doctitle_refine_less.find(_g)>=0:
+                if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
                     same_count += 1
                     break
         if same_count/len(_product_l)>=0.5:
@@ -1021,12 +1173,15 @@ def check_package(package_less,package_greater,split_char=","):
 
         _product_l = package_less.split(split_char)
         _product_g = package_greater.split(split_char)
+        same_level = False
         for _l in _product_l:
             for _g in _product_g:
+                if abs(len(_l)-len(_g))<=2:
+                    same_level = True
                 if _l==_g:
                     return True
-
-        return False
+        if same_level:
+            return False
     return True
 
 def check_time(json_time_less,json_time_greater):
@@ -1057,7 +1212,7 @@ def check_time(json_time_less,json_time_greater):
         return 0
     return 1
 
-def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater="",moneys_less=set(),moneys_greater=set(),moneys_attachment_less=set(),moneys_attachment_greater=set(),page_attachments_less="[]",page_attachments_greater="[]"):
+def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,hard_level=1,web_source_no_less="",web_source_no_greater="",moneys_less=set(),moneys_greater=set(),moneys_attachment_less=set(),moneys_attachment_greater=set(),page_attachments_less="[]",page_attachments_greater="[]",punish_less = {},punish_greater = {},approval_less = [],approval_greater = [],source_type_less = None,source_type_greater=None):
     if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
         return 1
 
@@ -1101,6 +1256,11 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             if check_product(product_less,product_greater,doctitle_refine_less=doctitle_refine_less,doctitle_refine_greater=doctitle_refine_greater):
                 return 1
 
+    # same web source: if both documents carry attachments but none of the attachment md5s overlap, do not deduplicate
+    if web_source_no_less==web_source_no_greater and len(set_md5_less)>0 and len(set_md5_greater)>0 and len(set_md5_less&set_md5_greater)==0:
+        if b_log:
+            logging.info("same web source, both have attachments but none overlap, web_source_no_less:%s,web_source_no_greater:%s"%(web_source_no_less,web_source_no_greater))
+        return 0
 
     if isinstance(project_codes_less,str):
         project_codes_less = [a for a in project_codes_less.split(",") if a!=""]
@@ -1131,6 +1291,33 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         same_count += 1
     if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
         same_count += 1
+
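+    # punish / approval / source_type act as hard filters; fields that match
+    # in punish and approval also contribute to same_count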
+    _flag,_c1,_c2 = check_punish(punish_less,punish_greater)
+    if not _flag:
+        if b_log:
+            logging.info("check_punish failed")
+        return 0
+    else:
+        if b_log:
+            logging.info("check_punish true %d"%(_c1))
+        same_count += _c1
+
+    _flag,_c1,_c2 = check_approval(approval_less,approval_greater,b_log)
+    if not _flag:
+        if b_log:
+            logging.info("check approval failed")
+        return 0
+    else:
+        if b_log:
+            logging.info("check approval true %d"%(_c1))
+        same_count += _c1
+
+    _flag = check_source_type(source_type_less,source_type_greater)
+    if not _flag:
+        if b_log:
+            logging.info("check source type failed")
+        return 0
+
     base_prob = 0
     if min_counts<3:
         base_prob = 0.9
@@ -1145,8 +1332,12 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         if _prob<0.1:
             _prob = 0.15
         if getLength(province_less)>0 and getLength(province_greater)>0 and province_less not in ("全国","未知") and province_greater not in ("全国","未知") and province_less!=province_greater:
+            if b_log:
+                logging.info("province not same:%s-%s"%(province_less,province_greater))
             return 0
     if _prob<0.1:
+        if b_log:
+            logging.info("prob too low:%f"%(_prob))
         return _prob
 
     check_result = {"pass":1}
@@ -1208,8 +1399,7 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         else:
             check_result["entity"] = 1
 
-    logging.info("moneys_less"+str(moneys_less)+"---"+str(moneys_attachment_less))
-    logging.info("moneys_less"+str(moneys_greater)+"---"+str(moneys_attachment_greater))
+
     if not check_money(bidding_budget_less,bidding_budget_greater,
                        win_bid_price_less,win_bid_price_greater,
                        moneys_less,moneys_greater,
@@ -1267,6 +1457,8 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             check_result["time"] = 1
 
     if hard_level==2 and check_result["product"]<=1:
+        if b_log:
+            logging.info("hard_level %s and check_product less than 2"%(str(hard_level)))
         return 0
     if check_result.get("pass",0)==0:
         if b_log:
@@ -1507,7 +1699,11 @@ class f_dumplicate_check(BaseUDTF):
             page_attachments_less = '[]'
         if page_attachments_greater is None:
             page_attachments_greater = '[]'
-        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
+        punish_less = _extract_less.get("punish",{})
+        punish_greater = _extract_greater.get("punish",{})
+        approval_less = _extract_less.get("approval",[])
+        approval_greater = _extract_greater.get("approval",[])
+        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less = punish_less,punish_greater = punish_greater,approval_less = approval_less,approval_greater = approval_greater)
         self.forward(_prob)
 
 @annotate("string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
@@ -1687,6 +1883,8 @@ class f_redump_probability_final_check(BaseUDAF):
                 web_source_no_greater = document_greater["web_source_no"]
                 extract_json_greater = document_greater["extract_json"]
                 page_attachments_greater = document_greater["page_attachments"]
+
+
                 _pass = True
 
                 for document_less in final_group:
@@ -1731,7 +1929,12 @@ class f_redump_probability_final_check(BaseUDAF):
                     if page_attachments_greater is None:
                         page_attachments_greater = '[]'
 
-                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,len(the_group),b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
+                    punish_less = _extract_less.get("punish",{})
+                    punish_greater = _extract_greater.get("punish",{})
+                    approval_less = _extract_less.get("approval",[])
+                    approval_greater = _extract_greater.get("approval",[])
+
+                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,len(the_group),b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less = punish_less,punish_greater = punish_greater,approval_less = approval_less,approval_greater = approval_greater)
 
                     if _prob<0.1:
                         _pass = False

+ 207 - 24
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -87,6 +87,15 @@ project_nlp_enterprise = "nlp_enterprise"
 project_nlp_enterprise_attachment = "nlp_enterprise_attachment"
 project_update_time = "update_time"
 project_tmp_attrs = "tmp_attrs"
+project_tenderee_code = "tenderee_code"
+project_agency_code = "agency_code"
+project_candidates = "candidates"
+
+project_win_tenderer_code = "win_tenderer_code"
+project_second_tenderer_code = "second_tenderer_code"
+project_third_tenderer_code = "third_tenderer_code"
+project_win_tenderer_joints = "win_tenderer_joints"
+project_multi_winners = "multi_winners"
 
 document_partitionkey = "partitionkey"
 document_docid = "docid"
@@ -148,6 +157,9 @@ document_time_release = "time_release"
 document_info_source = "info_source"
 document_nlp_enterprise = "nlp_enterprise"
 document_nlp_enterprise_attachment = "nlp_enterprise_attachment"
+document_tenderee_code = "tenderee_code"
+document_agency_code = "agency_code"
+document_candidates = "candidates"
 
 document_tmp_partitionkey = "partitionkey"
 document_tmp_docid = "docid"
@@ -183,6 +195,9 @@ document_tmp_opertime = "opertime"
 document_tmp_docchannel = "docchannel"
 document_tmp_original_docchannel = "original_docchannel"
 
+document_tmp_source_stage = "source_stage"
+document_tmp_source_type = "source_type"
+
 document_tmp_extract_json = "extract_json"
 document_tmp_industry_json = "industry_json"
 document_tmp_other_json = "other_json"
@@ -1516,7 +1531,7 @@ def generate_common_properties(list_docs):
     # pick the value for each field by occurrence count
     choose_dict = {}
     project_dict = {}
-    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count]:
+    for _key in [document_bidway,document_industry,document_info_type,document_info_source,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_tenderee_code,document_agency_code]:
         for _doc in list_docs:
             _value = _doc.get(_key,"")
             if _value!="":
@@ -1616,6 +1631,9 @@ def generate_common_properties(list_docs):
     remove_docids = set()
     set_nlp_enterprise = set()
     set_nlp_enterprise_attachment = set()
+
+    set_candidates = set()
+    list_candidates = []
     for _doc in list_docs:
         table_name = _doc.get("table_name")
         status = _doc.get(document_status,0)
@@ -1632,13 +1650,30 @@ def generate_common_properties(list_docs):
 
         is_multipack = True if len(sub_docs)>1 else False
         extract_count = _doc.get(document_tmp_extract_count,0)
+        candidates = _doc.get(document_candidates,"[]")
+
+        _province = _doc.get(document_province,"")
+        _city = _doc.get(document_city,"")
+        _district = _doc.get(document_district,"")
+
+        tenderee = _doc.get(document_tenderee,"")
+        agency = _doc.get(document_agency,"")
+
 
         try:
             set_nlp_enterprise |= set(json.loads(_doc.get(document_nlp_enterprise,"[]")))
             set_nlp_enterprise_attachment |= set(json.loads(_doc.get(document_nlp_enterprise_attachment,"[]")))
+
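+            # merge candidate bidders across documents, keeping the first
+            # occurrence of each name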
+            for item in json.loads(candidates):
+                if item.get("name") is not None and item.get("name") not in set_candidates:
+                    list_candidates.append(item)
+                    set_candidates.add(item.get("name"))
+
         except Exception as e:
             traceback.print_exc()
 
+
+
         if product is not None:
             list_product.extend(product.split(","))
 
@@ -1651,7 +1686,7 @@ def generate_common_properties(list_docs):
 
         if zhao_biao_page_time=="" and _docchannel in (51,52,102,103,114):
             zhao_biao_page_time = page_time
-        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120):
+        if zhong_biao_page_time=="" and _docchannel in (101,118,119,120,121,122):
             zhong_biao_page_time = page_time
         is_visuable = 0
         if table_name=="document":
@@ -1675,7 +1710,12 @@ def generate_common_properties(list_docs):
                               document_page_time:page_time,
                               document_status:201 if is_visuable==1 else 401,
                               "is_multipack":is_multipack,
-                              document_tmp_extract_count:extract_count
+                              document_tmp_extract_count:extract_count,
+                              document_tenderee:tenderee,
+                              document_agency:agency,
+                              document_province:_province,
+                              document_city:_city,
+                              document_district:_district
                               }
                              )
 
@@ -1691,6 +1731,7 @@ def generate_common_properties(list_docs):
     project_dict[project_product] = ",".join(list(set(list_product)))
     project_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
     project_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+    project_dict[project_candidates] = json.dumps(list_candidates[:100],ensure_ascii=False)
 
     return project_dict
 
@@ -1938,7 +1979,7 @@ class f_generate_projects_from_document(BaseUDTF):
                 _product = list_product[_i%len(list_product)]
                 self.forward(_uuid,page_time,page_time_stamp,docids,project_name,_project_code,tenderee,agency,bidding_budget,win_tenderer,win_bid_price,_product,attrs_json)
 
-@annotate('string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,double,string,double,string,string,string,double,string,string,string,double,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string -> string,string,bigint,string,string,string,string,string,double,string,double,string,string')
+@annotate('string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,double,string,double,string,string,string,double,string,string,string,double,string,string,string,string,string,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string -> string,string,bigint,string,string,string,string,string,double,string,double,string,string')
 class f_generate_projects_from_project(BaseUDTF):
 
     def __init__(self):
@@ -2013,7 +2054,16 @@ class f_generate_projects_from_project(BaseUDTF):
                 info_source,
                 nlp_enterprise,
                 nlp_enterprise_attachment,
-                update_time):
+                update_time,
+                tenderee_code,
+                agency_code,
+                candidates,
+                win_tenderer_code,
+                second_tenderer_code,
+                third_tenderer_code,
+                win_tenderer_joints,
+                multi_winners
+                ):
         attrs_dict = {}
 
         attrs_dict[project_uuid] = uuid
@@ -2082,9 +2132,18 @@ class f_generate_projects_from_project(BaseUDTF):
         attrs_dict[project_nlp_enterprise_attachment] = nlp_enterprise_attachment
         attrs_dict[project_update_time] = update_time
 
+        attrs_dict[project_tenderee_code] = tenderee_code
+        attrs_dict[project_agency_code] = agency_code
+        attrs_dict[project_candidates] = candidates
+        attrs_dict[project_win_tenderer_code] = win_tenderer_code
+        attrs_dict[project_second_tenderer_code] = second_tenderer_code
+        attrs_dict[project_third_tenderer_code] = third_tenderer_code
+        attrs_dict[project_win_tenderer_joints] = win_tenderer_joints
+        attrs_dict[project_multi_winners] = multi_winners
 
         popNoneFromDict(attrs_dict)
 
+
         attrs_json = json.dumps(attrs_dict,ensure_ascii=False)
         if bidding_budget is None:
             bidding_budget = -1
@@ -2130,7 +2189,7 @@ def dumplicate_projects(list_projects,b_log=False):
     appendKeyvalueCount(list_projects)
     list_projects.sort(key=lambda x:str(x.get(project_page_time,"")))
     list_projects.sort(key=lambda x:x.get("keyvaluecount",0),reverse=True)
-    cluster_projects = list_projects[:10]
+    cluster_projects = list_projects[:100]
     _count = 10
     log("dumplicate projects rest %d"%len(cluster_projects))
     while _count>0:
@@ -2171,7 +2230,7 @@ def update_projects_by_project(project_dict,projects):
     _dict = {}
     # update the common properties
     for k,v in project_dict.items():
-        if k in (project_project_dynamics,project_page_time,project_sub_project_name,project_product,project_project_codes,project_docids,project_uuid,project_nlp_enterprise,project_nlp_enterprise_attachment):
+        if k in (project_project_dynamics,project_page_time,project_sub_project_name,project_product,project_project_codes,project_docids,project_uuid,project_nlp_enterprise,project_nlp_enterprise_attachment,project_candidates):
             continue
         for _proj in projects:
             if k not in _proj:
@@ -2205,6 +2264,17 @@ def update_projects_by_project(project_dict,projects):
     set_nlp_enterprise = set()
     set_nlp_enterprise_attachment = set()
     set_update_uuid = set()
+
+    set_candidates = set()
+
+
+    try:
+        set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
+        set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
+        list_candidates = json.loads(project_dict.get(project_candidates,"[]"))
+    except Exception as e:
+        pass
+
     for _proj in projects:
         _docids = _proj.get(project_docids,"")
         _codes = _proj.get(project_project_codes,"")
@@ -2221,6 +2291,12 @@ def update_projects_by_project(project_dict,projects):
         try:
             set_nlp_enterprise |= set(json.loads(_proj.get(project_nlp_enterprise,"[]")))
             set_nlp_enterprise_attachment |= set(json.loads(_proj.get(project_nlp_enterprise_attachment,"[]")))
+
+            for item in json.loads(_proj.get(project_candidates,"[]")):
+                if item.get("name") is not None and item.get("name") not in set_candidates:
+                    list_candidates.append(item)
+                    set_candidates.add(item.get("name"))
+
         except Exception as e:
             pass
     set_docid = set_docid | set(project_dict.get(project_docids,"").split(","))
@@ -2231,11 +2307,7 @@ def update_projects_by_project(project_dict,projects):
     set_delete_uuid = set_delete_uuid | set(project_dict.get(project_delete_uuid,"").split(","))
     set_update_uuid = set_update_uuid | set(project_dict.get("project_uuid","").split(","))
 
-    try:
-        set_nlp_enterprise |= set(json.loads(project_dict.get(project_nlp_enterprise,"[]")))
-        set_nlp_enterprise_attachment |= set(json.loads(project_dict.get(project_nlp_enterprise_attachment,"[]")))
-    except Exception as e:
-        pass
+
 
     append_dict[project_docids] = ",".join([a for a in list(set_docid) if a!=""])
     append_dict[project_docid_number] = len(set_docid)
@@ -2246,6 +2318,7 @@ def update_projects_by_project(project_dict,projects):
     append_dict["update_uuid"] = ",".join([a for a in list(set_update_uuid) if a!=""])
     append_dict[project_nlp_enterprise] = json.dumps(list(set_nlp_enterprise)[:100],ensure_ascii=False)
     append_dict[project_nlp_enterprise_attachment] = json.dumps(list(set_nlp_enterprise_attachment)[:100],ensure_ascii=False)
+    append_dict[project_candidates] = json.dumps(list_candidates,ensure_ascii=False)
 
     dict_dynamic = {}
     set_docid = set()
@@ -2684,21 +2757,20 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*300,return_prob=Fa
 
     prob_count += _codes_check
 
-    if is_few:
-        if _codes_check!=1:
-            if _title_check!=1:
-                if return_prob:
-                    return False,0
-                return False
-            if len(enterprise)>0 and len(enterprise_to_merge)>0:
-                if len(enterprise & enterprise_to_merge)==0:
-                    if return_prob:
-                        return False,0
-                    return False
-            if _product_check==-1:
+    if _codes_check!=1:
+        if _title_check!=1:
+            if return_prob:
+                return False,0
+            return False
+        if len(enterprise)>0 and len(enterprise_to_merge)>0:
+            if len(enterprise & enterprise_to_merge)==0:
                 if return_prob:
                     return False,0
                 return False
+        if _product_check==-1:
+            if return_prob:
+                return False,0
+            return False
 
     min_count = 2
     if product=="" or product_to_merge=="":
@@ -2929,6 +3001,105 @@ class MyEncoder(json.JSONEncoder):
             return obj
         return json.JSONEncoder.default(self, obj)
 
+def update_document_from_dynamic(_proj):
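+    """
+    Vote across the project's dynamics on tenderee/agency and on the most
+    specific province/city/district, write the winning values back to the
+    project and record per-docid corrections in _proj["document_update"].
+    """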
+    try:
+        list_dynamic = []
+        try:
+            list_dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
+        except Exception as e:
+            pass
+
+        dict_update_dict = {}
+        dict_column_count = {}
+        dict_addr_count = {}
+        for _dynamic in list_dynamic:
+            docid = _dynamic.get(document_docid)
+            tenderee = _dynamic.get(document_tenderee)
+            agency = _dynamic.get(document_agency)
+            province = _dynamic.get(document_province)
+            city = _dynamic.get(document_city)
+            district = _dynamic.get(document_district)
+
+
+            if getLength(tenderee)>0:
+                if tenderee not in dict_column_count:
+                    dict_column_count[tenderee] = {"count":1,"type":document_tenderee,"value":tenderee}
+                else:
+                    dict_column_count[tenderee]["count"] += 1
+            if getLength(agency)>0:
+                if agency not in dict_column_count:
+                    dict_column_count[agency] = {"count":1,"type":document_agency,"value":agency}
+                else:
+                    dict_column_count[agency]["count"] += 1
+
+            if province is not None and city is not None and district is not None:
+                addr = "%s%s%s"%(province,city,district)
+                if addr not in dict_addr_count:
+                    dict_addr_count[addr] = {"count":1}
+                    dict_addr_count[addr][document_province] = province
+                    dict_addr_count[addr][document_city] = city
+                    dict_addr_count[addr][document_district] = district
+                    if district!="":
+                        dict_addr_count[addr]["level"] = 3
+                    elif city!="":
+                        dict_addr_count[addr]["level"] = 2
+                    else:
+                        dict_addr_count[addr]["level"] = 1
+                else:
+                    dict_addr_count[addr]["count"] += 1
+
+        dict_list_v = {}
+        for k,v in dict_column_count.items():
+            _type = v.get("type")
+            if _type not in dict_list_v:
+                dict_list_v[_type] = []
+            dict_list_v[_type].append(v)
+        for k,v in dict_list_v.items():
+            v.sort(key=lambda x:x["count"],reverse=True)
+            if len(v)>0:
+                _proj[k] = v[0]["value"]
+                for _dynamic in list_dynamic:
+                    docid = _dynamic.get(document_docid)
+                    _v = _dynamic.get(k)
+                    if _v is not None and _v!="":
+                        if _v!=v[0]["value"]:
+                            if docid not in dict_update_dict:
+                                dict_update_dict[docid] = {document_docid:docid}
+                            dict_update_dict[docid][k] = v[0]["value"]
+        list_v = []
+        for k,v in dict_addr_count.items():
+            list_v.append(v)
+        list_v.sort(key=lambda x:x.get("count",0),reverse=True)
+        list_v.sort(key=lambda x:x.get("level",0),reverse=True)
+        if len(list_v)>0:
+            province = list_v[0].get(document_province)
+            city = list_v[0].get(document_city)
+            district = list_v[0].get(document_district)
+
+            _proj[document_province] = province
+            _proj[document_city] = city
+            _proj[document_district] = district
+            for _dynamic in list_dynamic:
+                docid = _dynamic.get(document_docid)
+
+                if document_province in _dynamic:
+                    if _dynamic.get(document_province,"")==province or _dynamic.get(document_province,"") in ("全国","未知",""):
+                        if province!=_dynamic.get(document_province,"") or city!=_dynamic.get(document_city,"") or district!=_dynamic.get(document_district,""):
+                            if docid not in dict_update_dict:
+                                dict_update_dict[docid] = {document_docid:docid}
+                            dict_update_dict[docid][document_province] = province
+                            dict_update_dict[docid][document_city] = city
+                            dict_update_dict[docid][document_district] = district
+        update_v = []
+        for k,v in dict_update_dict.items():
+            update_v.append(v)
+        _proj["document_update"] = update_v
+    except Exception as e:
+        pass
+
+
 def to_project_json(projects):
 
     list_proj = []
@@ -2964,6 +3135,7 @@ def to_project_json(projects):
             _proj.pop(project_uuid)
         if "project_uuid" in _proj:
             _proj.pop("project_uuid")
+        update_document_from_dynamic(_proj)
     return json.dumps(list_proj,cls=MyEncoder,ensure_ascii=False)
 
 def get_page_time_dis(page_time,n_page_time):
@@ -2984,6 +3156,15 @@ def check_page_time_dup(page_time,n_page_time):
     return False
 
 
+def check_fix_document(doctitle,n_doctitle):
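+    """Titles are only comparable when both or neither of them look like a
+    correction/clarification notice (更正/更新/变更/澄清)."""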
+    _fix = re.search("更正|更新|变更|澄清",doctitle)
+    _n_fix = re.search("更正|更新|变更|澄清",n_doctitle)
+    if _fix is not None and _n_fix is not None:
+        return True
+    if _fix is  None and _n_fix is None:
+        return True
+    return False
+
 def dumplicate_document_in_merge(list_projects,dup_docid):
     '''
     Deduplicate documents when merging
@@ -3033,6 +3214,8 @@ def dumplicate_document_in_merge(list_projects,dup_docid):
                             continue
                         if is_multipack or n_is_multipack:
                             continue
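+                        # never deduplicate a correction/clarification notice
+                        # against an ordinary announcement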
+                        if not check_fix_document(doctitle,n_doctitle):
+                            continue
                         n_title_search = re.search("[一二三四五六七八九十1-9]+(?:次|标|包)",n_doctitle)
                         if title_search is None and n_title_search is None:
                             pass

+ 4 - 2
BaseDataMaintenance/model/oracle/T_SHEN_PI_XIANG_MU.py

@@ -61,8 +61,10 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
         new_dict.pop(T_SHEN_PI_XIANG_MU_ID)
 
         try:
-            new_dict[T_SHEN_PI_XIANG_MU_SOURCE_STAGE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE,0))
-            new_dict[T_SHEN_PI_XIANG_MU_SOURCE_TYPE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE,0))
+            if new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE) is not None:
+                new_dict[T_SHEN_PI_XIANG_MU_SOURCE_STAGE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE,0))
+            if new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE) is not None:
+                new_dict[T_SHEN_PI_XIANG_MU_SOURCE_TYPE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE,0))
         except Exception as e:
             pass
 

+ 54 - 31
BaseDataMaintenance/model/ots/document.py

@@ -308,10 +308,10 @@ def turn_document_status():
         bool_query = BoolQuery(
             must_queries=[
                 # MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
-                WildcardQuery("web_source_no","03716-*"),
-                RangeQuery("page_time","2024-04-24"),
-                TermQuery("save",1)
-                # RangeQuery("status",0,1),
+                # WildcardQuery("web_source_no","03716-*"),
+                # RangeQuery("product_number",500),
+                # TermQuery("save",1)
+                RangeQuery("status",0,1),
                 # NestedQuery("page_attachments",ExistsQuery("page_attachments.fileMd5")),
                 # TermQuery("docid",397656324)
                 # BoolQuery(should_queries=[
@@ -341,25 +341,25 @@ def turn_document_status():
         #
         # )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            _document = Document_tmp(_data)
-            task_queue.put(_document)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                _document = Document_tmp(_data)
-                task_queue.put(_document)
+        # rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+        #                                                                columns_to_get=ColumnsToGet(["product","product_number"],return_type=ColumnReturnType.SPECIFIED))
+        # list_data = getRow_ots(rows)
+        # print(total_count)
+        # _count = len(list_data)
+        # for _data in list_data:
+        #     _document = Document(_data)
+        #     task_queue.put(_document)
+        # while next_token:
+        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+        #                                                                    columns_to_get=ColumnsToGet(["product"],return_type=ColumnReturnType.SPECIFIED))
+        #     list_data = getRow_ots(rows)
+        #     _count += len(list_data)
+        #     print("%d/%d"%(_count,total_count))
+        #     for _data in list_data:
+        #         _document = Document(_data)
+        #         task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -368,18 +368,36 @@ def turn_document_status():
         #              }
         #     task_queue.put(Document(_dict))
         # import pandas as pd
-        # df = pd.read_excel(r"F:\Workspace2016\DataMining\export\abc1.xlsx")
-        # for docid in df["docid1"]:
-        #     _dict = {document_docid:int(docid),
-        #              document_partitionkey:int(docid)%500+1,
-        #              }
-        #     task_queue.put(Document(_dict))
+        # df = pd.read_excel(r"F:\Workspace2016\DataMining\data\2024-07-24_143135_数据导出.xlsx")
+        # list_docid = df["docid"]
+        # list_docid = [519497468]
+
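+        # read docids from a local export file; each line ends with "-<docid>"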
+        list_docid = []
+        filename = r"G:\新建文件夹\WeChat Files\wxid_kluerlj8cn3b21\FileStorage\File\2024-10\金额缺失的id (1).txt"
+        with open(filename,"r",encoding="utf8") as f:
+            while 1:
+                line = f.readline()
+                if not line:
+                    break
+                line = line.strip()
+                docid = line.split('-')[-1]
+                if re.search("^\d+$",docid) is not None:
+                    list_docid.append(int(docid))
+
+        for docid in list_docid:
+            _dict = {document_docid:int(docid),
+                     document_partitionkey:int(docid)%500+1,
+                     }
+            task_queue.put(Document(_dict))
         # for docid in df["docid2"]:
         #     _dict = {document_docid:int(docid),
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        # log("task_queue size:%d"%(task_queue.qsize()))
+        log("task_queue size:%d"%(task_queue.qsize()))
 
     def _handle(item,result_queue,ots_client):
         #change attach value
@@ -407,7 +425,12 @@ def turn_document_status():
         # item.setValue(document_district,"金湾区",True)
         # item.setValue(document_status,66,True)
         # print(item.getProperties())
-        # item.update_row(ots_client)
+        item.setValue(document_status,1,True)
+        # product = item.getProperties().get(document_product)
+        # l_product = product.split(",")
+        # n_product = ",".join(l_product[:500])
+        # item.setValue(document_product,n_product,True)
+        item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass
 

+ 5 - 5
BaseDataMaintenance/model/ots/document_tmp.py

@@ -268,7 +268,7 @@ def turn_document_tmp_status():
             must_queries=[
                 # TermQuery("fingerprint","md5=2cc044b81ec13acddcc970b71b780365")
                 # TermQuery("save",66),
-                RangeQuery("status",66),
+                RangeQuery("status",1,51),
                 # BoolQuery(should_queries=[
                 #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                 #                           # MatchPhraseQuery("doctitle","中国电信"),
@@ -283,7 +283,7 @@ def turn_document_tmp_status():
             ],
             must_not_queries=[
                 # TermQuery("docid",288599518)
-                ExistsQuery("doctitle"),
+                # ExistsQuery("doctitle"),
                 # ExistsQuery("page_time"),
                               ]
         )
@@ -350,14 +350,14 @@ def turn_document_tmp_status():
         # _extract_json = _extract_json.replace("\x06", "").replace("\x05", "").replace("\x07", "").replace('\\', '')
         # item.setValue(document_tmp_extract_json,_extract_json,True)
         # json.loads(_extract_json)
-        # item.setValue(document_tmp_status,71,True)
+        item.setValue(document_tmp_status,0,True)
         # item.setValue(document_tmp_save,1,True)
         # if item.exists_row(ots_client):
         #     item.update_row(ots_client)
         # print(item.getProperties())
-        # item.update_row(ots_client)
+        item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
-        item.delete_row(ots_client)
+        # item.delete_row(ots_client)
         # from BaseDataMaintenance.model.ots.document import Document
         #
         # Doc = Document(item.getProperties())