Преглед на файлове

调整监控报警,特殊情况增加@所有人

luojiehua преди 2 години
родител
ревизия
3fa7504cbc

+ 82 - 14
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -48,6 +48,64 @@ class BaseDataMonitor():
         last_ten_minite_time = timeAdd(current_time,0,"%Y-%m-%d %H:%M:%S",-10)
         return last_ten_minite_time[:nums]
 
+    def check_document_uuid(self,log_filename):
+        """Cross-check uuids from a flow_init log against document_tmp.
+
+        Parses *log_filename* for "delete ... ID='<uuid>'" lines (uuid plus
+        source table) and "msg too long" lines, queries OTS for each uuid to
+        see whether the document made it into document_tmp, and writes the
+        result to "<log_filename>_check.xlsx". Skipped if the report already
+        exists or the log is missing.
+        """
+
+        def _handle(_item,result_queue):
+            # count of this uuid in document_tmp; 0 means the document
+            # was lost during synchronization
+            bool_query = BoolQuery(must_queries=[TermQuery("uuid",_item.get("uuid"))])
+
+            rows,next_token,total_count,is_all_succeed = ots_client.search("document_tmp","document_tmp_index",
+                                                                           SearchQuery(bool_query,get_total_count=True),
+                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            _item["exists"] = total_count
+        check_filename = "%s_check.xlsx"%(log_filename)
+        list_uuid = []
+        task_queue = Queue()
+        dict_tolong = {}
+        if not os.path.exists(check_filename) and os.path.exists(log_filename):
+            # raw strings so \s and \d are real regex escapes
+            _regrex = r"delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+            _regrex_tolong = r"msg too long:(?P<uuid>[^,]+),\d+"
+            with open(log_filename,"r",encoding="utf8") as f:
+                for _line in f:
+                    _match = re.search(_regrex,_line)
+                    if _match is not None:
+                        _uuid = _match.groupdict().get("uuid")
+                        tablename = _match.groupdict().get("tablename")
+                        if _uuid is not None:
+                            list_uuid.append({"uuid":_uuid,"tablename":tablename})
+                    _match = re.search(_regrex_tolong,_line)
+                    if _match is not None:
+                        _uuid = _match.groupdict().get("uuid")
+                        dict_tolong[_uuid] = 1
+
+            # bugfix: original wrote "if list_uuid==0:" which compares the
+            # list itself to 0 and is always False, so the alert never fired
+            if len(list_uuid)==0:
+                _msg = "数据遗漏检查出错"
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+            ots_client = getConnect_ots()
+
+            for _d in list_uuid:
+                task_queue.put(_d)
+            mt = MultiThreadHandler(task_queue,_handle,None,30)
+            mt.run()
+            df_data = {"uuid":[],
+                       "tablename":[],
+                       "exists":[],
+                       "tolong":[]}
+
+            for _data in list_uuid:
+                for k,v in df_data.items():
+                    if k!="tolong":
+                        v.append(_data.get(k))
+                # mark uuids whose message was rejected as too long
+                df_data["tolong"].append(dict_tolong.get(_data["uuid"],0))
+            df2 = pd.DataFrame(df_data)
+            df2.to_excel(check_filename)
+
     def monitor_init(self):
 
         def _handle(_item,result_queue):
@@ -94,7 +152,7 @@ class BaseDataMonitor():
 
                 if list_uuid==0:
                     _msg = "数据遗漏检查出错"
-                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                    sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                     # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
                 ots_client = getConnect_ots()
@@ -123,7 +181,7 @@ class BaseDataMonitor():
                     counts += 1
             if counts>0:
                 _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
@@ -131,7 +189,7 @@ class BaseDataMonitor():
 
         except Exception as e:
             _msg = "数据遗漏检查报错"
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
             # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
             traceback.print_exc()
 
@@ -234,7 +292,7 @@ class BaseDataMonitor():
             total_count_init = getQueueSize("dataflow_init")
             if total_count_init>=100:
                 _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=True)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
@@ -275,7 +333,11 @@ class BaseDataMonitor():
                 #                                                                              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
                 _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s,查库免提取数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count),str(exists_count))
-                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                atAll=False
+                if success_count==0:
+                    atAll=True
+                    _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
                 # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()
@@ -283,10 +345,9 @@ class BaseDataMonitor():
     def monitor_proposedBuilding(self):
         current_date = getCurrent_date("%Y-%m-%d")
 
-        last_date = timeAdd(current_date,-1,"%Y-%m-%d")
 
         query = BoolQuery(must_queries=[
-            RangeQuery("update_time",last_date),
+            RangeQuery("update_time",current_date),
             WildcardQuery("docids","*")
         ])
 
@@ -296,7 +357,7 @@ class BaseDataMonitor():
 
 
         query = BoolQuery(must_queries=[
-            RangeQuery("update_time",last_date),
+            RangeQuery("update_time",current_date),
             WildcardQuery("spids","*")
         ])
 
