Browse source

Add handling of missed data and data repair; during attachment processing, skip attachments that have already been processed before

luojiehua 2 years ago
parent
commit
6f03687150

+ 42 - 1
BaseDataMaintenance/maintenance/dataflow.py

@@ -2206,7 +2206,48 @@ class Dataflow_dumplicate(Dataflow):
             return the_group[:_index+1]
         return []
 
-    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
+    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
+        document_less = _dict1
+        docid_less = _dict1["docid"]
+        docchannel_less = document_less["docchannel"]
+        page_time_less = document_less["page_time"]
+        doctitle_refine_less = document_less["doctitle_refine"]
+        project_codes_less = document_less["project_codes"]
+        nlp_enterprise_less = document_less["nlp_enterprise"]
+        tenderee_less = document_less["tenderee"]
+        agency_less = document_less["agency"]
+        win_tenderer_less = document_less["win_tenderer"]
+        bidding_budget_less = document_less["bidding_budget"]
+        win_bid_price_less = document_less["win_bid_price"]
+        product_less = document_less["product"]
+        package_less = document_less["package"]
+        json_time_less = document_less["dict_time"]
+        project_name_less = document_less["project_name"]
+        fingerprint_less = document_less["fingerprint"]
+        extract_count_less = document_less["extract_count"]
+
+        document_greater = _dict2
+        docid_greater = _dict2["docid"]
+        docchannel_greater = document_greater["docchannel"]
+        page_time_greater = document_greater["page_time"]
+        doctitle_refine_greater = document_greater["doctitle_refine"]
+        project_codes_greater = document_greater["project_codes"]
+        nlp_enterprise_greater = document_greater["nlp_enterprise"]
+        tenderee_greater = document_greater["tenderee"]
+        agency_greater = document_greater["agency"]
+        win_tenderer_greater = document_greater["win_tenderer"]
+        bidding_budget_greater = document_greater["bidding_budget"]
+        win_bid_price_greater = document_greater["win_bid_price"]
+        product_greater = document_greater["product"]
+        package_greater = document_greater["package"]
+        json_time_greater = document_greater["dict_time"]
+        project_name_greater = document_greater["project_name"]
+        fingerprint_greater = document_greater["fingerprint"]
+        extract_count_greater = document_greater["extract_count"]
+
+        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=b_log)
+
+
+    def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
         document_less = _dict1
         docid_less = _dict1["docid"]
         docchannel_less = document_less["docchannel"]

+ 18 - 9
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -103,15 +103,24 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                     list_html.append({attachment_filemd5:_filemd5,
                                       "html":_html})
                 else:
-                    _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
-                    if not _succeed:
-                        _not_failed = False
+                    # attachment already has a process_time: it was processed before, so reuse the stored html and skip reprocessing
+                    if len(str(_attach.getProperties().get(attachment_process_time,"")))>10:
+                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
+                        if _html is None:
+                            _html = ""
+
+                        list_html.append({attachment_filemd5:_filemd5,
+                                          "html":_html})
+                    else:
+                        _succeed = self.request_attachment_interface(_attach,_dochtmlcon)
+                        if not _succeed:
+                            _not_failed = False
 
-                    _html = _attach.getProperties().get(attachment_attachmenthtml,"")
-                    if _html is None:
-                        _html = ""
-                    list_html.append({attachment_filemd5:_filemd5,
-                                      "html":_html})
+                        _html = _attach.getProperties().get(attachment_attachmenthtml,"")
+                        if _html is None:
+                            _html = ""
+                        list_html.append({attachment_filemd5:_filemd5,
+                                          "html":_html})
 
                 if _attach.getProperties().get(attachment_filetype)=="swf":
                     swf_urls.extend(json.loads(_attach.getProperties().get(attachment_swfUrls,"[]")))
@@ -429,7 +438,7 @@ class Dataflow_ActivteMQ_attachment(Dataflow_attachment):
                         log("getAttachments search in ots:%s"%(_filemd5))
                         _attach = {attachment_filemd5:_filemd5}
                         _attach_ots = attachment(_attach)
-                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls],True)
+                        _attach_ots.fix_columns(self.ots_client,[attachment_status,attachment_path,attachment_attachmenthtml,attachment_attachmentcon,attachment_filetype,attachment_swfUrls,attachment_process_time],True)
                         if _attach_ots.getProperties().get(attachment_status) is not None:
                             log("getAttachments find in ots:%s"%(_filemd5))
                             list_attachment.append(Attachment_postgres(_attach_ots.getProperties()))
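The attachment change above is the "skip if already processed" behavior from the commit message. A minimal standalone sketch of that control flow (function and field names here are illustrative placeholders, not the real Attachment_postgres API):

    def collect_attachment_html(attachments, dochtmlcon, request_interface):
        # Sketch only: "attachments" are objects exposing getProperties() like the
        # real attachment records; key names below are placeholders.
        list_html = []
        not_failed = True
        for attach in attachments:
            props = attach.getProperties()
            if len(str(props.get("process_time", ""))) > 10:
                # processed in a previous run: reuse the stored html, skip the interface call
                html = props.get("attachmenthtml") or ""
            else:
                if not request_interface(attach, dochtmlcon):
                    not_failed = False
                html = props.get("attachmenthtml") or ""
            list_html.append({"filemd5": props.get("filemd5"), "html": html})
        return list_html, not_failed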

+ 34 - 2
BaseDataMaintenance/maintenance/document_extract/fixDocFromTmp.py

@@ -1,7 +1,7 @@
 
 
 from tablestore import *
-from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity
+from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_capacity,getConnection_oracle
 
 from BaseDataMaintenance.model.ots.document import Document
 from BaseDataMaintenance.model.ots.document_tmp import Document_tmp
@@ -76,5 +76,37 @@ def fixDoc():
     mt.run()
 
 
+def fix_html_from_oracle():
+    ots_client = getConnect_ots()
+    ots_capacity = getConnect_ots_capacity()
+    conn_oracle = getConnection_oracle()
+    cursor = conn_oracle.cursor()
+    from BaseDataMaintenance.model.oracle.GongGaoTemp import dict_channel_table
+
+    _query = BoolQuery(must_queries=[TermQuery("docid",233591284)])
+    rows,next_token,total_count,is_all_succeed = ots_client.search("document_extract2","document_extract2_index",
+                                                                   SearchQuery(_query,limit=1),
+                                                                   ColumnsToGet(column_names=["uuid","original_docchannel"],return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    for _data in list_data:
+        d = Document(_data)
+        d.fix_columns(ots_client,["uuid","original_docchannel"],True)
+        _channel = d.getProperties().get("original_docchannel")
+        _uuid = d.getProperties().get("uuid")
+        # brute-force probe: try every channel table for this uuid and print any hit
+        for _channel,_table_name in dict_channel_table.items():
+            if _channel==527:
+                _channel = 52
+            _table_name = dict_channel_table[str(_channel)]
+            _table_name = _table_name.replace("_TEMP","")
+
+            sql = "select page_content from %s where id='%s'"%(_table_name,_uuid)
+            cursor.execute(sql)
+            rows = cursor.fetchall()
+            print(_table_name)
+            print(rows)
+    print(d.getProperties())
+    print(list_data)
+
 if __name__ == '__main__':
-    fixDoc()
+    # fixDoc()
+    fix_html_from_oracle()
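fix_html_from_oracle above probes every table in dict_channel_table for the document's uuid. The direct lookup it is effectively searching for would look roughly like this (a sketch assuming original_docchannel matches one of the dict's string keys; the helper name is hypothetical):

    def lookup_page_content(cursor, dict_channel_table, original_docchannel, uuid):
        # map the document's channel to its oracle source table, then fetch the html
        table_name = dict_channel_table.get(str(original_docchannel))
        if table_name is None:
            return None
        table_name = table_name.replace("_TEMP", "")
        cursor.execute("select page_content from %s where id=:1" % table_name, [uuid])
        row = cursor.fetchone()
        return row[0] if row else None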

+ 348 - 23
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -329,7 +329,7 @@ class f_decode_sub_docs_json(BaseUDTF):
                                 columns[_key_sub_docs] = str(sub_docs[_key_sub_docs])
         self.forward(columns["win_tenderer"],columns["bidding_budget"],columns["win_bid_price"],extract_count)
 
-@annotate('string,string -> string,string,string,string,string')
+@annotate('string,string,string -> string,string,string,string,string,string,string')
 class f_decode_for_dumplicate(BaseUDTF):
 
     def __init__(self):
@@ -339,7 +339,7 @@ class f_decode_for_dumplicate(BaseUDTF):
         logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
 
 
-    def process(self,sub_docs_json,extractjson):
+    def process(self,sub_docs_json,extractjson,extract):
         if extractjson is None or extractjson=="":
             extractjson = "{}"
         try:
@@ -359,22 +359,43 @@ class f_decode_for_dumplicate(BaseUDTF):
             list_sub_docs = [{}]
         max_len = max([len(list_product),len(list_code),len(list_sub_docs)])
 
-        for _i in range(max_len):
-            _product = list_product[_i%len(list_product)]
-            _code = list_code[_i%len(list_code)]
-            _subdoc = list_sub_docs[_i%len(list_sub_docs)]
-            win_tenderer = _subdoc.get("win_tenderer","")
-            bidding_budget = _subdoc.get("bidding_budget","0")
-            if float(bidding_budget)==0:
-                bidding_budget = ""
-            else:
-                bidding_budget = str(float(bidding_budget))
-            win_bid_price = _subdoc.get("win_bid_price","0")
-            if float(win_bid_price)==0:
-                win_bid_price = ""
-            else:
-                win_bid_price = str(float(win_bid_price))
-            self.forward(_product,_code,win_tenderer,bidding_budget,win_bid_price)
+        if extract!="extract":
+            win_tenderer = ""
+            bidding_budget = ""
+            win_bid_price = ""
+            for _subdoc in list_sub_docs:
+                win_tenderer = _subdoc.get("win_tenderer","")
+                bidding_budget = _subdoc.get("bidding_budget","0")
+                if float(bidding_budget)==0:
+                    bidding_budget = ""
+                else:
+                    bidding_budget = str(float(bidding_budget))
+                win_bid_price = _subdoc.get("win_bid_price","0")
+                if float(win_bid_price)==0:
+                    win_bid_price = ""
+                else:
+                    win_bid_price = str(float(win_bid_price))
+                if len(set([win_tenderer,bidding_budget,win_bid_price]))>=3:
+                    break
+
+            self.forward("",product,"",project_codes,win_tenderer,bidding_budget,win_bid_price)
+        else:
+            for _i in range(max_len):
+                _product = list_product[_i%len(list_product)]
+                _code = list_code[_i%len(list_code)]
+                _subdoc = list_sub_docs[_i%len(list_sub_docs)]
+                win_tenderer = _subdoc.get("win_tenderer","")
+                bidding_budget = _subdoc.get("bidding_budget","0")
+                if float(bidding_budget)==0:
+                    bidding_budget = ""
+                else:
+                    bidding_budget = str(float(bidding_budget))
+                win_bid_price = _subdoc.get("win_bid_price","0")
+                if float(win_bid_price)==0:
+                    win_bid_price = ""
+                else:
+                    win_bid_price = str(float(win_bid_price))
+                self.forward(_product,product,_code,project_codes,win_tenderer,bidding_budget,win_bid_price)
 
 
 
@@ -595,12 +616,22 @@ class f_dump_probability(BaseUDAF):
         buffer[0].extend(pbuffer[0])
 
     def terminate(self, buffer):
-        list_dict = buffer[0]
-        list_dict = list_dict[:10000]
-        list_group = split_with_time(list_dict,sort_key="page_time_stamp",timedelta=86400*2)
 
+        list_dict = buffer[0]
+        _set = set()
+        list_data = []
+        for _dict in list_dict:
+            docid = _dict["docid"]
+            if docid in _set:
+                continue
+            _set.add(docid)
+            list_data.append(_dict)
+            if len(list_data)>10000:
+                break
+        list_group = split_with_time(list_data,sort_key="page_time_stamp",timedelta=86400*2)
         return json.dumps(list_group)
 
+
 @annotate('string -> bigint,bigint,bigint,bigint,string')
 class f_split_dumplicate_probability(BaseUDTF):
 
@@ -634,7 +665,7 @@ class f_split_dumplicate_probability(BaseUDTF):
                             _docid2 = _group[_index_j]["docid"]
                             if _docid1<_docid2:
                                 self.forward(_docid1,_docid2,1,_len,_type)
-                            else:
+                            elif _docid1>_docid2:
                                 self.forward(_docid2,_docid1,1,_len,_type)
             except Exception as e:
                 logging(str(e))
@@ -989,6 +1020,202 @@ def check_time(json_time_less,json_time_greater):
                         return False
     return True
 
+def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=False):
+    if fingerprint_less==fingerprint_greater and getLength(fingerprint_less)>0:
+        return 1
+    if isinstance(project_codes_less,str):
+        project_codes_less = [a for a in project_codes_less.split(",") if a!=""]
+    if isinstance(project_codes_greater,str):
+        project_codes_greater = [a for a in project_codes_greater.split(",") if a!=""]
+
+
+    same_count = 0
+    all_count = 8
+    if len(set(project_codes_less) & set(project_codes_greater))>0:
+        same_count += 1
+    if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
+        same_count += 1
+    if getLength(agency_less)>0 and agency_less==agency_greater:
+        same_count += 1
+    if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
+        same_count += 1
+    if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
+        same_count += 1
+    if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
+        same_count += 1
+    if getLength(project_name_less)>0 and project_name_less==project_name_greater:
+        same_count += 1
+    if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
+        same_count += 1
+    base_prob = 0
+    if min_counts<3:
+        base_prob = 0.9
+    elif min_counts<5:
+        base_prob = 0.8
+    elif min_counts<8:
+        base_prob = 0.7
+    else:
+        base_prob = 0.6
+    _prob = base_prob*same_count/all_count
+    if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
+        _prob = 0.15
+    if _prob<0.1:
+        return _prob
+
+    check_result = {"pass":1}
+    if docchannel_less in (51,102,103,104,115,116,117):
+        if doctitle_refine_less!=doctitle_refine_greater:
+            if page_time_less!=page_time_greater:
+                check_result["docchannel"] = 0
+                check_result["pass"] = 0
+            else:
+                check_result["docchannel"] = 2
+    if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+        check_result["doctitle"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
+    else:
+        check_result["doctitle"] = 2
+
+    #added check
+    if not check_codes(project_codes_less,project_codes_greater):
+        check_result["code"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
+    else:
+        if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
+            check_result["code"] = 2
+        else:
+            check_result["code"] = 1
+
+
+    if not check_product(product_less,product_greater):
+        check_result["product"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
+    else:
+        if getLength(product_less)>0 and getLength(product_greater)>0:
+            check_result["product"] = 2
+        else:
+            check_result["product"] = 1
+
+    if not check_demand():
+        check_result["pass"] = 0
+
+    if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
+                        tenderee_less,tenderee_greater,
+                        agency_less,agency_greater,
+                        win_tenderer_less,win_tenderer_greater):
+        check_result["entity"] = 0
+        check_result["pass"] = 0
+        if b_log:
+            logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
+    else:
+        if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
+            check_result["entity"] = 2
+        elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
+            check_result["entity"] = 2
+        else:
+            check_result["entity"] = 1
+
+    if not check_money(bidding_budget_less,bidding_budget_greater,
+                       win_bid_price_less,win_bid_price_greater):
+        if b_log:
+            logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
+        check_result["money"] = 0
+        check_result["pass"] = 0
+    else:
+        if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
+            check_result["money"] = 2
+        elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
+            check_result["money"] = 2
+        else:
+            check_result["money"] = 1
+
+    #added check
+    if not check_package(package_less,package_greater):
+        if b_log:
+            logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
+        check_result["package"] = 0
+        check_result["pass"] = 0
+    else:
+        if getLength(package_less)>0 and getLength(package_greater)>0:
+            check_result["package"] = 2
+        else:
+            check_result["package"] = 1
+
+    #added check
+    if not check_time(json_time_less,json_time_greater):
+        if b_log:
+            logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
+            if isinstance(json_time_less,dict):
+                time_less = json_time_less
+            else:
+                time_less = json.loads(json_time_less)
+            if isinstance(json_time_greater,dict):
+                time_greater = json_time_greater
+            else:
+                time_greater = json.loads(json_time_greater)
+            for k,v in time_less.items():
+                if getLength(v)>0:
+                    v1 = time_greater.get(k,"")
+                    if getLength(v1)>0:
+                        if v!=v1:
+                            logging.info("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
+
+        check_result["time"] = 0
+        check_result["pass"] = 0
+    else:
+        if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
+            check_result["time"] = 2
+        else:
+            check_result["time"] = 1
+
+    if check_result.get("pass",0)==0:
+        if b_log:
+            logging.info(str(check_result))
+
+        if check_result.get("money",1)==0:
+            return 0
+
+        if check_result.get("time",1)==0:
+            return 0
+
+        if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
+            return _prob
+        else:
+            return 0
+    return _prob
+
+@annotate("bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string->double")
+class f_dumplicate_check(BaseUDTF):
+    def __init__(self):
+        import logging
+        import json
+        global logging,json
+
+    def process(self,docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,
+                tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,
+                bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,
+                project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,
+                extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,
+                page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,
+                package_less,package_greater,json_time_less,json_time_greater,json_context):
+        _context = json.loads(json_context)
+
+        min_counts = 100
+
+
+
+        for item in _context:
+            if item["counts"]<min_counts:
+                min_counts = item["counts"]
+
+        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=False)
+        self.forward(_prob)
+
 @annotate("string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
 class f_dumplicate_featureMatrix(BaseUDTF):
 
@@ -1107,7 +1334,7 @@ class f_dumplicate_featureMatrix(BaseUDTF):
         self.forward(json_matrix,_prob)
         return
 
-@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double->string')
+@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double->string')
 class f_redump_probability_final_check(BaseUDAF):
     '''
     Re-check after dedup merging: when a group has more than 5 members, doctitle, tenderee, win_tenderer and bidding_budget may each take only one value within the group
@@ -1122,6 +1349,104 @@ class f_redump_probability_final_check(BaseUDAF):
     def new_buffer(self):
         return [list()]
 
+    def iterate(self, buffer,main_docid,docid,newly,docchannel,nlp_enterprise,product,package,json_dicttime,page_time,project_codes,project_name,doctitle_refine,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,confidence):
+        buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"nlp_enterprise":nlp_enterprise,"product":product,"package":package,"json_dicttime":json_dicttime,"page_time":page_time,
+                          "project_codes":project_codes,"project_name":project_name,"doctitle_refine":doctitle_refine,"tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,
+                          "win_bid_price":win_bid_price,"extract_count":extract_count,"confidence":confidence})
+
+    def merge(self, buffer, pbuffer):
+        buffer[0].extend(pbuffer[0])
+
+    def terminate(self, buffer):
+        list_group = []
+        the_group = buffer[0]
+        the_group.sort(key=lambda x:x["confidence"],reverse=True)
+        _index = 0
+        if len(the_group)>0:
+            _index = 1
+            while _index<len(the_group):
+                document_greater = the_group[_index]
+                docid_greater = document_greater["docid"]
+                docchannel_greater = document_greater["docchannel"]
+                page_time_greater = document_greater["page_time"]
+                doctitle_refine_greater = document_greater["doctitle_refine"]
+                project_codes_greater = document_greater["project_codes"]
+                nlp_enterprise_greater = document_greater["nlp_enterprise"]
+                tenderee_greater = document_greater["tenderee"]
+                agency_greater = document_greater["agency"]
+                win_tenderer_greater = document_greater["win_tenderer"]
+                bidding_budget_greater = document_greater["bidding_budget"]
+                win_bid_price_greater = document_greater["win_bid_price"]
+                product_greater = document_greater["product"]
+                package_greater = document_greater["package"]
+                json_time_greater = document_greater["json_dicttime"]
+                fingerprint_greater = document_greater.get("fingerprint","")
+                project_name_greater = document_greater["project_name"]
+                extract_count_greater = document_greater["extract_count"]
+                _less_index = 0
+                while _less_index<_index:
+
+                    document_less = the_group[_less_index]
+                    docid_less = document_less["docid"]
+                    docchannel_less = document_less["docchannel"]
+                    page_time_less = document_less["page_time"]
+                    doctitle_refine_less = document_less["doctitle_refine"]
+                    project_codes_less = document_less["project_codes"]
+                    nlp_enterprise_less = document_less["nlp_enterprise"]
+                    tenderee_less = document_less["tenderee"]
+                    agency_less = document_less["agency"]
+                    win_tenderer_less = document_less["win_tenderer"]
+                    bidding_budget_less = document_less["bidding_budget"]
+                    win_bid_price_less = document_less["win_bid_price"]
+                    product_less = document_less["product"]
+                    package_less = document_less["package"]
+                    json_time_less = document_less["json_dicttime"]
+                    fingerprint_less = document_less.get("fingerprint","")
+                    project_name_less = document_less["project_name"]
+                    extract_count_less = document_less["extract_count"]
+
+                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,len(the_group),b_log=False)
+
+                    if _prob<0.1:
+                        break
+
+                    _less_index += 1
+                if _less_index!=_index:
+                    break
+                _index += 1
+
+        dumplicates = ""
+        if _index>1:
+            logging.info("index/whole:%d/%d"%(_index,len(the_group)))
+            final_group = the_group[:_index]
+            final_group.sort(key=lambda x:x["docid"])
+            final_group.sort(key=lambda x:x["extract_count"],reverse=True)
+            _set = set()
+            for _d in final_group:
+                _docid = _d["docid"]
+                if _docid in _set:
+                    continue
+                dumplicates += "%d,"%_docid
+                _set.add(_docid)
+            dumplicates = dumplicates[:-1]
+
+        return dumplicates
+
+@annotate('bigint,bigint,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,bigint,double->string')
+class f_redump_probability_final_check_bak(BaseUDAF):
+    '''
+    Re-check after dedup merging: when a group has more than 5 members, doctitle, tenderee, win_tenderer and bidding_budget may each take only one value within the group
+    when the group has 5 or fewer members, tenderee, win_tenderer and bidding_budget may each take only one value within the group
+    '''
+    def __init__(self):
+        import logging
+        import json,re
+        global json,logging,re
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def new_buffer(self):
+        return [list()]
+
     def iterate(self, buffer,main_docid,docid,newly,docchannel,nlp_enterprise,product,package,json_dicttime,page_time,project_code,doctitle_refine,tenderee,agency,win_tenderer,bidding_budget,win_bid_price,extract_count,confidence):
         buffer[0].append({"main_docid":main_docid,"docid":docid,"docchannel":docchannel,"nlp_enterprise":nlp_enterprise,"product":product,"package":package,"json_dicttime":json_dicttime,"page_time":page_time,
                           "project_code":project_code,"doctitle_refine":doctitle_refine,"tenderee":tenderee,"agency":agency,"win_tenderer":win_tenderer,"bidding_budget":bidding_budget,

+ 1 - 0
BaseDataMaintenance/model/oracle/BaseModel.py

@@ -2,6 +2,7 @@
 import traceback
 
 
+
 class BaseModel():
 
     def __init__(self):

+ 13 - 0
BaseDataMaintenance/model/oracle/GongGaoTemp.py

@@ -22,6 +22,19 @@ dict_oracle2ots = {"WEB_SOURCE_NO":"web_source_no",
                     "DETAIL_LINK":"detail_link",
                    "docchannel":"docchannel"}
 
+dict_channel_table = {"114":"bxkc.T_CAI_GOU_YI_XIANG_TEMP",
+                      "117":"bxkc.T_CHAN_QUAN_JIAO_YI_TEMP",
+                      "51":"bxkc.T_GONG_GAO_BIAN_GENG_TEMP",
+                      # note: the "103" key below overrides this entry (duplicate dict key)
+                      "103":"bxkc.T_KONG_ZHI_JIA_TEMP",
+                      "115":"bxkc.T_PAI_MAI_CHU_RANG_TEMP",
+                      "116":"bxkc.T_TU_DI_KUANG_CHAN_TEMP",
+                      "103":"bxkc.T_ZHAO_BIAO_DA_YI_TEMP",
+                      "52":"bxkc.T_ZHAO_BIAO_GONG_GAO_TEMP",
+                      "104":"bxkc.T_ZHAO_BIAO_WEN_JIAN_TEMP",
+                      "102":"bxkc.T_ZHAO_BIAO_YU_GAO_TEMP",
+                      "101":"bxkc.T_ZHONG_BIAO_XIN_XI_TEMP",
+                      "105":"bxkc.T_ZI_SHEN_JIE_GUO_TEMP"}
+
 class GongGaoTemp(BaseModel):
 
     def __init__(self,_dict):

+ 61 - 61
BaseDataMaintenance/model/ots/document.py

@@ -289,43 +289,43 @@ def turn_document_status():
     def producer(task_queue,ots_client):
 
 
-        bool_query = BoolQuery(
-            must_queries=[
-                MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
-                # BoolQuery(should_queries=[
-                #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
-                #                           # MatchPhraseQuery("doctitle","中国电信"),
-                #                           # MatchPhraseQuery("doctextcon","中国电信"),
-                #                           # MatchPhraseQuery("attachmenttextcon","中国电信")]),
-                #                           # RangeQuery(document_status,88,120,True,True),
-                #                           RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
-                #                           ExistsQuery
-                #                                  #,TermQuery(document_docid,171146519)
-                #                                  ]
-                # )
-            ],
-            # must_not_queries=[WildcardQuery("DX004354*")]
-        )
-
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            _document = Document(_data)
-            task_queue.put(_document)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                _document = Document(_data)
-                task_queue.put(_document)
+        # bool_query = BoolQuery(
+        #     must_queries=[
+        #         MatchPhraseQuery("doctitle","珠海城市职业技术学院2022年05月至2022年06月政府采购意向"),
+        #         # BoolQuery(should_queries=[
+        #         #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
+        #         #                           # MatchPhraseQuery("doctitle","中国电信"),
+        #         #                           # MatchPhraseQuery("doctextcon","中国电信"),
+        #         #                           # MatchPhraseQuery("attachmenttextcon","中国电信")]),
+        #         #                           # RangeQuery(document_status,88,120,True,True),
+        #         #                           RangeQuery("page_time","2022-03-24","2022-03-25",True,False),
+        #         #                           ExistsQuery
+        #         #                                  #,TermQuery(document_docid,171146519)
+        #         #                                  ]
+        #         # )
+        #     ],
+        #     # must_not_queries=[WildcardQuery("DX004354*")]
+        # )
+        #
+        # rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+        #                                                                columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
+        # list_data = getRow_ots(rows)
+        # print(total_count)
+        # _count = len(list_data)
+        # for _data in list_data:
+        #     _document = Document(_data)
+        #     task_queue.put(_document)
+        # while next_token:
+        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+        #                                                                    columns_to_get=ColumnsToGet([document_area],return_type=ColumnReturnType.SPECIFIED))
+        #     list_data = getRow_ots(rows)
+        #     _count += len(list_data)
+        #     print("%d/%d"%(_count,total_count))
+        #     for _data in list_data:
+        #         _document = Document(_data)
+        #         task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -333,15 +333,13 @@ def turn_document_status():
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        # import pandas as pd
-        # df = pd.read_excel("2022-01-19_214304_export11.xlsx")
-        # for docid,tenderee,win in zip(df["docid"],df["招标单位"],df["中标单位"]):
-        #     if not isinstance(tenderee,(str)) or not isinstance(win,(str)) or win=="" or tenderee=="":
-        #         # print(docid)
-        #         _dict = {document_docid:int(docid),
-        #                  document_partitionkey:int(docid)%500+1,
-        #                  }
-        #         task_queue.put(Document(_dict))
+        import pandas as pd
+        df = pd.read_excel("G:\\20221212error.xlsx")
+        for docid in df["docid"]:
+            _dict = {document_docid:int(docid),
+                     document_partitionkey:int(docid)%500+1,
+                     }
+            task_queue.put(Document(_dict))
         log("task_queue size:%d"%(task_queue.qsize()))
 
     def _handle(item,result_queue,ots_client):
@@ -364,12 +362,14 @@ def turn_document_status():
         #change status
         # item.setValue(document_docchannel,item.getProperties().get(document_original_docchannel),True)
         # item.setValue(document_status,random.randint(151,171),True)
-        item.setValue(document_area,"华南",True)
-        item.setValue(document_province,"广东",True)
-        item.setValue(document_city,"珠海",True)
-        item.setValue(document_district,"金湾区",True)
+        # item.setValue(document_area,"华南",True)
+        # item.setValue(document_province,"广东",True)
+        # item.setValue(document_city,"珠海",True)
+        # item.setValue(document_district,"金湾区",True)
+        item.setValue(document_status,1,True)
+        # print(item.getProperties())
         item.update_row(ots_client)
-        log("update %d status done"%(item.getProperties().get(document_docid)))
+        # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass
 
 
@@ -546,14 +546,7 @@ def fixDocumentHtml():
     mt = MultiThreadHandler(task_queue,_handle,None,2)
     mt.run()
 
-
-
-
-if __name__=="__main__":
-    # turn_extract_status()
-    # turn_document_status()
-    # drop_extract2()
-    # fixDocumentHtml()
+def delete_documents():
     from BaseDataMaintenance.dataSource.source import getConnect_ots
     from BaseDataMaintenance.dataSource.source import getConnect_ots_capacity
     ots_client = getConnect_ots()
@@ -562,7 +555,7 @@ if __name__=="__main__":
     df = pd.read_excel("2022-10-14_190838_数据导出.xlsx")
     _count = 0
     for _docid in df["docid"]:
-        partitionkey = int(_docid)//500+1
+        partitionkey = int(_docid)%500+1
         _d = {document_partitionkey:partitionkey,
               document_docid:int(_docid)}
         _doc = Document(_d)
@@ -570,4 +563,11 @@ if __name__=="__main__":
         _doc.delete_row(ots_capacity)
         _count += 1
         print(_docid)
-    print("delete count:%d"%_count)
+    print("delete count:%d"%_count)
+
+
+if __name__=="__main__":
+    # turn_extract_status()
+    turn_document_status()
+    # drop_extract2()
+    # fixDocumentHtml()
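
The partitionkey change in the deletion helper is the substantive fix here: elsewhere in this commit the key is computed as int(docid) % 500 + 1, not int(docid) // 500 + 1. A quick check with an assumed docid:

    docid = 233591284
    print(docid % 500 + 1)    # 285    -> matches how partitionkey is built elsewhere in this commit
    print(docid // 500 + 1)   # 467183 -> wrong key; delete_row would target a non-existent row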