ソースを参照

优化重跑速度

luojiehua 7 ヶ月 前
コミット
5e43026e93

+ 2 - 2
BaseDataMaintenance/common/ossUtils.py

@@ -108,7 +108,7 @@ def test_download(filemd5):
 
 
 if __name__=="__main__":
-    # print(getMDFFromFile('8a9c96a68803c2ad01881d0ee93618e5.pdf'))
-    test_download("892bde698088f1d61b5310782550d0e1")
+    print(getMDFFromFile(r'G:\新建文件夹\WeChat Files\wxid_kluerlj8cn3b21\FileStorage\File\2024-09\中国区超低氮锅炉电锅炉招标文件与附件(1).zip'))
+    # test_download("892bde698088f1d61b5310782550d0e1")
     # print(bucket.sign_url("GET","0015//20220623/2022-06-22/WGH001018/1655926900020.png",86500*30))
     # print(time.strftime("%Y-%m-%d",time.localtime(1658655178)))

+ 64 - 27
BaseDataMaintenance/maintenance/dataflow.py

@@ -2277,22 +2277,22 @@ class Dataflow_dumplicate(Dataflow):
     def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
         document_less = _dict1
         docid_less = _dict1["docid"]
-        docchannel_less = document_less["docchannel"]
-        page_time_less = document_less["page_time"]
+        docchannel_less = document_less.get("docchannel",0)
+        page_time_less = document_less.get("page_time")
         doctitle_refine_less = document_less["doctitle_refine"]
-        project_codes_less = document_less["project_codes"]
+        project_codes_less = document_less.get("project_codes")
         nlp_enterprise_less = document_less["nlp_enterprise"]
-        tenderee_less = document_less["tenderee"]
-        agency_less = document_less["agency"]
+        tenderee_less = document_less.get("tenderee","")
+        agency_less = document_less.get("agency")
         win_tenderer_less = document_less["win_tenderer"]
         bidding_budget_less = document_less["bidding_budget"]
         win_bid_price_less = document_less["win_bid_price"]
-        product_less = document_less["product"]
-        package_less = document_less["package"]
-        json_time_less = document_less["dict_time"]
-        project_name_less = document_less["project_name"]
-        fingerprint_less = document_less["fingerprint"]
-        extract_count_less = document_less["extract_count"]
+        product_less = document_less.get("product")
+        package_less = document_less.get("package")
+        json_time_less = document_less.get("dict_time")
+        project_name_less = document_less.get("project_name")
+        fingerprint_less = document_less.get("fingerprint")
+        extract_count_less = document_less.get("extract_count",0)
         web_source_no_less = document_less.get("web_source_no")
         province_less = document_less.get("province")
         city_less = document_less.get("city")
@@ -2308,21 +2308,21 @@ class Dataflow_dumplicate(Dataflow):
         document_greater = _dict2
         docid_greater = _dict2["docid"]
         page_time_greater = document_greater["page_time"]
-        docchannel_greater = document_greater["docchannel"]
-        doctitle_refine_greater = document_greater["doctitle_refine"]
+        docchannel_greater = document_greater.get("docchannel",0)
+        doctitle_refine_greater = document_greater.get("doctitle_refine","")
         project_codes_greater = document_greater["project_codes"]
         nlp_enterprise_greater = document_greater["nlp_enterprise"]
-        tenderee_greater = document_greater["tenderee"]
-        agency_greater = document_greater["agency"]
+        tenderee_greater = document_greater.get("tenderee","")
+        agency_greater = document_greater.get("agency","")
         win_tenderer_greater = document_greater["win_tenderer"]
         bidding_budget_greater = document_greater["bidding_budget"]
         win_bid_price_greater = document_greater["win_bid_price"]
-        product_greater = document_greater["product"]
-        package_greater = document_greater["package"]
+        product_greater = document_greater.get("product")
+        package_greater = document_greater.get("package")
         json_time_greater = document_greater["dict_time"]
-        project_name_greater = document_greater["project_name"]
-        fingerprint_greater = document_greater["fingerprint"]
-        extract_count_greater = document_greater["extract_count"]
+        project_name_greater = document_greater.get("project_name")
+        fingerprint_greater = document_greater.get("fingerprint")
+        extract_count_greater = document_greater.get("extract_count",0)
         web_source_no_greater = document_greater.get("web_source_no")
         province_greater = document_greater.get("province")
         city_greater = document_greater.get("city")
@@ -2571,7 +2571,7 @@ class Dataflow_dumplicate(Dataflow):
                 else:
                     bool_query = _query
                 rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
-                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
+                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=60,get_total_count=True),
                                                                                     ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
                 list_dict = getRow_ots(rows)
                 list_data = []
@@ -3953,6 +3953,27 @@ class Dataflow_dumplicate(Dataflow):
                 list_projects = []
 
             _time = time.time()