@@ -305,8 +366,11 @@ class BaseDataMonitor():
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
         total_count = total_count_doc+total_count_sp
-        _msg = "拟在建生成报警:最近两天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
-        sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        _msg = "拟在建生成报警:当天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
+        # default must be bound before use; otherwise NameError whenever
+        # total_count != 0 (same pattern as the extract-queue alert above)
+        atAll=False
+        if total_count==0:
+            atAll=True
+            _msg += "\n疑似流程出现问题,请相关负责人尽快处理"
+        sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS,atAll=atAll)
         # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
@@ -537,10 +601,14 @@ class BaseDataMonitor():
 
 if __name__ == '__main__':
 
-    dm = BaseDataMonitor()
-    # dm.start_monitor()
-    dm.monitor_proposedBuilding()
-    print(dm.get_last_tenmin_time(16))
+    # dm = BaseDataMonitor()
+    # # dm.start_monitor()
+    # log_filename = "C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log"
+    # dm.check_document_uuid(log_filename)
+
+    sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)
+    # dm.monitor_proposedBuilding()
+    # print(dm.get_last_tenmin_time(16))
 
 
 

+ 9 - 3
BaseDataMaintenance/dataSource/interface.py

@@ -63,7 +63,9 @@ import requests
 
 ACCESS_TOKEN_SUANFA = "https://oapi.dingtalk.com/robot/send?access_token=eec7d420c64fc7eb037561f40c837c934205b1b7751ae36151d885948c1a4a1d"
 ACCESS_TOKEN_DATAWORKS = "https://oapi.dingtalk.com/robot/send?access_token=9489f01c4ab9f0c3f87e2ff5c3e35eb9fb0d17afb6244de4683596df1111daea"
-def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA):
+
+
+def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA,atAll=False):
     timestamp = str(round(time.time() * 1000))
     secret = 'SECb1c5d36f73fb7cd36f91c71cb05441a7bbdad872e051234a626c7d7ceba6ee6a'
     secret_enc = secret.encode('utf-8')
@@ -82,11 +84,15 @@ def sentMsgToDD(msg,access_token=ACCESS_TOKEN_SUANFA):
     data = {
         "msgtype": "text",
         "text": {"content": msg},
-        "isAtAll": True}
+        "isAtAll": False,
+        "at":{"isAtAll": atAll}
+    }
     res = requests.post(webhook, data=json.dumps(data), headers=headers)   #发送post请求
+    # print(res.status_code)
 
 
 
 if __name__=="__main__":
-    print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
+    # print(getAttachDealInterface(base64.b64encode(open("F://Workspace2016/BaseDataMaintenance/BaseDataMaintenance/maintenance/attachment/readme.md","rb").read()),"pdf"))
     # sentMsgToDD("测试消息")
+    sentMsgToDD("报警test_msg",ACCESS_TOKEN_DATAWORKS)

+ 2 - 2
BaseDataMaintenance/dataSource/setttings.py

@@ -37,8 +37,8 @@ neo4j_port = 7687
 neo4j_user = "bxkc_web"
 neo4j_pass = "bxkc_web"
 
-# oracle_host = "121.46.18.113"
-# oracle_port = 10522
+oracle_host = "121.46.18.113"
+oracle_port = 10522
 oracle_host = "192.168.0.150"
 oracle_port = 1522
 # oracle_user = "bxkc_data_readonly"

+ 23 - 5
BaseDataMaintenance/maintenance/dataflow.py

@@ -2294,7 +2294,7 @@ class Dataflow_dumplicate(Dataflow):
         fingerprint_greater = document_greater["fingerprint"]
         extract_count_greater = document_greater["extract_count"]
 
-        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=False)
+        return check_dumplicate_rule(docid_less,docid_greater,fingerprint_less,fingerprint_greater,project_codes_less,project_codes_greater,tenderee_less,tenderee_greater,agency_less,agency_greater,win_tenderer_less,win_tenderer_greater,bidding_budget_less,bidding_budget_greater,win_bid_price_less,win_bid_price_greater,project_name_less,project_name_greater,doctitle_refine_less,doctitle_refine_greater,extract_count_less,extract_count_greater,docchannel_less,docchannel_greater,page_time_less,page_time_greater,product_less,product_greater,nlp_enterprise_less,nlp_enterprise_greater,package_less,package_greater,json_time_less,json_time_greater,min_counts,b_log=b_log)
 
 
     def dumplicate_check_bak(self,_dict1,_dict2,min_counts,b_log=False):
@@ -2793,7 +2793,7 @@ class Dataflow_dumplicate(Dataflow):
                 # TermQuery("docid",271983871)
             ])
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
                                                                                 ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
             log("flow_dumplicate producer total_count:%d"%total_count)
             list_dict = getRow_ots(rows)