+
+            projects = list_projects
+            for _proj in projects:
+                dup_docid = _proj.get(project_dup_docid,"")
+                list_dup_docid = dup_docid.split(",")
+                new_dup_docid = []
+                for _docid in list_dup_docid:
+                    if _docid=="":
+                        continue
+                    docid = int(_docid)
+                    _d = {"partitionkey":docid%500+1,
+                          "docid":docid,
+                          }
+                    _doc = Document(_d)
+
+                    if _doc.fix_columns(self.ots_client,[document_update_document],True):
+                        if _doc.getProperties().get(document_update_document,"")!="true":
+                            new_dup_docid.append(str(docid))
+                _proj[project_dup_docid] = ",".join(new_dup_docid)
+            list_projects = projects
+
             project_json = to_project_json(list_projects)
             # log("json projects takes:%.3f"%(time.time()-_time))
             if b_log:
@@ -3987,6 +4008,11 @@ class Dataflow_dumplicate(Dataflow):
         has_before = False
         has_after = False
 
+        bidclose_time = page_time
+        web_source_name = item.get(document_tmp_web_source_name,"")
+
+
+
         if len(page_time)>0:
             l_page_time = timeAdd(page_time,days=-90)
             dict_time = item.get("dict_time",{})
@@ -3996,6 +4022,14 @@ class Dataflow_dumplicate(Dataflow):
                         has_before = True
                     if v>page_time:
                         has_after = True
+                    if k==document_tmp_time_bidclose:
+                        bidclose_time = v
+
+        set_web_source = {"中国招标投标公共服务平台","比地招标"}
+
+        if web_source_name in set_web_source and bidclose_time<page_time:
+            return False
+
         log("check page_time has_before %s has_after %s"%(str(has_before),str(has_after)))
         if has_before:
             _query = BoolQuery(must_queries=[MatchPhraseQuery(document_doctitle,item.get(document_doctitle,""))],
@@ -4088,7 +4122,7 @@ class Dataflow_dumplicate(Dataflow):
             remove_list = []
 
 
-            if self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid)):
+            if (self.check_page_time(item) and (len(final_list)==0 or best_docid==item.get(document_tmp_docid))) or item.get("update_document","")=="true":
                 dtmp.setValue(document_tmp_save,1,True)
                 # dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
                 dmp_docid = ",".join([str(a) for a in list(dup_docid)])
@@ -4124,7 +4158,9 @@ class Dataflow_dumplicate(Dataflow):
                 dtmp.setValue(document_tmp_projects,"[]",True)
             else:
                 project_json,list_merge_dump = self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log)
-                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump:
+
+
+                if list_merge_dump is not None and str(item.get(document_tmp_docid)) in list_merge_dump and item.get("update_document","")!="true":
                     dtmp.setValue(document_tmp_save,0,True)
                 dtmp.setValue(document_tmp_projects,project_json,True)
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
@@ -4250,10 +4286,11 @@ class Dataflow_dumplicate(Dataflow):
                           document_tmp_save:0
                           }
                     _d_tmp = Document_tmp(_d)
-                    if _d_tmp.fix_columns(self.ots_client,["status"],True):
+                    if _d_tmp.fix_columns(self.ots_client,["status",document_update_document],True):
                         if _d_tmp.getProperties().get("status")==1:
-                            _d_tmp.setValue("status",0,True)
-                            _d_tmp.update_row(self.ots_client)
+                            if _d_tmp.getProperties().get(document_update_document,"")!="true":
+                                _d_tmp.setValue("status",0,True)
+                                _d_tmp.update_row(self.ots_client)
 
 
 
@@ -4450,7 +4487,7 @@ if __name__ == '__main__':
     # test_attachment_interface()
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
-    df_dump.test_dumplicate(519262974
+    df_dump.test_dumplicate(457656095
                             )
     # compare_dumplicate_check()
     # df_dump.test_merge([391898061

+ 109 - 26
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -1449,6 +1449,7 @@ class Dataflow_init(Dataflow):
                             ots_dict = _data.getProperties_ots()
                             if ots_dict["docid"]<self.base_shenpi_id:
                                 ots_dict["docid"] += self.base_shenpi_id
+                                ots_dict["partitionkey"] = ots_dict["docid"]%500+1
 
                             if ots_dict.get(T_SHEN_PI_XIANG_MU_PAGE_ATTACHMENTS,"") !='[]':
                                 if send_msg_toacmq(self.pool_mq,json.dumps(ots_dict,cls=MyEncoder),self.mq_attachment):
@@ -1469,7 +1470,87 @@ class Dataflow_init(Dataflow):
             traceback.print_exc()
             self.pool_oracle.decrease()
 
+    def fix_shenpi(self):
+
+        pool_oracle = ConnectorPool(10,15,getConnection_oracle)
+        begin_id = 0
+        end_id = 64790010
+        thread_num = 15
+        step = (end_id-begin_id)//thread_num
+        list_items = []
+        for _i in range(thread_num):
+            _begin = _i*step
+            _end = (_i+1)*step-1
+            if _i==thread_num-1:
+                _end = end_id
+            list_items.append((_begin,_end,_i))
+        task_queue = Queue()
+        for item in list_items:
+            task_queue.put(item)
+
+        fix_count_list = []
+
+        def _handle(item,result_queue):
+            conn_oracle = pool_oracle.getConnector()
+            (begin_id,end_id,thread_id) = item
+
+            _count = 0
+            for _id_i in range(begin_id,end_id):
+                try:
+                    bool_query = BoolQuery(must_queries=[
+                        TermQuery("docchannel",302),
+                        TermQuery("original_id",_id_i)
+                    ])
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                        SearchQuery(bool_query,get_total_count=True))
+                    if total_count>0:
+                        continue
+
+                    # bool_query = BoolQuery(must_queries=[
+                    #     TermQuery("id",_id_i),
+                    # ])
+                    # rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                    #                                                                     SearchQuery(bool_query,get_total_count=True))
+                    # if total_count>0:
+                    #     continue
+
+                    try:
+                        list_data = T_SHEN_PI_XIANG_MU.select_rows(conn_oracle,_id_i)
+                    except Exception as e:
+                        continue
+
+                    # send data to mq one by one with max_shenpi_id updated
+                    for _data in list_data:
+
+                        _id = _data.getProperties().get(T_SHEN_PI_XIANG_MU_ID)
+
+                        ots_dict = _data.getProperties_ots()
+                        if ots_dict["docid"]<self.base_shenpi_id:
+                            ots_dict["docid"] += self.base_shenpi_id
+                            ots_dict["partitionkey"] = ots_dict["docid"]%500+1
+                        ots_dict["status"] = 201
+                        dict_1 = {}
+                        dict_2 = {}
+                        for k,v in ots_dict.items():
+                            if k!="dochtmlcon":
+                                dict_1[k] = v
+                            if k in ('partitionkey',"docid","dochtmlcon"):
+                                dict_2[k] = v
+                        d_1 = Document(dict_1)
+                        d_2 = Document(dict_2)
+                        d_1.update_row(self.ots_client)
+                        d_2.update_row(self.ots_capacity)
+                        _count += 1
+                except Exception as e:
+                    traceback.print_exc()
 
+                log("thread_id:%d=%d/%d/%d"%(thread_id,_id_i-begin_id,_count,end_id-begin_id))
+            fix_count_list.append(_count)
+            pool_oracle.putConnector(conn_oracle)
+
+        mt = MultiThreadHandler(task_queue,_handle,None,thread_count=thread_num)
+        mt.run()
+        print(fix_count_list,sum(fix_count_list))
 
     def ots2mq(self):
         try:
@@ -1477,13 +1558,32 @@ class Dataflow_init(Dataflow):
 
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
                                                                                 SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_docid)]),get_total_count=True,limit=100),
-                                                                                ColumnsToGet(return_type=ColumnReturnType.ALL))
+                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
             list_data = getRow_ots(rows)
+            task_queue = Queue()
             for _data in list_data:
+                task_queue.put(_data)
+
+
+            while next_token:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
+                list_data = getRow_ots(rows)
+
+                for _data in list_data:
+                    task_queue.put(_data)
+
+                if task_queue.qsize()>=1000:
+                    break
+
+            def _handle(_data,result_queue):
+
                 _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
                       document_tmp_docid:_data.get(document_tmp_docid),
                       document_tmp_status:0}
                 _document = Document(_d)
+                _document.fix_columns(self.ots_client,None,True)
                 page_attachments = _data.get(document_tmp_attachment_path,"[]")
 
                 _document_html = Document(_data)
@@ -1498,36 +1598,16 @@ class Dataflow_init(Dataflow):
                     _data[document_tmp_status] = status
                     send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
                 if send_succeed:
+                    _document.setValue(document_tmp_status,0,True)
                     _document.update_row(self.ots_client)
                 else:
                     log("send_msg_error2222")
-            while next_token:
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                    SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
-                                                                                    ColumnsToGet(return_type=ColumnReturnType.ALL))
-                list_data = getRow_ots(rows)
-                for _data in list_data:
-                    _d = {document_tmp_partitionkey:_data.get(document_tmp_partitionkey),
-                          document_tmp_docid:_data.get(document_tmp_docid),
-                          document_tmp_status:0}
-                    _document = Document(_d)
-                    page_attachments = _data.get(document_tmp_attachment_path,"[]")
 