@@ -3912,7 +3912,6 @@ class Dataflow_dumplicate(Dataflow):
                     self.changeSaveStatus(remove_list)
 
                 # print(dtmp.getProperties())
-                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
                 dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
                 dtmp.setValue(document_tmp_best_docid,best_docid,True)
                 dtmp.update_row(self.ots_client)
@@ -4021,6 +4020,22 @@ class Dataflow_dumplicate(Dataflow):
             self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
             return
 
+    def test_merge(self,list_docid_less,list_docid_greater):
+        list_docs_less = self.search_docs(list_docid_less)
+        list_projects_less = self.generate_projects_from_document(list_docs_less)
+
+        print("======list_projects_less",list_projects_less)
+        list_docs_greater = self.search_docs(list_docid_greater)
+        print("==list_docs_greater",[a.getProperties() for a in list_docs_greater])
+        list_projects_greater = self.generate_projects_from_document(list_docs_greater)
+
+        print("=========list_projects_greater",list_projects_greater)
+        list_projects_less.extend(list_projects_greater)
+        list_projects = dumplicate_projects(list_projects_less,b_log=True)
+        project_json = to_project_json(list_projects)
+        log("project_json:%s"%project_json)
+        return project_json
+
     def getRemainDoc(self,docid):
         columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
         bool_query = BoolQuery(must_queries=[
@@ -4074,6 +4089,8 @@ class Dataflow_dumplicate(Dataflow):
             return best_docid
         return None
 
+
+
 if __name__ == '__main__':
     # df = Dataflow()
     # df.flow_init()
@@ -4090,8 +4107,9 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    # df_dump.test_dumplicate(293126454)
-    df_dump.flow_remove_project_tmp()
+    df_dump.test_dumplicate(268920229)
+    # df_dump.test_merge([292315564],[287890754])
+    # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)
     # df_dump.fix_doc_which_not_in_project()
     # df_dump.delete_projects_by_document(16288036)

+ 9 - 3
BaseDataMaintenance/maintenance/dataflow_mq.py

@@ -758,6 +758,10 @@ class Dataflow_ActivteMQ_extract(Dataflow_extract):
                 data[k] = v
             data["timeout"] = 440
             data["doc_id"] = data.get(document_tmp_docid,0)
+            # if data["docid"]<298986054 and data["docid"]>0:
+            #     log("jump docid %s"%(str(data["docid"])))
+            #     ackMsg(conn,message_id,subscription)
+            #     return
             data["content"] = data.get(document_tmp_dochtmlcon,"")
             if document_tmp_dochtmlcon in data:
                 data.pop(document_tmp_dochtmlcon)
@@ -1480,12 +1484,14 @@ def fixDoc_to_queue_init(filename=""):
     row_name = ",".join(list(dict_oracle2ots.keys()))
     conn = getConnection_oracle()
     cursor = conn.cursor()
+    _count = 0
     for uuid,tablename,_exists in zip(df["uuid"],df["tablename"],df["exists"]):
         if _exists==0:
+            _count += 1
             _source = str(tablename).replace("_TEMP","")
             sql = " insert into %s(%s) select %s from %s where id='%s' "%(tablename,row_name,row_name,_source,uuid)
             cursor.execute(sql)
-            log(sql)
+            log("%d:%s"%(_count,sql))
     conn.commit()
 
 if __name__ == '__main__':
@@ -1496,5 +1502,5 @@ if __name__ == '__main__':
     # de = Dataflow_ActivteMQ_extract()
     # de.start_flow_extract()
     # fixDoc_to_queue_extract()
-    check_data_synchronization()
-    fixDoc_to_queue_init()
+    # check_data_synchronization()
+    fixDoc_to_queue_init(filename="C:\\Users\\Administrator\\Desktop\\flow_init_2023-02-03.log_check.xlsx")

+ 2 - 0
BaseDataMaintenance/maintenance/project/transfer_project_to_tmp.py

@@ -0,0 +1,2 @@
+
+

+ 118 - 106
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2056,7 +2056,7 @@ def appendKeyvalueCount(list_projects,keys=[project_tenderee,project_agency,proj
         for k in keys:
             v = _proj.get(k,"")
             if isinstance(v,str):
-                if not (v is None or v==""):
+                if v is not None and v!="":
                     _count += 1
             elif isinstance(v,(int,float)):
                 if v>0:
@@ -2079,7 +2079,6 @@ def dumplicate_projects(list_projects,b_log=False):
         # log("================")
         # for _p in cluster_projects:
         #     log("docids:%s"%(_p.get(project_docids,"")))
-
         for _pp in cluster_projects:
             _find = False
             list_prob = []
@@ -2094,13 +2093,14 @@ def dumplicate_projects(list_projects,b_log=False):
                     update_projects_by_project(_pp,[_p])
                     _find = True
                     _update = True
-                    break
             if not _find:
                 list_p.append(_pp)
 
         if len(cluster_projects)==len(list_p):
             break
         cluster_projects = list_p
+
+
     return cluster_projects
 
 def update_projects_by_project(project_dict,projects):
@@ -2287,6 +2287,7 @@ def check_dynamics_title_merge(project_dynamics,project_dynamics_to_merge,b_log)
 
                     _title2 = re.sub(r'项目|工程|服务|询价|比价|谈判|竞争性|磋商|结果|中标|招标|采购|的|公示|公开|成交|公告|评标|候选人|交易|通知|废标|流标|终止|中止|一笔|预告|单一来源|询价|竞价|合同', '',  _title2)
                     _sim = getSimilarityOfString(_title1,_title2)
+                    # log("title1,title2 %s==%s"%(_title1,_title2))
                     if _sim>0.8:
                         return 1
                     if len(_title1)>15 and len(_title2)>15:
@@ -2335,12 +2336,16 @@ def check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zha
 
 def check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,b_log):
     #check sub_project_name
-    _set = set([a for a in [sub_project_name.replace("Project",""),sub_project_name_to_merge.replace("Project","")] if a!=""])
-    if len(_set)>1:
-        if b_log:
-            log("check sub_project_name failed %s===%s"%(str(sub_project_name),str(sub_project_name_to_merge)))
-        return -1
-    return 1
+    sub_project_name = str(sub_project_name).replace("Project","")
+    sub_project_name_to_merge = str(sub_project_name_to_merge).replace("Project","")
+    _set = set([a for a in [sub_project_name,sub_project_name_to_merge] if a!=""])
+    if sub_project_name!="" and sub_project_name_to_merge!="":
+        if len(_set)>1:
+            if b_log:
+                log("check sub_project_name failed %s===%s"%(str(sub_project_name),str(sub_project_name_to_merge)))
+            return -1
+        return 1
+    return 0
 
 def check_roles_merge(enterprise,enterprise_to_merge,tenderee,tenderee_to_merge,agency,agency_to_merge,win_tenderer,win_tenderer_to_merge,b_log):
     _set1 = set([a for a in [tenderee,tenderee_to_merge] if a!=""])
@@ -2374,7 +2379,8 @@ def check_roles_merge(enterprise,enterprise_to_merge,tenderee,tenderee_to_merge,
                     log("check win_tenderer failed %s===%s"%(str(win_tenderer),str(win_tenderer_to_merge)))
                 return -1
     if len(_set1)+len(_set2)+len(_set3)>=2:
-        return 1
+        if (tenderee!="" or agency!="" or win_tenderer!="") and (tenderee_to_merge!="" or agency_to_merge!="" or win_tenderer_to_merge!=""):
+            return 1
     return 0
 
 def check_money_merge(bidding_budget,bidding_budget_to_merge,win_bid_price,win_bid_price_to_merge,b_log):
@@ -2444,6 +2450,7 @@ def check_project_codes_merge(list_code,list_code_to_merge,b_log):
         return 1
     return 0
 
+
 def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=False,simple_check=False):
     docids = _proj.get(project_docids,"")
     page_time = _proj.get(project_page_time,"")
@@ -2513,7 +2520,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
 
 
     check_dict = {0:0,1:0,-1:0}
-
+    prob_count = 0
     #时间判断-招中标时间
     _zhaozhong_check = check_zhaozhong_page_time_merge(zhao_biao_page_time,zhong_biao_page_time,zhao_biao_page_time_to_merge,zhong_biao_page_time_to_merge,_proj,_dict,b_log)
     check_dict[_zhaozhong_check] += 1
@@ -2529,6 +2536,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
         if return_prob:
             return False,0
         return False
+    prob_count += _money_check
 
     #人物判断-角色
     _roles_check = check_roles_merge(enterprise,enterprise_to_merge,tenderee,tenderee_to_merge,agency,agency_to_merge,win_tenderer,win_tenderer_to_merge,b_log)
@@ -2537,23 +2545,23 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
         if return_prob:
             return False,0
         return False
-
+    prob_count += _roles_check
     _product_check = check_product_merge(product,product_to_merge,b_log)
 
-
+    prob_count += _product_check*2
     _project_name_check = check_project_name_merge(project_name,project_name_to_merge,b_log)
-
+    prob_count += _project_name_check
     _title_check = check_dynamics_title_merge(project_dynamics,project_dynamics_to_merge,b_log)
-
+    prob_count += _title_check
     min_count = 2
     if product=="" or product_to_merge=="":
         min_count = 1
     #事件判断--产品和名称、标题需要满足两个个
-    if _project_name_check+_product_check+_title_check<min_count:
+    if max(_project_name_check,0)+max(_product_check,0)+max(_title_check,0)<min_count:
         if b_log:
             log("project_name,project_name_to_merge %s %s"%(project_name,project_name_to_merge))
             log("product,product_to_merge %s %s"%(product,product_to_merge))
-            log("check _project_name_check+_product_check+_title_check<2 failed %d"%(_project_name_check+_product_check+_title_check))
+            log("check _project_name_check+_product_check+_title_check<2 failed %d %s,%s,%s"%(_project_name_check+_product_check+_title_check,str(_project_name_check),str(_product_check),str(_title_check)))
         if return_prob:
             return False,0
         return False
@@ -2574,6 +2582,7 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
         if return_prob:
             return False,0
         return False
+    prob_count += _codes_check
 
     #时间判断-其他时间
     _time_check = check_time_merge(_proj,_dict,b_log)
@@ -2582,14 +2591,22 @@ def check_merge_rule(_proj,_dict,b_log=False,time_limit=86400*200,return_prob=Fa
     #时间判断-分包编号
     _sub_project_name_check = check_sub_project_name_merge(sub_project_name,sub_project_name_to_merge,b_log)
     check_dict[_sub_project_name_check] += 1
-
+    prob_count += _sub_project_name_check
 
     #时间判断-发布时间
     _page_time_check = check_page_time_merge(page_time,page_time_to_merge,b_log,time_limit)
     check_dict[_page_time_check] += 1
 
 
-    _prob = check_dict[1]/(check_dict[-1]+check_dict[0]+check_dict[1])
+    _prob = prob_count/8
+
+    if _prob<0.15:
+        if b_log:
+            log("prob less than 0.15")
+        if return_prob:
+            return False,_prob
+        return False
+
     if b_log:
         log("check %s-%s result%s"%(docids,docids_to_merge,str(check_dict)))
     if check_dict[-1]>0:
@@ -2649,7 +2666,7 @@ class f_group_merge_projects(BaseUDAF):
                 for _j in range(_i+1,len(_group)):
                     _p_uuid,_,_p = _group[_i]
                     _pp_uuid,_,_pp = _group[_j]
-                    if check_merge_rule(_p,_pp,False,simple_check=True):
+                    if check_merge_rule(_p,_pp,False):
                         list_group_pair.append([_p_uuid,_pp_uuid])
             if len(list_group_pair)>0:
                 list_group_data.append(list_group_pair)
@@ -2901,6 +2918,8 @@ class f_dumplicate_projects(BaseUDAF):
         set_uuid = set()
         list_data = []
         for uuid_1,attrs_json in buffer[0]:
+            if attrs_json is None:
+                continue
             if uuid_1 in set_uuid:
                 continue
             list_data.append(json.loads(attrs_json))
@@ -2928,7 +2947,8 @@ class f_generate_project_with_attrs_json(BaseUDTF):
     def process(self,attrs_json):
         if attrs_json is not None:
             _group = json.loads(attrs_json)
-            self.forward(json.dumps([_group],ensure_ascii=False))
+            project_json = to_project_json([_group])
+            self.forward(project_json)
 
 @annotate('string -> string')
 class f_generate_project_with_delete_uuid(BaseUDTF):
@@ -2946,7 +2966,7 @@ class f_generate_project_with_delete_uuid(BaseUDTF):
         if delete_uuid is not None:
             _group = {project_delete_uuid:delete_uuid,
                       "to_delete":True}
-            self.forward(json.dumps([_group]),ensure_ascii=False)
+            self.forward(json.dumps([_group],ensure_ascii=False))
 
 def test_remerge():
     a = f_remege_limit_num_contain_bychannel()
@@ -3021,104 +3041,96 @@ class f_extract_year_win_and_price(BaseUDTF):
 
 def test_merge_rule():
     o_a = {
-        "bidding_budget":2022,
+        "bidding_budget":0,
         "bidding_budget_unit":"",
-        "second_bid_price":0,
-        "second_bid_price_unit":"",
-        "second_service_time":"",
-        "second_tenderer":"丹江口市金智恒贸易有限宏茗Verito",
         "sub_project_code":"",
         "sub_project_name":"Project",
-        "win_bid_price":4950,
-        "win_bid_price_unit":"万元",
+        "win_bid_price":0,
+        "win_bid_price_unit":"",
         "win_service_time":"",
-        "win_tenderer":"丹江口市方谊电脑网络有限公司",
-        "win_tenderer_manager":"汤蕙冰",
-        "win_tenderer_phone":"07195232489",
-        "district":"丹江口",
-        "city":"十堰",
-        "province":"湖北",
-        "area":"华中",
-        "industry":"通用设备",
-        "info_type":"计算机设备",
-        "info_source":"政府采购",
-        "qcodes": "",
-        "project_name":"丹江口市交通运输局财务专用电脑采购",
-        "project_code":"丹采计备【2022】XY0002号",
-        "tenderee":"丹江口市交通运输局",
-        "tenderee_addr": "",
-        "tenderee_phone":"0719-5222536",
-        "agency":"丹江口市交通运输局",
-        "agency_phone":"0719-5222536",
-        "procurement_system":"交通系统",
-        "time_bidopen":"2022-04-02",
-        "extract_count":0,
-        "project_dynamic":"[{\"docid\": 230964885, \"doctitle\": \"丹江口市交通运输局财务专用电脑采购中标(成交)结果公告\", \"docchannel\": 101, \"bidway\": \"\", \"page_time\": \"2022-04-03\", \"status\": 201, \"is_multipack\": false, \"extract_count\": 0}]",
+        "win_tenderer":"日照华中机电贸易有限公司",
+        "district":"未知",
+        "city":"日照",
+        "province":"山东",
+        "area":"华东",
+        "industry":"建筑建材",
+        "info_type":"有色金属冶炼及压延产品",
+        "info_source":"企业采购",
+        "qcodes":"",
+        "project_code":"DLGCB-X001302",
+        "tenderee":"日照港通通信工程有限公司动力分公司",
+        "procurement_system":"企业采购系统",
+        "time_release":"2020-05-22",
+        "extract_count":3,
+        "project_dynamic":"[{\"docid\": 99800062, \"doctitle\": \"DLGCB-X001302\", \"docchannel\": 101, \"bidway\": \"\", \"page_time\": \"2020-05-22\", \"status\": 201, \"is_multipack\": false, \"extract_count\": 3}]",
         "docid_number":1,
-        "docids":"230964885",
-        "zhong_biao_page_time":"2022-04-03",
-        "project_codes":"2022001,BJ2022040280753,丹采计备【2022】XY0002号",
-        "page_time":"2022-04-03",
-        "product":"躁魉鼙锼鹅缝,交通运输躅台式电脑舍,台式计算机(强制节能),财务专用电脑,台式电脑,办公设备",
-        "nlp_enterprise":"[]",
+        "docids":"99800062",
+        "zhong_biao_page_time":"2020-05-22",
+        "project_codes":"DLGCB-X001302",
+        "page_time":"2020-05-22",
+        "product":"铜辫子",
+        "nlp_enterprise":"[\"日照华中机电贸易有限公司\", \"乐清\", \"日照港通通信工程有限公司动力分公司\"]",
         "nlp_enterprise_attachment":"[]",
-        "delete_uuid":"5aa174e2-859b-4ea9-8d64-5f2174886084",
-        "keyvaluecount":6,
-        "dup_docid":"",
-        "keep_uuid":""
+        "delete_uuid":"03f60e46-3036-4f2a-a4bb-f5a326c2755e"
     }
     o_b = {
         "bidding_budget":0,
         "bidding_budget_unit":"",
         "sub_project_code":"",
         "sub_project_name":"Project",
-        "win_bid_price":4950,
-        "win_bid_price_unit":"万元",
-        "win_service_time":"",
-        "win_tenderer":"丹江口市方谊电脑网络有限公司",
-        "district":"丹江口",
-        "city":"十堰",
-        "province":"湖北",
-        "area":"华中",
-        "industry":"通用设备",
-        "info_type":"计算机设备",
-        "info_source":"工程建设",
-        "qcodes": "",
-        "project_name":"丹江口市交通运输局财务专用电脑采购",
-        "project_code":"丹采计备【2022】XY0002号",
-        "tenderee":"丹江口市交通运输局",
-        "tenderee_addr": "",
-        "tenderee_phone":"07195222536",
-        "tenderee_contact":"洪书梅",
-        "agency":"丹江口市交通运输局",
-        "agency_phone":"07195222536",
-        "agency_contact":"洪书梅",
-        "procurement_system":"交通系统",
-        "time_bidopen":"2022-04-02",
-        "extract_count":0,
-        "project_dynamic":"[{\"docid\": 232857494, \"doctitle\": \"丹江口市交通运输局交通运输局财务专用电脑采购合同公告\", \"docchannel\": 120, \"bidway\": \"询价\", \"page_time\": \"2022-04-12\", \"status\": 201, \"is_multipack\": false, \"extract_count\": 0}, {\"docid\": 234180491, \"doctitle\": \"丹江口市交通运输局财务专用电脑采购中标(成交)结果公告\", \"docchannel\": 101, \"bidway\": \"\", \"page_time\": \"2022-04-19\", \"status\": 201, \"is_multipack\": false, \"extract_count\": 0}]",
-        "docid_number":2,
-        "docids":"232857494,234180491",
-        "zhong_biao_page_time":"2022-04-19",
-        "project_codes":"2022001,丹采计备【2022】XY0002号,20220402271923",
-        "page_time":"2022-04-19",
-        "product":"财务专用电脑,台式电脑",
+        "district":"未知",
+        "city":"日照",
+        "province":"山东",
+        "area":"华东",
+        "industry":"建筑建材",
+        "info_type":"有色金属冶炼及压延产品",
+        "info_source":"企业采购",
+        "qcodes":"",
+        "project_code":"DLGCB-X001302",
+        "tenderee":"日照港通通信工程有限公司动力分公司",
+        "procurement_system":"企业采购系统",
+        "time_release":"2020-05-19",
+        "extract_count":2,
+        "project_dynamic":"[{\"docid\": 99403871, \"doctitle\": \"DLGCB-X001302\", \"docchannel\": 52, \"bidway\": \"\", \"page_time\": \"2020-05-19\", \"status\": 201, \"is_multipack\": false, \"extract_count\": 2}]",
+        "docid_number":1,
+        "docids":"99403871",
+        "zhao_biao_page_time":"2020-05-19",
+        "project_codes":"DLGCB-X001302",
+        "page_time":"2020-05-19",
+        "product":"铜辫子",
+        "nlp_enterprise":"[\"日照港通通信工程有限公司动力分公司\"]",
+        "nlp_enterprise_attachment":"[]",
+        "delete_uuid":"03f60e46-3036-4f2a-a4bb-f5a326c2755e"
+    }
+    o_c = {
+        "district":"未知",
+        "city":"日照",
+        "province":"山东",
+        "area":"华东",
+        "industry":"建筑建材",
+        "info_type":"有色金属冶炼及压延产品",
+        "info_source":"企业采购",
+        "qcodes":"",
+        "project_code":"ZBCGZX-X039338",
+        "tenderee_addr":"",
+        "procurement_system":"",
+        "extract_count":1,
+        "project_dynamic":"[{\"docid\": 110153883, \"doctitle\": \"ZBCGZX-X039338\", \"docchannel\": 101, \"bidway\": \"\", \"page_time\": \"2020-08-31\", \"status\": 201, \"is_multipack\": false, \"extract_count\": 1}]",
+        "docid_number":1,
+        "docids":"110153883",
+        "zhong_biao_page_time":"2020-08-31",
+        "project_codes":"ZBCGZX-X039338",
+        "page_time":"2020-08-31",
+        "product":"",
         "nlp_enterprise":"[]",
         "nlp_enterprise_attachment":"[]",
-        "delete_uuid":"b2a2594c-764d-46c2-9717-80307b63937c",
-        "keyvaluecount":5,
-        "win_tenderer_manager":"",
-        "win_tenderer_phone":"13329854499",
-        "bidway":"询价",
-        "time_release":"2022-04-12",
-        "dup_docid":"",
-        "keep_uuid":""
+        "delete_uuid":"4b4967be-b387-4259-9eb4-cd228a6b223f"
     }
-
-    print(check_merge_rule(o_a,o_b,True))
+    # print(check_merge_rule(o_a,o_b,True))
+    print(dumplicate_projects([o_a,o_b,o_c],True))
 
 if __name__ == '__main__':
-    # test_merge_rule()
-    a = uuid4()
-    print(str(a))
-    print(to_project_json([{"keep_uuid":"123"}]))
+    test_merge_rule()
+    # a = uuid4()
+    # print(str(a))
+    # print(to_project_json([{"keep_uuid":"123"}]))

+ 13 - 0
BaseDataMaintenance/model/ots/project2_tmp.py

@@ -0,0 +1,13 @@
+
+from BaseDataMaintenance.model.ots.project import *
+
+class Project_tmp(Project):
+
+    def __init__(self,_dict):
+
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "project2_tmp"
+
+    def getPrimary_keys(self):
+        return [project_uuid]

+ 8 - 2
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py

@@ -167,6 +167,12 @@ class proposedBuilding_tmp(BaseModel):
 
         set_contact = set()
 
+        return_tenderee = ""
+        for _group in list_group:
+            # add contacts of tenderee
+            return_tenderee = _group.get("tenderee","")
+
+
         for _group in list_group:
             # add contacts of tenderee
             _tenderee = _group.get("tenderee")
@@ -216,7 +222,7 @@ class proposedBuilding_tmp(BaseModel):
 
             if document_dict is not None:
                 _tenderee = document_dict.get("tenderee","")
-                if _tenderee!="":
+                if return_tenderee=="" and  _tenderee!="":
                     enterprise_dict = self.getEnterprise(_tenderee,["province","contacts","address","contacts1","contacts2","contacts3","contacts4","contacts5"])
                     if enterprise_dict is None:
                         enterprise_dict = {}
@@ -485,7 +491,7 @@ class proposedBuilding_tmp(BaseModel):
         if page_time=="":
             page_time = update_time
         legal_contacts = []
-        # list_follows = self.getFollows(list_group,legal_contacts,set_enterprise)
+        list_follows = self.getFollows(list_group,legal_contacts,set_enterprise)
         legal_contacts,tenderee_count = self.getContacts(ots_client,list_group,set_enterprise,legal_contacts)
         if len(legal_contacts)==0:
             return None

+ 1 - 1
BaseDataMaintenance/start_dataflow_dumplicate.py

@@ -7,5 +7,5 @@ from BaseDataMaintenance.maintenance.dataflow import *
 
 if __name__ == '__main__':
     # flow = Dataflow()
-    flow = Dataflow_dumplicate(start_delete_listener=False)
+    flow = Dataflow_dumplicate(start_delete_listener=True)
     flow.start_flow_dumplicate()

+ 8 - 0
BaseDataMaintenance/test/2.py

@@ -0,0 +1,8 @@
+
+
+import requests
+
+print(requests.post("http://www.baidu.com",timeout=1.2))
+a = []
+
+print(",".join(a).split(","))

+ 1 - 1
BaseDataMaintenance/test/ab.py

@@ -10,7 +10,7 @@ from BaseDataMaintenance.dataSource.source import getConnect_ots
 
 import json
 ots_client = getConnect_ots()
-docid = 291120546
+docid = 244774657
 a = {"name":"双腿链条索具&DSL-WLL3TON\载荷3t"}
 test_str = json.dumps(a,ensure_ascii=False)
 print(test_str)

+ 84 - 0
BaseDataMaintenance/test/turn_table_define.py

@@ -0,0 +1,84 @@
+
+
+a = '''
+project_uuid = "uuid"
+project_docids = "docids"
+project_zhao_biao_page_time = "zhao_biao_page_time"
+project_zhong_biao_page_time = "zhong_biao_page_time"
+project_page_time = "page_time"
+project_doctextcon = "doctextcon"
+project_doctitles = "doctitles"
+project_attachmenttextcon = "attachmenttextcon"
+project_area = "area"
+project_province = "province"
+project_city = "city"
+project_district = "district"
+project_info_type = "info_type"
+project_industry = "industry"
+project_qcodes = "qcodes"
+project_project_name = "project_name"
+project_project_code = "project_code"
+project_project_codes = "project_codes"
+project_project_addr = "project_addr"
+project_tenderee = "tenderee"
+project_tenderee_addr = "tenderee_addr"
+project_tenderee_phone = "tenderee_phone"
+project_tenderee_contact = "tenderee_contact"
+project_agency = "agency"
+project_agency_phone = "agency_phone"
+project_agency_contact = "agency_contact"
+project_sub_project_name = "sub_project_name"
+project_sub_project_code = "sub_project_code"
+project_bidding_budget = "bidding_budget"
+project_win_tenderer = "win_tenderer"
+project_win_bid_price = "win_bid_price"
+project_win_tenderer_manager = "win_tenderer_manager"
+project_win_tenderer_phone = "win_tenderer_phone"
+project_second_tenderer = "second_tenderer"
+project_second_bid_price = "second_bid_price"
+project_second_tenderer_manager = "second_tenderer_manager"
+project_second_tenderer_phone = "second_tenderer_phone"
+project_third_tenderer = "third_tenderer"
+project_third_bid_price = "third_bid_price"
+project_third_tenderer_manager = "third_tenderer_manager"
+project_third_tenderer_phone = "third_tenderer_phone"
+project_procurement_system = "procurement_system"
+project_bidway = "bidway"
+project_dup_data = "dup_data"
+project_docid_number = "docid_number"
+project_project_dynamics = "project_dynamic"
+project_product = "product"
+
+project_moneysource = "moneysource"
+project_service_time = "service_time"
+project_time_bidclose = "time_bidclose"
+project_time_bidopen = "time_bidopen"
+project_time_bidstart = "time_bidstart"
+project_time_commencement = "time_commencement"
+project_time_completion = "time_completion"
+project_time_earnest_money_start = "time_earnest_money_start"
+project_time_earnest_money_end = "time_earnest_money_end"
+project_time_get_file_end = "time_get_file_end"
+project_time_get_file_start = "time_get_file_start"
+project_time_publicity_end = "time_publicity_end"
+project_time_publicity_start = "time_publicity_start"
+project_time_registration_end = "time_registration_end"
+project_time_registration_start = "time_registration_start"
+project_time_release = "time_release"
+
+project_dup_docid = "dup_docid"
+project_info_source = "info_source"
+
+project_nlp_enterprise = "nlp_enterprise"
+project_nlp_enterprise_attachment = "nlp_enterprise_attachment"
+'''
+
+p = '".+"'
+import re
+
+print(re.findall(p,a))
+
+c = [d.replace("\"","") for d in re.findall(p,a)]
+for _c in c:
+    print(_c)
+print(",".join(c))