-                    _document_html = Document(_data)
-                    _document_html.fix_columns(self.ots_capacity,[document_tmp_dochtmlcon],True)
+            if task_queue.qsize()>0:
+                mt = MultiThreadHandler(task_queue,_handle,None,15)
+                mt.run()
+
 
-                    if page_attachments!="[]":
-                        status = random.randint(1,10)
-                        _data[document_tmp_status] = status
-                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_attachment)
-                    else:
-                        status = random.randint(11,50)
-                        _data[document_tmp_status] = status
-                        send_succeed = send_msg_toacmq(self.pool_mq,json.dumps(_document_html.getProperties(),cls=MyEncoder),self.mq_extract)
-                    if send_succeed:
-                        _document.update_row(self.ots_client)
-                    else:
-                        log("send_msg_error2222")
         except Exception as e:
             traceback.print_exc()
 
@@ -1546,6 +1626,8 @@ class Dataflow_init(Dataflow):
                 _document = Document_tmp(_d)
                 page_attachments = _data.get(document_tmp_attachment_path,"[]")
 
+                log("refix doc %s from document_tmp"%(str(_data.get(document_tmp_docid))))
+
                 _document_html = Document_html(_data)
                 _document_html.fix_columns(self.ots_client,[document_tmp_dochtmlcon],True)
 
@@ -1675,6 +1757,7 @@ class Dataflow_init(Dataflow):
 
 
 
+
 def transform_attachment():
     from BaseDataMaintenance.model.ots.attachment import attachment
     from BaseDataMaintenance.model.postgres.attachment import Attachment_postgres

+ 82 - 7
BaseDataMaintenance/maxcompute/documentDumplicate.py

@@ -784,18 +784,26 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                 moneys_less,moneys_greater,
                 moneys_attachment_less,moneys_attachment_greater):
 
+    bidding_budget_less_source = bidding_budget_less
+    bidding_budget_greater_source = bidding_budget_greater
+    win_bid_price_less_source = win_bid_price_less
+    win_bid_price_greater_source = win_bid_price_greater
     #只判断最高前六位
     if getLength(bidding_budget_less)>0:
+        bidding_budget_less_source = float(bidding_budget_less_source)
         bidding_budget_less = round(float(bidding_budget_less))
         bidding_budget_less = str(round(bidding_budget_less,6-len(str(bidding_budget_less))))
     if getLength(bidding_budget_greater)>0:
+        bidding_budget_greater_source = float(bidding_budget_greater_source)
         bidding_budget_greater = round(float(bidding_budget_greater))
         bidding_budget_greater = str(round(bidding_budget_greater,6-len(str(bidding_budget_greater))))
 
     if getLength(win_bid_price_less)>0:
+        win_bid_price_less_source = float(win_bid_price_less_source)
         win_bid_price_less = round(float(win_bid_price_less))
         win_bid_price_less = str(round(win_bid_price_less,6-len(str(win_bid_price_less))))
     if getLength(win_bid_price_greater)>0:
+        win_bid_price_greater_source = float(win_bid_price_greater_source)
         win_bid_price_greater = round(float(win_bid_price_greater))
         win_bid_price_greater = str(round(win_bid_price_greater,6-len(str(win_bid_price_greater))))
 
@@ -816,14 +824,21 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                 budget_is_same = True
             if budget_less in moneys_greater or budget_less in moneys_attachment_greater:
                 budget_is_same = True
+            if bidding_budget_less_source in moneys_greater or bidding_budget_less_source in moneys_attachment_greater:
+                budget_is_same = True
             if budget_greater in moneys_less or budget_greater in moneys_attachment_less:
                 budget_is_same = True
+            if bidding_budget_greater_source in moneys_less or bidding_budget_greater_source in moneys_attachment_less:
+                budget_is_same = True
             if budget_is_same=="":
                 return False
 
     if getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
+
+
         price_less = float(win_bid_price_less)
         price_greater = float(win_bid_price_greater)
+
         if price_less!=price_greater:
 
             if min(price_less,price_greater)>0:
@@ -833,8 +848,12 @@ def check_money(bidding_budget_less,bidding_budget_greater,
                 price_is_same = True
             if price_less in moneys_greater or price_less in moneys_attachment_greater:
                 price_is_same = True
+            if win_bid_price_less_source in moneys_greater or win_bid_price_less_source in moneys_attachment_greater:
+                price_is_same = True
             if price_greater in moneys_less or price_greater in moneys_attachment_less:
                 price_is_same = True
+            if win_bid_price_greater_source in moneys_less or win_bid_price_greater_source in moneys_attachment_less:
+                price_is_same = True
             if price_is_same=="":
                 return False
     return True
@@ -955,6 +974,8 @@ def check_codes(project_codes_less,project_codes_greater):
 
     for project_code_less in project_codes_less:
         for project_code_greater in project_codes_greater:
+            project_code_less = str(project_code_less).upper()
+            project_code_greater = str(project_code_greater).upper()
             code_sim = getSimilarityOfString(project_code_less,project_code_greater)
             if project_code_less is not None and project_code_greater is not None:
                 if code_sim>0.6:
@@ -1076,11 +1097,26 @@ def check_doctitle(doctitle_refind_less, doctitle_refind_greater, codes_less=[],
                     return False
     return True
 
+
+def product_dump(list_product):
+    _product_l_l = []
+    list_product.sort(key=lambda x:len(x))
+    for _l in list_product:
+        _exists = False
+        for l1 in _product_l_l:
+            if l1 in _l:
+                _exists = True
+                break
+        if not _exists:
+            _product_l_l.append(_l)
+    return _product_l_l
 def check_product(product_less,product_greater,split_char=",",doctitle_refine_less='',doctitle_refine_greater=''):
     if getLength(product_less)>0 and getLength(product_greater)>0:
 
         _product_l = product_less.split(split_char)
+        _product_l = product_dump(_product_l)
         _product_g = product_greater.split(split_char)
+        _product_g = product_dump(_product_g)
         _title_l = doctitle_refine_less
         _title_g = doctitle_refine_greater
         same_count = 0
@@ -1100,11 +1136,28 @@ def check_product(product_less,product_greater,split_char=",",doctitle_refine_le
                 set_product_g_in_title.add(_g)
         # 限制标题出现的产品要有重叠
         if len(set_product_l_in_title)>0 and len(set_product_g_in_title)>0:
+
+            
             _set_union = set_product_l_in_title & set_product_g_in_title
-            if len(_set_union)==0:
-                return False
-            if len(_set_union)>0 and len(_set_union)!=len(set_product_l_in_title) and len(_set_union)!=len(set_product_g_in_title):
+
+            # 不同的部门若有重叠则通过
+            diff_l = set_product_l_in_title-_set_union
+            diff_g = set_product_g_in_title-_set_union
+
+            diff_dump = product_dump(list(diff_l.union(diff_g)))
+            if not(len(diff_dump)<=len(diff_l) or len(diff_dump)<=len(diff_g)):
                 return False
+
+            # 过于严格,暂时取消
+            # if len(_set_union)==0:
+            #     return False
+            # if len(_set_union)!=len(set_product_l_in_title) and len(_set_union)!=len(set_product_g_in_title):
+            #     _l1 = list(set_product_l_in_title)
+            #     _l2 = list(set_product_g_in_title)
+            #     _l1.extend(_l2)
+            #     _l1 = product_dump(_l1)
+            #     if len(_l1)!=len(_set_union):
+            #         return False
         for _l in _product_l:
             for _g in _product_g:
                 if getSimilarityOfString(_l,_g)>=0.8 or doctitle_refine_greater.find(_l)>=0 or doctitle_refine_less.find(_g)>=0:
@@ -1120,12 +1173,15 @@ def check_package(package_less,package_greater,split_char=","):
 
         _product_l = package_less.split(split_char)
         _product_g = package_greater.split(split_char)
+        same_level = False
         for _l in _product_l:
             for _g in _product_g:
+                if abs(len(_l)-len(_g))<=2:
+                    save_level = True
                 if _l==_g:
                     return True
-
-        return False
+        if same_level:
+            return False
     return True
 
 def check_time(json_time_less,json_time_greater):
@@ -1202,6 +1258,8 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
 
     #同一个站源,都有附件但附件没有重叠则不去重
     if web_source_no_less==web_source_no_greater and len(set_md5_less)>0 and len(set_md5_greater)>0 and len(set_md5_less&set_md5_greater)==0:
+        if b_log:
+            logging.info("same web_site,both has attach but not same web_source_no_less:%s,web_source_no_greater:%s"%(web_source_no_less,web_source_no_greater))
         return 0
 
     if isinstance(project_codes_less,str):
@@ -1274,8 +1332,12 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
         if _prob<0.1:
             _prob = 0.15
         if getLength(province_less)>0 and getLength(province_greater)>0 and province_less not in ("全国","未知") and province_greater not in ("全国","未知") and province_less!=province_greater:
+            if b_log:
+                logging.info("province not same:%s-%s"%(province_less,province_greater))
             return 0
     if _prob<0.1:
+        if b_log:
+            logging.info("prob too low:%f"%(_prob))
         return _prob
 
     check_result = {"pass":1}
@@ -1395,6 +1457,8 @@ def check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_
             check_result["time"] = 1
 
     if hard_level==2 and check_result["product"]<=1:
+        if b_log:
+            logging.inf("hard_level %s and check_product less than 2"%(str(hard_level)))
         return 0
     if check_result.get("pass",0)==0:
         if b_log:
@@ -1635,7 +1699,11 @@ class f_dumplicate_check(BaseUDTF):
             page_attachments_less = '[]'
         if page_attachments_greater is None:
             page_attachments_greater = '[]'
-        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
+        punish_less = _extract_less.get("punish",{})
+        punish_greater = _extract_greater.get("punish",{})
+        approval_less = _extract_less.get("approval",[])
+        approval_greater = _extract_greater.get("approval",[])
+        _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,min_counts,b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less = punish_less,punish_greater = punish_greater,approval_less = approval_less,approval_greater = approval_greater)
         self.forward(_prob)
 
 @annotate("string,bigint,bigint,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string,string->string,double")
@@ -1815,6 +1883,8 @@ class f_redump_probability_final_check(BaseUDAF):
                 web_source_no_greater = document_greater["web_source_no"]
                 extract_json_greater = document_greater["extract_json"]
                 page_attachments_greater = document_greater["page_attachments"]
+
+
                 _pass = True
 
                 for document_less in final_group:
@@ -1859,7 +1929,12 @@ class f_redump_probability_final_check(BaseUDAF):
                     if page_attachments_greater is None:
                         page_attachments_greater = '[]'
 
-                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,len(the_group),b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater)
+                    punish_less = _extract_less.get("punish",{})
+                    punish_greater = _extract_greater.get("punish",{})
+                    approval_less = _extract_less.get("approval",[])
+                    approval_greater = _extract_greater.get("approval",[])
+
+                    _prob = check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,province_less,province_greater,city_less,city_greater,district_less,district_greater,len(the_group),b_log=False,web_source_no_less=web_source_no_less,web_source_no_greater=web_source_no_greater,moneys_less=moneys_less,moneys_greater=moneys_greater,moneys_attachment_less=moneys_attachment_less,moneys_attachment_greater=moneys_attachment_greater,page_attachments_less=page_attachments_less,page_attachments_greater=page_attachments_greater,punish_less = punish_less,punish_greater = punish_greater,approval_less = approval_less,approval_greater = approval_greater)
 
                     if _prob<0.1:
                         _pass = False

+ 113 - 1
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -1652,6 +1652,13 @@ def generate_common_properties(list_docs):
         extract_count = _doc.get(document_tmp_extract_count,0)
         candidates = _doc.get(document_candidates,"[]")
 
+        _province = _doc.get(document_province,"")
+        _city = _doc.get(document_city,"")
+        _district = _doc.get(document_district,"")
+
+        tenderee = _doc.get(document_tenderee,"")
+        agency = _doc.get(document_agency,"")
+
 
         try:
             set_nlp_enterprise |= set(json.loads(_doc.get(document_nlp_enterprise,"[]")))
@@ -1703,7 +1710,12 @@ def generate_common_properties(list_docs):
                               document_page_time:page_time,
                               document_status:201 if is_visuable==1 else 401,
                               "is_multipack":is_multipack,
-                              document_tmp_extract_count:extract_count
+                              document_tmp_extract_count:extract_count,
+                              document_tenderee:tenderee,
+                              document_agency:agency,
+                              document_province:_province,
+                              document_city:_city,
+                              document_district:_district
                               }
                              )
 
@@ -2990,6 +3002,105 @@ class MyEncoder(json.JSONEncoder):
             return obj
         return json.JSONEncoder.default(self, obj)
 
+def update_document_from_dynamic(_proj):
+    try:
+        list_dynamic = []
+        try:
+            list_dynamic = json.loads(_proj.get(project_project_dynamics,"[]"))
+        except Exception as e:
+            pass
+
+        dict_update_dict = {}
+        dict_column_count = {}
+        dict_addr_count = {}
+        for _dynamic in list_dynamic:
+            docid = _dynamic.get(document_docid)
+            tenderee = _dynamic.get(document_tenderee)
+            agency = _dynamic.get(document_agency)
+            province = _dynamic.get(document_province)
+            city = _dynamic.get(document_city)
+            district = _dynamic.get(document_district)
+
+
+            if getLength(tenderee)>0:
+                if tenderee not in dict_column_count:
+                    dict_column_count[tenderee] = {"count":1,"type":document_tenderee,"value":tenderee}
+                else:
+                    dict_column_count[tenderee]["count"] += 1
+            if getLength(agency)>0:
+                if agency not in dict_column_count:
+                        dict_column_count[agency] = {"count":1,"type":document_agency,"value":agency}
+                else:
+                    dict_column_count[agency]["count"] += 1
+
+            if province is not None and city is not None and district is not None:
+                addr = "%s%s%s"%(province,city,district)
+                if addr not in dict_addr_count:
+                    dict_addr_count[addr] = {"count":1}
+                    dict_addr_count[addr][document_province] = province
+                    dict_addr_count[addr][document_city] = city
+                    dict_addr_count[addr][document_district] = district
+                    if district!="":
+                        dict_addr_count[addr]["level"] = 3
+                    elif city!="":
+                        dict_addr_count[addr]["level"] = 2
+                    else:
+                        dict_addr_count[addr]["level"] = 1
+                else:
+                    dict_addr_count[addr]["count"] += 1
+
+        dict_list_v = {}
+        for k,v in dict_column_count.items():
+            _type = v.get("type")
+            if _type not in dict_list_v:
+                dict_list_v[_type] = []
+            dict_list_v[_type].append(v)
+        for k,v in dict_list_v.items():
+            v.sort(key=lambda x:x["count"],reverse=True)
+            if len(v)>0:
+                _proj[k] = v[0]["value"]
+                for _dynamic in list_dynamic:
+                    docid = _dynamic.get(document_docid)
+                    _v = _dynamic.get(k)
+                    if _v is not None and _v!="":
+                        if _v!=v[0]["value"]:
+                            if docid not in dict_update_dict:
+                                dict_update_dict[docid] = {document_docid:docid}
+                            dict_update_dict[docid][k] = v[0]["value"]
+        list_v = []
+        for k,v in dict_addr_count.items():
+            list_v.append(v)
+        list_v.sort(key=lambda x:x.get("count",0),reverse=True)
+        list_v.sort(key=lambda x:x.get("level",0),reverse=True)
+        if len(list_v)>0:
+            province = list_v[0].get(document_province)
+            city = list_v[0].get(document_city)
+            district = list_v[0].get(document_district)
+
+            _proj[document_province] = province
+            _proj[document_city] = city
+            _proj[document_district] = district
+            for _dynamic in list_dynamic:
+                docid = _dynamic.get(document_docid)
+
+                if document_province in _dynamic:
+                    if _dynamic.get(document_province,"")==province or _dynamic.get(document_province,"") in ("全国","未知",""):
+                        if province!=_dynamic.get(document_province,"") or city!=_dynamic.get(document_city,"") or district!=_dynamic.get(document_district,""):
+                            if docid not in dict_update_dict:
+                                dict_update_dict[docid] = {document_docid:docid}
+                            dict_update_dict[docid][document_province] = province
+                            dict_update_dict[docid][document_city] = city
+                            dict_update_dict[docid][document_district] = district
+        update_v = []
+        for k,v in dict_update_dict.items():
+            update_v.append(v)
+        _proj["document_update"] = update_v
+    except Exception as e:
+        pass
+
+
+
+
 def to_project_json(projects):
 
     list_proj = []
@@ -3025,6 +3136,7 @@ def to_project_json(projects):
             _proj.pop(project_uuid)
         if "project_uuid" in _proj:
             _proj.pop("project_uuid")
+        update_document_from_dynamic(_proj)
     return json.dumps(list_proj,cls=MyEncoder,ensure_ascii=False)
 
 def get_page_time_dis(page_time,n_page_time):

+ 4 - 2
BaseDataMaintenance/model/oracle/T_SHEN_PI_XIANG_MU.py

@@ -61,8 +61,10 @@ class T_SHEN_PI_XIANG_MU(BaseModel):
         new_dict.pop(T_SHEN_PI_XIANG_MU_ID)
 
         try:
-            new_dict[T_SHEN_PI_XIANG_MU_SOURCE_STAGE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE,0))
-            new_dict[T_SHEN_PI_XIANG_MU_SOURCE_TYPE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE,0))
+            if new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE) is not None:
+                new_dict[T_SHEN_PI_XIANG_MU_SOURCE_STAGE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_STAGE,0))
+            if new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE) is not None:
+                new_dict[T_SHEN_PI_XIANG_MU_SOURCE_TYPE] = int(new_dict.get(T_SHEN_PI_XIANG_MU_SOURCE_TYPE,0))
         except Exception as e:
             pass
 

+ 46 - 30
BaseDataMaintenance/model/ots/document.py

@@ -341,25 +341,25 @@ def turn_document_status():
         #
         # )
 
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet(["product","product_number"],return_type=ColumnReturnType.SPECIFIED))
-        list_data = getRow_ots(rows)
-        print(total_count)
-        _count = len(list_data)
-        for _data in list_data:
-            _document = Document_tmp(_data)
-            task_queue.put(_document)
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
-                                                                           SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                           columns_to_get=ColumnsToGet(["product"],return_type=ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            _count += len(list_data)
-            print("%d/%d"%(_count,total_count))
-            for _data in list_data:
-                _document = Document_tmp(_data)
-                task_queue.put(_document)
+        # rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+        #                                                                columns_to_get=ColumnsToGet(["product","product_number"],return_type=ColumnReturnType.SPECIFIED))
+        # list_data = getRow_ots(rows)
+        # print(total_count)
+        # _count = len(list_data)
+        # for _data in list_data:
+        #     _document = Document_tmp(_data)
+        #     task_queue.put(_document)
+        # while next_token:
+        #     rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+        #                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+        #                                                                    columns_to_get=ColumnsToGet(["product"],return_type=ColumnReturnType.SPECIFIED))
+        #     list_data = getRow_ots(rows)
+        #     _count += len(list_data)
+        #     print("%d/%d"%(_count,total_count))
+        #     for _data in list_data:
+        #         _document = Document_tmp(_data)
+        #         task_queue.put(_document)
 
         # docids = [223820830,224445409]
         # for docid in docids:
@@ -371,17 +371,33 @@ def turn_document_status():
         # df = pd.read_excel(r"F:\Workspace2016\DataMining\data\2024-07-24_143135_数据导出.xlsx")
         # list_docid = df["docid"]
         # list_docid = [519497468]
-        # for docid in list_docid:
-        #     _dict = {document_docid:int(docid),
-        #              document_partitionkey:int(docid)%500+1,
-        #              }
-        #     task_queue.put(Document(_dict))
+
+        list_docid = []
+        filename = r"G:\新建文件夹\WeChat Files\wxid_kluerlj8cn3b21\FileStorage\File\2024-10\金额缺失的id (1).txt"
+        with open(filename,"r",encoding="utf8") as f:
+            while 1:
+                line = f.readline()
+                if not line:
+                    break
+                line = line.strip()
+                docid = line.split('-')[-1]
+                if re.search("^\d+$",docid) is not None:
+                    list_docid.append(int(docid))
+
+        for docid in list_docid:
+            _dict = {document_docid:int(docid),
+                     document_partitionkey:int(docid)%500+1,
+                     }
+            task_queue.put(Document(_dict))
         # for docid in df["docid2"]:
         #     _dict = {document_docid:int(docid),
         #              document_partitionkey:int(docid)%500+1,
         #              }
         #     task_queue.put(Document(_dict))
-        # log("task_queue size:%d"%(task_queue.qsize()))
+        log("task_queue size:%d"%(task_queue.qsize()))
+
+
+
 
     def _handle(item,result_queue,ots_client):
         #change attach value
@@ -409,11 +425,11 @@ def turn_document_status():
         # item.setValue(document_district,"金湾区",True)
         # item.setValue(document_status,66,True)
         # print(item.getProperties())
-        # item.setValue(document_status,1,True)
-        product = item.getProperties().get(document_product)
-        l_product = product.split(",")
-        n_product = ",".join(l_product[:500])
-        item.setValue(document_product,n_product,True)
+        item.setValue(document_status,1,True)
+        # product = item.getProperties().get(document_product)
+        # l_product = product.split(",")
+        # n_product = ",".join(l_product[:500])
+        # item.setValue(document_product,n_product,True)
         item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_docid)))
         pass

+ 5 - 5
BaseDataMaintenance/model/ots/document_tmp.py

@@ -268,7 +268,7 @@ def turn_document_tmp_status():
             must_queries=[
                 # TermQuery("fingerprint","md5=2cc044b81ec13acddcc970b71b780365")
                 # TermQuery("save",66),
-                RangeQuery("status",66),
+                RangeQuery("status",1,51),
                 # BoolQuery(should_queries=[
                 #                           # TermQuery("tenderee","山西利民工业有限责任公司"),
                 #                           # MatchPhraseQuery("doctitle","中国电信"),
@@ -283,7 +283,7 @@ def turn_document_tmp_status():
             ],
             must_not_queries=[
                 # TermQuery("docid",288599518)
-                ExistsQuery("doctitle"),
+                # ExistsQuery("doctitle"),
                 # ExistsQuery("page_time"),
                               ]
         )
@@ -350,14 +350,14 @@ def turn_document_tmp_status():
         # _extract_json = _extract_json.replace("\x06", "").replace("\x05", "").replace("\x07", "").replace('\\', '')
         # item.setValue(document_tmp_extract_json,_extract_json,True)
         # json.loads(_extract_json)
-        # item.setValue(document_tmp_status,71,True)
+        item.setValue(document_tmp_status,0,True)
         # item.setValue(document_tmp_save,1,True)
         # if item.exists_row(ots_client):
         #     item.update_row(ots_client)
         # print(item.getProperties())
-        # item.update_row(ots_client)
+        item.update_row(ots_client)
         # log("update %d status done"%(item.getProperties().get(document_tmp_docid)))
-        item.delete_row(ots_client)
+        # item.delete_row(ots_client)
         # from BaseDataMaintenance.model.ots.document import Document
         #
         # Doc = Document(item.getProperties())