Browse source code

Rework 拟在建 (proposed-construction) ingestion; improve follow generation and contact generation

luojiehua 2 years ago
parent
commit
95d42ef0ea

+ 1 - 1
BaseDataMaintenance/common/Utils.py

@@ -557,7 +557,7 @@ def getIndexOfWords(words):
         return vocab_words["<pad>"]
 
 def isCellphone(phone):
-    if re.search("^1\d{10}$",phone) is not None:
+    if phone is not None and re.search("^1\d{10}$",str(phone)) is not None:
         return True
     return False
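For context, a minimal sketch of what the added guard buys, under the assumption that phone values read from the store can arrive as None or as integers; before this change, re.search would raise TypeError on a None argument:

import re

def isCellphone(phone):
    # Accept None and non-string input by coercing through str() first
    if phone is not None and re.search(r"^1\d{10}$", str(phone)) is not None:
        return True
    return False

assert isCellphone("13800138000")   # plain string
assert isCellphone(13800138000)     # int survives via str()
assert not isCellphone(None)        # no TypeError anymore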
 

+ 3 - 1
BaseDataMaintenance/common/ossUtils.py

@@ -105,4 +105,6 @@ def test_download(filemd5):
 
 if __name__=="__main__":
     # print(getMDFFromFile('1623894475151.pdf'))
-    test_download("0852ca62c6e3da56a89a02ed4af87724")
+    # test_download("0852ca62c6e3da56a89a02ed4af87724")
+    print(bucket.sign_url("GET","0015//20220623/2022-06-22/WGH001018/1655926900020.png",86500*30))
+    print(time.strftime("%Y-%m-%d",time.localtime(1658655178)))
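For orientation, a hedged sketch of what the new __main__ lines exercise. The Auth/Bucket construction below is an assumption (ossUtils.py defines its own bucket elsewhere), and the endpoint and keys are placeholders:

import time
import oss2

# Placeholder credentials/endpoint; the module builds its own `bucket`.
auth = oss2.Auth("<access_key_id>", "<access_key_secret>")
bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "my-bucket")

# Pre-signed GET URL valid for 86500*30 seconds, i.e. roughly thirty days
# (86500 s is one day plus about 100 s).
url = bucket.sign_url("GET", "0015//20220623/2022-06-22/WGH001018/1655926900020.png", 86500 * 30)

# Epoch seconds -> local date string, e.g. "2022-07-24" for 1658655178
print(time.strftime("%Y-%m-%d", time.localtime(1658655178)))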

+ 45 - 20
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -95,7 +95,7 @@ class BaseDataMonitor():
                 if list_uuid==0:
                     _msg = "数据遗漏检查出错"
                     sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-                    sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+                    # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
                 ots_client = getConnect_ots()
 
@@ -124,7 +124,7 @@ class BaseDataMonitor():
             if counts>0:
                 _msg = "数据遗漏检查报警,%s有%s条公告遗漏,详见%s"%(last_date,str(counts),check_filename)
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-                sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
 
@@ -132,7 +132,7 @@ class BaseDataMonitor():
         except Exception as e:
             _msg = "数据遗漏检查报错"
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
             traceback.print_exc()
 
 
@@ -180,7 +180,7 @@ class BaseDataMonitor():
 
                 _msg = "附件提取队列报警:队列堆积%s条公告,最近十分钟处理公告附件数:%s,处理成功数:%s"%(str(total_count_todeal),str(process_count),str(process_succeed_count))
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-                sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()
 
@@ -194,6 +194,13 @@ class BaseDataMonitor():
             # rows,next_token,total_count_todeal,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
             #                                                                     SearchQuery(query,None,True),
             #                                                                     columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            total_count_init = getQueueSize("dataflow_init")
+            if total_count_init>=100:
+                _msg = "同步队列报警,有%s条数据滞留"%(str(total_count_init))
+                sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+
             total_count_todeal = getQueueSize("dataflow_extract")
 
             if total_count_todeal>100:
@@ -228,7 +235,7 @@ class BaseDataMonitor():
 
                 _msg = "要素提取队列报警:队列堆积%s条公告,最近十分钟入库数:%s,最近十分钟处理公告数:%s,其中成功处理数:%s"%(str(total_count_todeal),str(init_count),str(process_count),str(success_count))
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-                sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
         except Exception as e:
             traceback.print_exc()
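The added dataflow_init check repeats the alarm idiom used across this monitor; a distilled sketch, treating getQueueSize, sentMsgToDD and ACCESS_TOKEN_DATAWORKS as the module's own helpers (alert_if_backlogged itself is hypothetical):

def alert_if_backlogged(queue_name, threshold, msg_template):
    # Read the queue depth and raise a DingTalk alarm past the threshold;
    # the email path stays commented out, as in the hunks above.
    size = getQueueSize(queue_name)
    if size >= threshold:
        sentMsgToDD(msg_template % str(size), ACCESS_TOKEN_DATAWORKS)

# e.g. the new check:
# alert_if_backlogged("dataflow_init", 100, "同步队列报警,有%s条数据滞留")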
 
@@ -238,18 +245,28 @@ class BaseDataMonitor():
         last_date = timeAdd(current_date,-1,"%Y-%m-%d")
 
         query = BoolQuery(must_queries=[
-            RangeQuery("page_time",last_date),
+            RangeQuery("update_time",last_date),
             WildcardQuery("docids","*")
         ])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
+        rows,next_token,total_count_doc,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
                                                                                    SearchQuery(query,None,True),
                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
-        if total_count<=100:
-            _msg = "拟在建生成报警:最近两天生成的拟在建数量为:%d"%(total_count)
-            sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+
+        query = BoolQuery(must_queries=[
+            RangeQuery("update_time",last_date),
+            WildcardQuery("spids","*")
+        ])
+
+        rows,next_token,total_count_sp,is_all_succeed = self.ots_client.search("designed_project","designed_project_index",
+                                                                            SearchQuery(query,None,True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+        total_count = total_count_doc+total_count_sp
+        _msg = "拟在建生成报警:最近两天生成的拟在建数量为:%d,其中公告生成:%d,审批项目生成:%d"%(total_count,total_count_doc,total_count_sp)
+        sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
+        # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
     def monitor_sychr(self):
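Both new counts in monitor_proposedBuilding use OTS's get_total_count without fetching any columns; a hedged sketch of that idiom with the table and index names from the hunk above (count_updated_since is a hypothetical helper):

from tablestore import BoolQuery, RangeQuery, WildcardQuery, SearchQuery, ColumnsToGet, ColumnReturnType

def count_updated_since(ots_client, field, last_date):
    # Count designed_project rows updated since last_date whose `field` is set,
    # returning no columns (ColumnReturnType.NONE) so only total_count is paid for.
    query = BoolQuery(must_queries=[
        RangeQuery("update_time", last_date),
        WildcardQuery(field, "*"),
    ])
    rows, next_token, total_count, ok = ots_client.search(
        "designed_project", "designed_project_index",
        SearchQuery(query, None, True),
        columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    return total_count

# total = count_updated_since(c, "docids", d) + count_updated_since(c, "spids", d)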
@@ -268,7 +285,7 @@ class BaseDataMonitor():
         if total_count>=200:
             _msg = "数据流报警:待同步到成品表公告数为:%d"%(total_count)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
     def monitor_preproject(self):
         current_date = getCurrent_date("%Y-%m-%d")
@@ -286,7 +303,7 @@ class BaseDataMonitor():
         if total_count<=10*10000:
             _msg = "周期项目生成报警:最近一周生成/更新的周期项目数量为:%d"%(total_count)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
     def monitor_dumplicate(self):
 
@@ -325,12 +342,12 @@ class BaseDataMonitor():
         if total_count_lastday_dump/total_count_lastday<0.2:
             _msg = "公告去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
         if total_count_to_dump>2000:
             _msg = "公告去重报警,待去重数量:%s"%(str(total_count_to_dump))
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
         #成品表监控
         query = BoolQuery(must_queries=[
@@ -351,7 +368,7 @@ class BaseDataMonitor():
         if total_count_lastday_dump/total_count_lastday<0.2:
             _msg = "公告成品去重报警,%s入库公告数:%d,其中去重数:%d,去重率:%.2f"%(last_date,total_count_lastday,total_count_lastday_dump,total_count_lastday_dump/total_count_lastday)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
         query = BoolQuery(must_queries=[
             RangeQuery("status",*flow_dumplicate_status_from,True,True),
@@ -364,7 +381,7 @@ class BaseDataMonitor():
         if total_count>=200:
             _msg = "数据流报警:待去重公告数为:%d"%(total_count)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
     def monitor_merge(self):
@@ -432,11 +449,11 @@ class BaseDataMonitor():
                     f.write(",".join(list_multi))
                 _msg = "公告合并报警:近%d条成品公告,有%d条未生成项目,有%d条公告找到多个项目,详见%s;其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,not_find_count,multi_count,logname,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-                sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
             else:
                 _msg = "公告合并报警:近%d条成品公告其中有%d条中标公告且找到招标公告数为%d,比率为%.3f"%(check_count,zhongbiao_count,zhongbiao_find_zhaobiao,zhongbiao_find_zhaobiao/zhongbiao_count)
                 sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-                sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+                # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
         query = BoolQuery(must_queries=[
@@ -451,10 +468,11 @@ class BaseDataMonitor():
         if total_count_lastday<10*10000:
             _msg = "公告成品入库报警,%s入库公告数:%d"%(last_date,total_count_lastday)
             sentMsgToDD(_msg,ACCESS_TOKEN_DATAWORKS)
-            sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
+            # sendEmail(smtp_host,smtp_username,smtp_password,self.recieviers,_msg)
 
 
     def start_monitor(self):
+        # data monitoring
 
         scheduler = BlockingScheduler()
 
@@ -468,12 +486,19 @@ class BaseDataMonitor():
         scheduler.add_job(self.monitor_init,"cron",hour="*/3")
         scheduler.start()
 
+    def start_attach_monitor(self):
+        # attachment monitoring
+        scheduler = BlockingScheduler()
+
+        scheduler.add_job(self.monitor_attachment,"cron",minute="*/10")
+        scheduler.start()
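BlockingScheduler.start() blocks its caller, so the new ten-minute attachment monitor cannot share a scheduler with the three-hourly jobs. A sketch of running both, assuming one process per scheduler is acceptable and that each child can build its own monitor instance:

from multiprocessing import Process

def _attach_worker():
    # Fresh instance in the child so nothing unpicklable crosses the fork
    BaseDataMonitor().start_attach_monitor()

def run_all_monitors():
    Process(target=_attach_worker).start()
    BaseDataMonitor().start_monitor()   # blocks the current process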
 
 
 if __name__ == '__main__':
 
     dm = BaseDataMonitor()
     # dm.start_monitor()
+    dm.monitor_proposedBuilding()
     print(dm.get_last_tenmin_time(16))
 
 

BIN
BaseDataMaintenance/maintenance/check.xlsx


+ 789 - 18
BaseDataMaintenance/maintenance/dataflow.py

@@ -7,6 +7,7 @@ from BaseDataMaintenance.dataSource.source import getConnect_ots,getConnect_ots_
 from tablestore import *
 from BaseDataMaintenance.common.Utils import *
 from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from BaseDataMaintenance.common.multiProcess import MultiProcessHandler
 from queue import Queue
 
 from BaseDataMaintenance.model.ots.document_tmp import *
@@ -25,6 +26,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler
 from BaseDataMaintenance.maintenance.dataflow_settings import *
 from threading import Thread
 import oss2
+from BaseDataMaintenance.maintenance.documentDumplicate import *
 
 def getSet(list_dict,key):
     _set = set()
@@ -150,7 +152,6 @@ class Dataflow():
                 for _dict in list_dict:
                     self.queue_init.put(_dict)
                 _count += len(list_dict)
-                print("%d/%d"%(_count,total_count))
         def comsumer():
             mt = MultiThreadHandler(self.queue_init,comsumer_handle,None,30,1,ots_client=self.ots_client)
             mt.run()
@@ -336,9 +337,9 @@ class Dataflow():
 
 
 
-    def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
-                                  set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save"]),
-                                  set_range=set(["page_time","status"])):
+    def generate_dumplicate_query(self,_dict,_dict_must_not,set_match=set(["project_code","project_codes","product"]),set_nested=set(["win_tenderer","bidding_budget","win_bid_price"]),
+                                  set_term=set(["project_name","doctitle_refine","docchannel","tenderee","agency","web_source_no","fingerprint","save","docid"]),
+                                  set_range=set(["page_time","status"]),set_phrase=set(["doctitle"])):
         list_must_queries = []
         list_must_no_queries = []
         for k,v in _dict.items():
@@ -355,6 +356,8 @@ class Dataflow():
                     list_must_queries.append(NestedQuery("sub_docs_json",TermQuery("sub_docs_json.%s"%k,_v)))
             elif k in set_term:
                 list_must_queries.append(TermQuery(k,v))
+            elif k in set_phrase:
+                list_must_queries.append(MatchPhraseQuery(k,v))
             elif k in set_range:
                 if len(v)==1:
                     list_must_queries.append(RangeQuery(k,v[0]))
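With the new set_phrase bucket, doctitle is matched as an ordered phrase rather than an exact term. A hedged sketch of the query one rule now produces (the field values are made up):

from tablestore import BoolQuery, MatchPhraseQuery, TermQuery, RangeQuery

# _dict = {"doctitle": "医院设备采购项目", "docchannel": 52,
#          "page_time": ["2022-07-01", "2022-07-05"]}
# roughly becomes:
bool_query = BoolQuery(must_queries=[
    MatchPhraseQuery("doctitle", "医院设备采购项目"),       # set_phrase
    TermQuery("docchannel", 52),                          # set_term
    RangeQuery("page_time", "2022-07-01", "2022-07-05"),  # set_range (two bounds)
])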
@@ -394,7 +397,11 @@ class Dataflow():
         if agency is not None and agency!="":
             extract_count += 1
         if sub_docs_json is not None:
-            for sub_docs in json.loads(sub_docs_json):
+            sub_docs = json.loads(sub_docs_json)
+            sub_docs.sort(key=lambda x:x.get("bidding_budget",0),reverse=True)
+            sub_docs.sort(key=lambda x:x.get("win_bid_price",0),reverse=True)
+            # log("==%s"%(str(sub_docs)))
+            for sub_docs in sub_docs:
                 for _key_sub_docs in sub_docs.keys():
                     extract_count += 1
                     if _key_sub_docs in columns:
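The two consecutive stable sorts leave sub_docs ordered primarily by win_bid_price and secondarily by bidding_budget, both descending, so the priced/winning entries are counted first; a tiny illustration:

docs = [{"bidding_budget": 10.0}, {"win_bid_price": 5.0}, {}]
docs.sort(key=lambda x: x.get("bidding_budget", 0), reverse=True)
docs.sort(key=lambda x: x.get("win_bid_price", 0), reverse=True)
# -> [{"win_bid_price": 5.0}, {"bidding_budget": 10.0}, {}]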
@@ -1251,12 +1258,10 @@ class Dataflow():
                     _dict["save"] = 1
             else:
                 _dict["save"] = 0
-            print(item)
             if item.get("status")>=status_from[0] and item.get("status")<=status_from[1]:
                 _dict["status"] = random.randint(status_to[0],status_to[1])
             list_dict.append(_dict)
         for _dict in list_dict:
-            print(_dict)
             dtmp = Document_tmp(_dict)
             dtmp.update_row(self.ots_client)
 
@@ -1299,7 +1304,6 @@ class Dataflow():
 
         def comsumer_handle(item,result_queue,ots_client):
             # print(item)
-            print("docid",item.get(document_tmp_docid))
             dtmp = Document_tmp(item)
 
             dtmp.setValue(document_tmp_status,random.randint(*status_to),True)
@@ -1373,19 +1377,26 @@ class Dataflow():
                   }
             dtmp = Document_tmp(_d)
 
+            dup_docid = set()
+            for _dict in final_list:
+                dup_docid.add(_dict.get(document_tmp_docid))
+            if item.get(document_tmp_docid) in dup_docid:
+                dup_docid.remove(item.get(document_tmp_docid))
 
             if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
                 dtmp.setValue(document_tmp_save,1,True)
                 dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
+                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
             else:
                 dtmp.setValue(document_tmp_save,0,True)
+                if best_docid in dup_docid:
+                    dup_docid.remove(best_docid)
+                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
+                    dmp_docid = "%d,%s"%(best_docid,dmp_docid)
+                else:
+                    dmp_docid = ",".join([str(a) for a in list(dup_docid)])
+
 
-            dup_docid = set()
-            for _dict in final_list:
-                dup_docid.add(_dict.get(document_tmp_docid))
-            if item.get(document_tmp_docid) in dup_docid:
-                dup_docid.remove(item.get(document_tmp_docid))
-            dmp_docid = ",".join([str(a) for a in list(dup_docid)])
             dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
             dtmp.update_row(self.ots_client)
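The reordering above moves the duplicate-set construction ahead of the save decision so that, when this document loses, the kept best_docid can be spliced to the front of its dup list. Schematically (build_dmp_docid is a hypothetical distillation, not a function in the code):

def build_dmp_docid(dup_docid, best_docid, is_best):
    # Losers record the winner first so downstream readers can find the kept doc.
    ids = set(dup_docid)
    if not is_best and best_docid in ids:
        ids.discard(best_docid)
        return "%d,%s" % (best_docid, ",".join(str(a) for a in ids))
    return ",".join(str(a) for a in ids)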
 
@@ -1422,7 +1433,7 @@ class Dataflow():
                                              TermQuery("project_name",project_name)])
                 list_should_q.append(_q)
             if len(list_should_q)>0:
-                list_data = self.search_data_by_query(item,list_should_q,100,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
+                list_data = self.search_data_by_query(item,list_should_q,100,merge=True,table_name="project2",table_index="project2_index_formerge",sort_column="tenderee",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=["tenderee","win_tenderer"])
 
                 if len(list_data)==1:
                     dtmp.setValue("merge_uuid",list_data[0]["uuid"],True)
@@ -2057,18 +2068,778 @@ class Dataflow_extract(Dataflow):
         schedule.add_job(self.flow_extract,"cron",second="*/10")
         schedule.start()
 
+class Dataflow_dumplicate(Dataflow):
+
+    def __init__(self):
+        Dataflow.__init__(self)
+        self.c_f_get_extractCount = f_get_extractCount()
+        self.c_f_get_package = f_get_package()
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def get_dict_time(self,_extract,keys=["time_bidclose","time_bidopen","time_bidstart","time_commencement","time_completion","time_earnestMoneyEnd","time_earnestMoneyStart","time_getFileEnd","time_getFileStart","time_publicityEnd","time_publicityStart","time_registrationEnd","time_registrationStart","time_release"]):
+        dict_time = {}
+        for k in keys:
+            dict_time[k] = _extract.get(k)
+        return dict_time
+
+
+    def post_extract(self,_dict):
+        win_tenderer,bidding_budget,win_bid_price,_ = self.f_decode_sub_docs_json(_dict.get(document_tmp_project_code),_dict.get(document_tmp_project_name),_dict.get(document_tmp_tenderee),_dict.get(document_tmp_agency),_dict.get(document_tmp_sub_docs_json))
+        _dict["win_tenderer"] = win_tenderer
+        _dict["bidding_budget"] = bidding_budget
+        _dict["win_bid_price"] = win_bid_price
+        extract_json = _dict.get(document_tmp_extract_json,"{}")
+        _extract = json.loads(extract_json)
+        _dict["product"] = ",".join(_extract.get("product",[]))
+        _dict["fingerprint"] = _extract.get("fingerprint","")
+        _dict["project_codes"] = _extract.get("code",[])
+        if len(_dict["project_codes"])>0:
+            _dict["project_code"] = _dict["project_codes"][0]
+        else:
+            _dict["project_code"] = ""
+        _dict["doctitle_refine"] = _extract.get("doctitle_refine","")
+        _dict["nlp_enterprise"] = str({"indoctextcon":_extract.get("nlp_enterprise",[]),
+                                       "notindoctextcon":_extract.get("nlp_enterprise_attachment",[])})
+        _dict["extract_count"] = self.c_f_get_extractCount.evaluate(extract_json)
+        _dict["package"] = self.c_f_get_package.evaluate(extract_json)
+        _dict["project_name"] = _extract.get("name","")
+        _dict["dict_time"] = self.get_dict_time(_extract)
+
+    def dumplicate_fianl_check(self,base_list):
+        the_group = base_list
+        the_group.sort(key=lambda x:x["confidence"],reverse=True)
+
+        _index = 0
+        base_fingerprint = "None"
+        if len(base_list)>0:
+            base_fingerprint = base_list[0]["fingerprint"]
+        for _i in range(1,len(base_list)):
+            _dict1 = base_list[_i]
+            fingerprint_less = _dict1["fingerprint"]
+            _pass = True
+            if fingerprint_less==base_fingerprint:
+                _index = _i
+                continue
+            for _j in range(min(_i,5)):
+                _dict2 = base_list[_j]
+                _prob = self.dumplicate_check(_dict1,_dict2,_dict2.get("min_counts",10),b_log=True)
+                # print("_prob:",_prob)
+                if _prob<=0.1:
+                    _pass = False
+                    break
+            log("checking index:%d"%(_i))
+            _index = _i
+            if not _pass:
+                _index -= 1
+                break
+
+        if _index>=1:
+            return the_group[:_index+1]
+        return []
+
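Distilled, dumplicate_fianl_check grows a confidence-sorted prefix and stops at the first candidate whose pairwise probability against the early members drops to 0.1 or below; a hedged paraphrase (greedy_prefix is hypothetical):

def greedy_prefix(group, pair_prob, threshold=0.1, lookback=5):
    # Sort by confidence, then extend the kept prefix while each candidate
    # clears the threshold against the first `lookback` members (fingerprint
    # matches are accepted outright, handled separately above).
    group = sorted(group, key=lambda x: x["confidence"], reverse=True)
    kept = group[:1]
    for cand in group[1:]:
        if all(pair_prob(cand, prev) > threshold for prev in kept[:lookback]):
            kept.append(cand)
        else:
            break
    return kept if len(kept) > 1 else []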
+    def dumplicate_check(self,_dict1,_dict2,min_counts,b_log=False):
+        document_less = _dict1
+        docid_less = _dict1["docid"]
+        docchannel_less = document_less["docchannel"]
+        page_time_less = document_less["page_time"]
+        doctitle_refine_less = document_less["doctitle_refine"]
+        project_codes_less = document_less["project_codes"]
+        nlp_enterprise_less = document_less["nlp_enterprise"]
+        tenderee_less = document_less["tenderee"]
+        agency_less = document_less["agency"]
+        win_tenderer_less = document_less["win_tenderer"]
+        bidding_budget_less = document_less["bidding_budget"]
+        win_bid_price_less = document_less["win_bid_price"]
+        product_less = document_less["product"]
+        package_less = document_less["package"]
+        json_time_less = document_less["dict_time"]
+        project_name_less = document_less["project_name"]
+        fingerprint_less = document_less["fingerprint"]
+        extract_count_less = document_less["extract_count"]
+
+        document_greater = _dict2
+        docid_greater = _dict2["docid"]
+        page_time_greater = document_greater["page_time"]
+        doctitle_refine_greater = document_greater["doctitle_refine"]
+        project_codes_greater = document_greater["project_codes"]
+        nlp_enterprise_greater = document_greater["nlp_enterprise"]
+        tenderee_greater = document_greater["tenderee"]
+        agency_greater = document_greater["agency"]
+        win_tenderer_greater = document_greater["win_tenderer"]
+        bidding_budget_greater = document_greater["bidding_budget"]
+        win_bid_price_greater = document_greater["win_bid_price"]
+        product_greater = document_greater["product"]
+        package_greater = document_greater["package"]
+        json_time_greater = document_greater["dict_time"]
+        project_name_greater = document_greater["project_name"]
+        fingerprint_greater = document_greater["fingerprint"]
+        extract_count_greater = document_greater["extract_count"]
+
+
+        if fingerprint_less==fingerprint_greater:
+            return 1
+
+        same_count = 0
+        all_count = 8
+        if len(set(project_codes_less) & set(project_codes_greater))>0:
+            same_count += 1
+        if getLength(tenderee_less)>0 and tenderee_less==tenderee_greater:
+            same_count += 1
+        if getLength(agency_less)>0 and agency_less==agency_greater:
+            same_count += 1
+        if getLength(win_tenderer_less)>0 and win_tenderer_less==win_tenderer_greater:
+            same_count += 1
+        if getLength(bidding_budget_less)>0 and bidding_budget_less==bidding_budget_greater:
+            same_count += 1
+        if getLength(win_bid_price_less)>0 and win_bid_price_less==win_bid_price_greater:
+            same_count += 1
+        if getLength(project_name_less)>0 and project_name_less==project_name_greater:
+            same_count += 1
+        if getLength(doctitle_refine_less)>0 and doctitle_refine_less==doctitle_refine_greater:
+            same_count += 1
+        base_prob = 0
+        if min_counts<3:
+            base_prob = 0.9
+        elif min_counts<5:
+            base_prob = 0.8
+        elif min_counts<8:
+            base_prob = 0.7
+        else:
+            base_prob = 0.6
+        _prob = base_prob*same_count/all_count
+        if _prob<0.1 and min(extract_count_less,extract_count_greater)<=3:
+            _prob = 0.15
+        if _prob<0.1:
+            return _prob
+
+        check_result = {"pass":1}
+        if docchannel_less in (51,102,103,104,115,116,117):
+            if doctitle_refine_less!=doctitle_refine_greater:
+                if page_time_less!=page_time_greater:
+                    check_result["docchannel"] = 0
+                    check_result["pass"] = 0
+                else:
+                    check_result["docchannel"] = 2
+        if not check_doctitle(doctitle_refine_less,doctitle_refine_greater,project_codes_less,project_codes_greater):
+            check_result["doctitle"] = 0
+            check_result["pass"] = 0
+            if b_log:
+                logging.info("%d-%d,check_doctitle_failed:%s==%s"%(docid_less,docid_greater,str(doctitle_refine_less),str(doctitle_refine_greater)))
+        else:
+            check_result["doctitle"] = 2
+
+        #added check
+        if not check_codes(project_codes_less,project_codes_greater):
+            check_result["code"] = 0
+            check_result["pass"] = 0
+            if b_log:
+                logging.info("%d-%d,check_code_failed:%s==%s"%(docid_less,docid_greater,str(project_codes_less),str(project_codes_greater)))
+        else:
+            if getLength(project_codes_less)>0 and getLength(project_codes_greater)>0 and len(set(project_codes_less) & set(project_codes_greater))>0:
+                check_result["code"] = 2
+            else:
+                check_result["code"] = 1
+
+
+        if not check_product(product_less,product_greater):
+            check_result["product"] = 0
+            check_result["pass"] = 0
+            if b_log:
+                logging.info("%d-%d,check_product_failed:%s==%s"%(docid_less,docid_greater,str(product_less),str(product_greater)))
+        else:
+            if getLength(product_less)>0 and getLength(product_greater)>0:
+                check_result["product"] = 2
+            else:
+                check_result["product"] = 1
+
+        if not check_demand():
+            check_result["pass"] = 0
+
+        if not check_entity(nlp_enterprise_less,nlp_enterprise_greater,
+                            tenderee_less,tenderee_greater,
+                            agency_less,agency_greater,
+                            win_tenderer_less,win_tenderer_greater):
+            check_result["entity"] = 0
+            check_result["pass"] = 0
+            if b_log:
+                logging.info("%d-%d,check_entity_failed:%s==%s==%s==%s==%s==%s==%s==%s"%(docid_less,docid_greater,str(nlp_enterprise_less),str(nlp_enterprise_greater),str(tenderee_less),str(tenderee_greater),str(agency_less),str(agency_greater),str(win_tenderer_less),str(win_tenderer_greater)))
+        else:
+            if docchannel_less in (51,52,103,105,114,118) and getLength(tenderee_less)>0 and getLength(tenderee_greater)>0:
+                check_result["entity"] = 2
+            elif docchannel_less in (101,119,120) and getLength(win_tenderer_less)>0 and getLength(win_tenderer_greater)>0:
+                check_result["entity"] = 2
+            else:
+                check_result["entity"] = 1
+
+        if not check_money(bidding_budget_less,bidding_budget_greater,
+                           win_bid_price_less,win_bid_price_greater):
+            if b_log:
+                logging.info("%d-%d,check_money_failed:%s==%s==%s==%s"%(docid_less,docid_greater,str(bidding_budget_less),str(bidding_budget_greater),str(win_bid_price_less),str(win_bid_price_greater)))
+            check_result["money"] = 0
+            check_result["pass"] = 0
+        else:
+            if docchannel_less in (51,52,103,105,114,118) and getLength(bidding_budget_less)>0 and getLength(bidding_budget_greater)>0:
+                check_result["money"] = 2
+            elif docchannel_less in (101,119,120) and getLength(win_bid_price_less)>0 and getLength(win_bid_price_greater)>0:
+                check_result["money"] = 2
+            else:
+                check_result["money"] = 1
+
+        #added check
+        if not check_package(package_less,package_greater):
+            if b_log:
+                logging.info("%d-%d,check_package_failed:%s==%s"%(docid_less,docid_greater,str(package_less),str(package_greater)))
+            check_result["package"] = 0
+            check_result["pass"] = 0
+        else:
+            if getLength(package_less)>0 and getLength(package_greater)>0:
+                check_result["package"] = 2
+            else:
+                check_result["package"] = 1
+
+        #added check
+        if not check_time(json_time_less,json_time_greater):
+            if b_log:
+                logging.info("%d-%d,check_time_failed:%s==%s"%(docid_less,docid_greater,str(json_time_less),str(json_time_greater)))
+                if isinstance(json_time_less,dict):
+                    time_less = json_time_less
+                else:
+                    time_less = json.loads(json_time_less)
+                if isinstance(json_time_greater,dict):
+                    time_greater = json_time_greater
+                else:
+                    time_greater = json.loads(json_time_greater)
+                for k,v in time_less.items():
+                    if getLength(v)>0:
+                        v1 = time_greater.get(k,"")
+                        if getLength(v1)>0:
+                            if v!=v1:
+                                log("%d-%d,key:%s"%(docid_less,docid_greater,str(k)))
+
+            check_result["time"] = 0
+            check_result["pass"] = 0
+        else:
+            if getLength(json_time_less)>10 and getLength(json_time_greater)>10:
+                check_result["time"] = 2
+            else:
+                check_result["time"] = 1
+
+        if check_result.get("pass",0)==0:
+            if b_log:
+                logging.info(str(check_result))
+
+            if check_result.get("money",1)==0:
+                return 0
+
+            if check_result.get("entity",1)==2 and check_result.get("code",1)==2 and check_result.get("doctitle",2)==2 and check_result.get("product",2)==2 and check_result.get("money",0)==2:
+                return _prob
+            else:
+                return 0
+            if check_result.get("time",1)==0:
+                return 0
+        return _prob
+
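The probability above is a base trust level scaled by field agreement: fewer search hits (min_counts) mean a higher base, and same_count out of eight compared fields scales it down. A worked sketch (base_similarity is hypothetical):

def base_similarity(min_counts, same_count, all_count=8):
    # Mirrors the tiers above: <3 hits -> 0.9, <5 -> 0.8, <8 -> 0.7, else 0.6
    base_prob = 0.9 if min_counts < 3 else 0.8 if min_counts < 5 \
        else 0.7 if min_counts < 8 else 0.6
    return base_prob * same_count / all_count

print(base_similarity(4, 6))   # 0.8 * 6/8 = 0.6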
+    def search_data_by_query(self,item,_query,confidence,retry_times=3,merge=False,table_name="document_tmp",table_index="document_tmp_index",sort_column="docid",singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
+
+        for _ in range(retry_times):
+            try:
+                _time = time.time()
+                check_time = 0
+                if isinstance(_query,list):
+                    bool_query = BoolQuery(should_queries=_query)
+                else:
+                    bool_query = _query
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search(table_name,table_index,
+                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(sort_column)]),limit=30,get_total_count=True),
+                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+                list_dict = getRow_ots(rows)
+                list_data = []
+                for _dict in list_dict:
+                    self.post_extract(_dict)
+                    _docid = _dict.get(document_tmp_docid)
+                    if merge:
+                        list_data.append(_dict)
+                    else:
+                        if _docid!=item.get(document_tmp_docid):
+                            _time1 = time.time()
+                            confidence = self.dumplicate_check(item,_dict,total_count,b_log=True)
+                            check_time+= time.time()-_time1
+
+                            _dict["confidence"] = confidence
+                            _dict["min_counts"] = total_count
+                            list_data.append(_dict)
+                all_time = time.time()-_time
+                # log("check:%d rows takes%.4f,check%.4f"%(len(list_dict),all_time-check_time,check_time))
+                return list_data
+            except Exception as e:
+                traceback.print_exc()
+        return []
+
+    def add_data_by_query(self,item,base_list,set_docid,_query,confidence,table_name,table_index,singleNum_keys=["tenderee","win_tenderer"],contain_keys=[],multiNum_keys=[],notlike_keys=["project_code"],columns=[document_tmp_save,document_tmp_status,document_tmp_product,document_tmp_docchannel,document_tmp_web_source_no,document_tmp_doctitle_refine,document_tmp_project_code,document_tmp_project_name,document_tmp_tenderee,document_tmp_agency,document_tmp_sub_docs_json,document_tmp_extract_count]):
+        list_dict = self.search_data_by_query(item,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,notlike_keys=notlike_keys,columns=columns)
+        for _dict in list_dict:
+            _docid = _dict.get(document_tmp_docid)
+            confidence = _dict["confidence"]
+            if confidence>0.1:
+                if _docid not in set_docid:
+                    base_list.append(_dict)
+                    set_docid.add(_docid)
+            set_docid.add(_docid)
+
+    def appendRule(self,list_rules,_dict,base_dict,must_not_dict,confidence,item,to_log=True):
+        for k,v in _dict.items():
+            if getLength(v)==0:
+                return
+        _dict.update(base_dict)
+        if to_log:
+            log(str(_dict))
+        _query = self.generate_dumplicate_query(_dict,must_not_dict)
+        _rule = {"confidence":confidence,
+                 "item":item,
+                 "query":_query,
+                 "singleNum_keys":[],
+                 "contain_keys":[],
+                 "multiNum_keys":[]}
+        list_rules.append(_rule)
+
+    def translate_dumplicate_rules(self,status_from,item,get_all=False,to_log=False):
+        docchannel,project_code,project_name,tenderee,agency,doctitle_refine,win_tenderer,bidding_budget,win_bid_price,page_time,fingerprint,product = self.get_dump_columns(item)
+        current_date = getCurrent_date("%Y-%m-%d")
+        if page_time=='':
+            page_time = current_date
+
+        if page_time>=timeAdd(current_date,-2):
+            table_name = "document_tmp"
+            table_index = "document_tmp_index"
+            base_dict = {
+                "docchannel":item["docchannel"],
+                "status":[status_from[0]],
+                "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
+            }
+            must_not_dict = {"save":0,"docid":item.get("docid")}
+            doctitle_refine_name = "doctitle_refine"
+        else:
+            table_name = "document"
+            table_index = "document_index"
+            if get_all:
+                _status = [201,450]
+            else:
+                _status = [201,300]
+            base_dict = {
+                "docchannel":item["docchannel"],
+                "status":_status,
+                "page_time":[timeAdd(page_time,-2),timeAdd(page_time,2)]
+            }
+            must_not_dict = {"docid":item.get("docid")}
+            doctitle_refine_name = "doctitle"
+
+
+
+        list_rules = []
+        singleNum_keys = ["tenderee","win_tenderer"]
+
+        confidence = 100
+        self.appendRule(list_rules,{document_tmp_fingerprint:fingerprint},base_dict,must_not_dict,confidence,item)
+        confidence = 90
+        _dict = {document_tmp_agency:agency,
+                 "win_tenderer":win_tenderer,
+                 "win_bid_price":win_bid_price}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {document_tmp_agency:agency,
+                 "win_tenderer":win_tenderer,
+                 "bidding_budget":bidding_budget}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {document_tmp_agency:agency,
+                 "win_bid_price":win_bid_price,
+                 "bidding_budget":bidding_budget}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"win_tenderer":win_tenderer,
+                 "win_bid_price":win_bid_price,
+                 "bidding_budget":bidding_budget}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"tenderee":tenderee,
+                 "win_tenderer":win_tenderer,
+                 "win_bid_price":win_bid_price}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"tenderee":tenderee,
+                 "win_tenderer":win_tenderer,
+                 "bidding_budget":bidding_budget}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"tenderee":tenderee,
+                 "win_bid_price":win_bid_price,
+                 "bidding_budget":bidding_budget}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"tenderee":tenderee,
+                 "agency":agency,
+                 "win_tenderer":win_tenderer}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"tenderee":tenderee,
+                 "agency":agency,
+                 "win_bid_price":win_bid_price}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"tenderee":tenderee,
+                 "agency":agency,
+                 "bidding_budget":bidding_budget}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        confidence=85
+        _dict = {"tenderee":tenderee,
+                 "agency":agency
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"tenderee":tenderee,
+                 "project_codes":project_code
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"tenderee":tenderee,
+                 "project_name":project_name
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        if getLength(product)>0:
+            l_p = product.split(",")
+            _dict = {"tenderee":tenderee,
+                     "product":l_p[0]
+                     }
+            self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"tenderee":tenderee,
+                 "win_tenderer":win_tenderer
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"tenderee":tenderee,
+                 "win_bid_price":win_bid_price
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"tenderee":tenderee,
+                 "bidding_budget":bidding_budget
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"tenderee":tenderee,
+                 doctitle_refine_name:doctitle_refine
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"agency":agency,
+                 "project_codes":project_code
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"agency":agency,
+                 "project_name":project_name
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_codes":project_code,
+                 "project_name":project_name
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_codes":project_code,
+                 "win_tenderer":win_tenderer
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_codes":project_code,
+                 "win_bid_price":win_bid_price
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_codes":project_code,
+                 "bidding_budget":bidding_budget
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
 
+        _dict = {"project_codes":project_code,
+                 doctitle_refine_name:doctitle_refine
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_name":project_name,
+                 "win_tenderer":win_tenderer
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_name":project_name,
+                 "win_bid_price":win_bid_price
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_name":project_name,
+                 "bidding_budget":bidding_budget
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"project_name":project_name,
+                 doctitle_refine_name:doctitle_refine
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"win_tenderer":win_tenderer,
+                 "win_bid_price":win_bid_price
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"win_tenderer":win_tenderer,
+                 "bidding_budget":bidding_budget
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"win_tenderer":win_tenderer,
+                 doctitle_refine_name:doctitle_refine
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"win_bid_price":win_bid_price,
+                 "bidding_budget":bidding_budget
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"win_bid_price":win_bid_price,
+                 doctitle_refine_name:doctitle_refine
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        _dict = {"bidding_budget":bidding_budget,
+                 doctitle_refine_name:doctitle_refine
+                 }
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        confidence=80
+        _dict = {doctitle_refine_name:doctitle_refine}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+        _dict = {"project_codes":project_code}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        confidence=70
+        _dict = {"project_name":project_name}
+        self.appendRule(list_rules,_dict,base_dict,must_not_dict,confidence,item)
+
+        return list_rules,table_name,table_index
+
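The rules assembled above form a ladder from exact to loose; a hedged summary, not a structure in the code:

# Confidence ladder built by translate_dumplicate_rules:
#   100: fingerprint match
#    90: any 3 of {tenderee, agency, win_tenderer, win_bid_price, bidding_budget}
#    85: pairs anchored on tenderee/agency/project_codes/project_name/product/
#        win_tenderer/win_bid_price/bidding_budget/doctitle
#    80: doctitle_refine alone, or project_codes alone
#    70: project_name alone
# The consumer below queries them in confidence order, five rules at a time.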
+
+    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
+        def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]):
+            bool_query = BoolQuery(must_queries=[
+                RangeQuery(document_tmp_status,*status_from,True,True),
+                # TermQuery("docid",246433488)
+            ])
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                                ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+            log("flow_dumplicate producer total_count:%d"%total_count)
+            list_dict = getRow_ots(rows)
+            for _dict in list_dict:
+                self.queue_dumplicate.put(_dict)
+            _count = len(list_dict)
+            while next_token and _count<flow_process_count:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+                list_dict = getRow_ots(rows)
+                for _dict in list_dict:
+                    self.queue_dumplicate.put(_dict)
+                _count += len(list_dict)
+        def comsumer():
+            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
+            mt.run()
+
+        producer()
+        comsumer()
+
+    def dumplicate_comsumer_handle(self,item,result_queue,ots_client,get_all=False,upgrade=True):
+        start_time = time.time()
+        self.post_extract(item)
+
+        base_list = []
+        set_docid = set()
+
+        list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,get_all=get_all,to_log=True)
+
+        list_rules.sort(key=lambda x:x["confidence"],reverse=True)
+        _i = 0
+        step = 5
+
+        item["confidence"] = 999
+        if item.get(document_tmp_docid) not in set_docid:
+            base_list.append(item)
+            set_docid.add(item.get(document_tmp_docid))
+
+        while _i<len(list_rules):
+            must_not_q = []
+            if len(base_list)>0:
+                must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
+            _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
+                               must_not_queries=must_not_q)
+            _rule = list_rules[_i]
+            confidence = _rule["confidence"]
+            singleNum_keys = _rule["singleNum_keys"]
+            contain_keys = _rule["contain_keys"]
+            multiNum_keys = _rule["multiNum_keys"]
+            self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
+            _i += step
+
+
+
+        _time = time.time()
+        log("%d start final check with length:%d"%(item["docid"],len(base_list)))
+        final_list = self.dumplicate_fianl_check(base_list)
+        log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
+        best_docid = self.get_best_docid(final_list)
+
+        final_list_docid = [a["docid"] for a in final_list]
+        log("%d:final_list_docid:%s"%(item["docid"],str(final_list_docid)))
+        _d = {"partitionkey":item["partitionkey"],
+              "docid":item["docid"],
+              "status":random.randint(*flow_dumplicate_status_to),
+              document_tmp_opertime:getCurrent_date(format="%Y-%m-%d %H:%M:%S")
+              }
+        dtmp = Document_tmp(_d)
+
+
+        dup_docid = set()
+        for _dict in final_list:
+            dup_docid.add(_dict.get(document_tmp_docid))
+        if item.get(document_tmp_docid) in dup_docid:
+            dup_docid.remove(item.get(document_tmp_docid))
+
+
+        remove_list = []
+
+        if len(final_list)==0 or best_docid==item.get(document_tmp_docid):
+            dtmp.setValue(document_tmp_save,1,True)
+            dtmp.setValue(document_tmp_merge_uuid,self.merge_document(item,flow_dumplicate_status_to),True)
+            dmp_docid = ",".join([str(a) for a in list(dup_docid)])
+            for _dict in final_list:
+                if _dict.get(document_tmp_docid) in dup_docid:
+                    remove_list.append(_dict)
+        else:
+            dtmp.setValue(document_tmp_save,0,True)
+            if best_docid in dup_docid:
+                dup_docid.remove(best_docid)
+                for _dict in final_list:
+                    if _dict.get(document_tmp_docid) in dup_docid:
+                        remove_list.append(_dict)
+                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
+                dmp_docid = "%d,%s"%(best_docid,dmp_docid)
+            else:
+                dmp_docid = ",".join([str(a) for a in list(dup_docid)])
+                for _dict in final_list:
+                    if _dict.get(document_tmp_docid) in dup_docid:
+                        remove_list.append(_dict)
+        log("save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
+        if upgrade:
+            self.changeSaveStatus(remove_list)
+
+            dmp_docid = ",".join([str(a) for a in list(dup_docid)])
+            dtmp.setValue(document_tmp_dup_docid,dmp_docid,True)
+            dtmp.update_row(self.ots_client)
+
+        # log("dump takes %.2f"%(time.time()-start_time))
+
+    def start_flow_dumplicate(self):
+        schedule = BlockingScheduler()
+        schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
+        schedule.start()
+
+    def changeSaveStatus(self,list_dict):
+        for _dict in list_dict:
+            if _dict.get(document_tmp_save,1)==1:
+                _d = {"partitionkey":_dict["partitionkey"],
+                      "docid":_dict["docid"],
+                      document_tmp_save:0
+                      }
+                _d_tmp = Document_tmp(_d)
+                _d_tmp.update_row(self.ots_client)
+
+
+
+    def test_dumplicate(self,docid):
+        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
+        bool_query = BoolQuery(must_queries=[
+            TermQuery("docid",docid)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        log("flow_dumplicate producer total_count:%d"%total_count)
+        list_dict = getRow_ots(rows)
+
+        for item in list_dict:
+            self.dumplicate_comsumer_handle(item,None,self.ots_client,get_all=True,upgrade=False)
+            return
+
+    def getRemainDoc(self,docid):
+        columns=[document_tmp_status,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json]
+        bool_query = BoolQuery(must_queries=[
+            TermQuery("docid",docid)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("docid")]),limit=100,get_total_count=True),
+                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_dict = getRow_ots(rows)
+
+        if len(list_dict)>0:
+            item = list_dict[0]
+            start_time = time.time()
+            self.post_extract(item)
+
+            base_list = []
+            set_docid = set()
+
+            list_rules,table_name,table_index = self.translate_dumplicate_rules(flow_dumplicate_status_from,item,to_log=True)
+
+            list_rules.sort(key=lambda x:x["confidence"],reverse=True)
+            _i = 0
+            step = 5
+
+            item["confidence"] = 999
+            if item.get(document_tmp_docid) not in set_docid:
+                base_list.append(item)
+                set_docid.add(item.get(document_tmp_docid))
+
+            while _i<len(list_rules):
+                must_not_q = []
+                if len(base_list)>0:
+                    must_not_q = [TermQuery("docid",a) for a in list(set_docid)[-100:]]
+                _query = BoolQuery(should_queries=[_rule["query"] for _rule in list_rules[_i:_i+step]],
+                                   must_not_queries=must_not_q)
+                _rule = list_rules[_i]
+                confidence = _rule["confidence"]
+                singleNum_keys = _rule["singleNum_keys"]
+                contain_keys = _rule["contain_keys"]
+                multiNum_keys = _rule["multiNum_keys"]
+                self.add_data_by_query(item,base_list,set_docid,_query,confidence,table_name=table_name,table_index=table_index,singleNum_keys=singleNum_keys,contain_keys=contain_keys,multiNum_keys=multiNum_keys,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle_refine,document_tmp_sub_docs_json,document_tmp_extract_json])
+                _i += step
+
+
+
+            _time = time.time()
+            log("%d start final check with length:%d"%(item["docid"],len(base_list)))
+            final_list = self.dumplicate_fianl_check(base_list)
+            log("%d final_check takes:%.2f"%(item["docid"],time.time()-_time))
+            best_docid = self.get_best_docid(final_list)
+            return best_docid
+        return None
 
 if __name__ == '__main__':
    # df = Dataflow()
    # df.flow_init()
    # df.flow_test()
    # df.test_merge()
    # df.start_flow_attachment()
    # df.start_flow_extract()
    # df.start_flow_dumplicate()
    # # df.start_flow_merge()
    # df.start_flow_remove()

    # download_attachment()
    # test_attachment_interface()
    df_dump = Dataflow_dumplicate()
    # df_dump.start_flow_dumplicate()
    df_dump.test_dumplicate(25126084)
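A usage note, assuming OTS connectivity and this module's context: test_dumplicate pulls one docid from the production document table and runs the whole rule/check pipeline with upgrade=False, so nothing is written back, while getRemainDoc answers which docid would survive:

df_dump = Dataflow_dumplicate()
df_dump.test_dumplicate(25126084)      # dry run: logs the checks, updates no rows
print(df_dump.getRemainDoc(25126084))  # best_docid the duplicate group would keep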

+ 2162 - 0
BaseDataMaintenance/maintenance/documentDumplicate.py

File changes are not shown because they are too large


+ 49 - 0
BaseDataMaintenance/maintenance/extract_uuid.py

@@ -0,0 +1,49 @@
+
+import re
+import pandas as pd
+
+import pickle
+
+def save(object_to_save, path):
+    '''
+    Persist an object to `path` with pickle.
+    @Args:
+        object_to_save: the object to save
+        path: destination file path
+    '''
+    with open(path, 'wb') as f:
+        pickle.dump(object_to_save, f)
+
+def extract_uuid_from_log():
+    list_files = [
+                  "/data/python/flow_init_log/flow_init_2022-06-02.log",
+                  "/data/python/flow_init_log/flow_init_2022-06-03.log",
+                  "/data/python/flow_init_log/flow_init_2022-06-04.log",
+                  "/data/python/flow_init_log/flow_init_2022-06-05.log",
+                  "/data/python/flow_init.log"
+    ]
+    list_uuid = []
+    _regrex = "delete\s+(?P<tablename>bxkc[^\s]+)\s+.*ID='(?P<uuid>.+)'"
+    for _file in list_files:
+        with open(_file,"r",encoding="utf8") as f:
+            while 1:
+                _line = f.readline()
+                if not _line:
+                    break
+                _match = re.search(_regrex,_line)
+                if _match is not None:
+                    _uuid = _match.groupdict().get("uuid")
+                    tablename = _match.groupdict().get("tablename")
+                    if _uuid is not None:
+                        list_uuid.append({"uuid":_uuid,"tablename":tablename})
+    df_data = {"uuid":[],
+               "tablename":[]}
+    for _data in list_uuid:
+        for k,v in df_data.items():
+            v.append(_data.get(k))
+    save(df_data,"uuid.pk")
+
+if __name__ == '__main__':
+    extract_uuid_from_log()
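
extract_uuid_from_log writes a plain dict of two parallel lists; a short sketch of reading uuid.pk back for the downstream leak check (pandas is already imported in this module):

    import pickle
    import pandas as pd

    with open("uuid.pk", "rb") as f:
        df_data = pickle.load(f)              # {"uuid": [...], "tablename": [...]}
    df = pd.DataFrame(df_data)
    print(df.groupby("tablename").size())     # deleted rows per source table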

+ 27 - 14
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -19,7 +19,7 @@ class DataSynchronization():
         self.isDone = False
         self.proposedBuilding_table = "proposedBuilding_tmp"
         self.proposedBuilding_table_index = "proposedBuilding_tmp_index"
-        self.pool_ots = ConnectorPool(init_num=10,max_num=40,method_init=getConnect_ots)
+        self.ots_client = getConnect_ots()
 
     def producer(self,task_queue):
         '''
@@ -30,13 +30,15 @@ class DataSynchronization():
         bool_query = BoolQuery(must_queries=[ExistsQuery("crtime")])
 
         columns = ["uuid","crtime","json_list_group"]
+
         rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
-                                                                          SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("crtime",SortOrder.ASC)]), limit=100, get_total_count=True),
+                                                                          SearchQuery(bool_query ,sort=Sort(sorters=[FieldSort("crtime",SortOrder.DESC)]), limit=100, get_total_count=True),
                                                                           ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
         list_data = getRow_ots(rows)
         for _data in list_data:
             _proposed = proposedBuilding_tmp(_data)
             task_queue.put(_proposed,True)
+        _count = len(list_data)
         while next_token:
             rows, next_token, total_count, is_all_succeed = ots_client.search(self.proposedBuilding_table, self.proposedBuilding_table_index,
                                                                               SearchQuery(bool_query ,next_token=next_token, limit=100, get_total_count=True),
@@ -45,16 +47,24 @@ class DataSynchronization():
             for _data in list_data:
                 _proposed = proposedBuilding_tmp(_data)
                 task_queue.put(_proposed,True)
+            _count += len(list_data)
+            if _count>3000:
+                break
+
 
     def comsumer(self,task_queue):
 
-        def _handle(_proposed,result_queue,pool_ots):
-            ots_client = pool_ots.getConnector()
+        def _handle(_proposed,result_queue,ots_client):
+
+            print(_proposed)
 
             #update the designed_project record
+            _time = time.time()
             _project_dict = _proposed.toDesigned_project(ots_client)
+            log("toDesigned_project takes %.2fs"%(time.time()-_time))
 
             try:
+                _time = time.time()
                 if _project_dict is not None:
                     #更新数据
                     _designed_project = designed_project(_project_dict)
@@ -62,18 +72,15 @@ class DataSynchronization():
 
                 #delete the tmp row
                 _proposed.delete_row(ots_client)
+                log("update designed takes %.2fs"%(time.time()-_time))
             except Exception as e:
                 log("comsumer failed cause of %s"%(str(e)))
                 log(traceback.format_exc())
 
 
-
-            pool_ots.putConnector(ots_client)
-
-
         result_queue = queue.Queue()
 
-        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,pool_ots=self.pool_ots)
+        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,ots_client=self.ots_client)
         mt.run()
 
 
@@ -91,14 +98,18 @@ class DataSynchronization():
 
         self.producer(task_queue)
 
-        if self.waitTask(task_queue):
-            thread_comsumer = Thread(target=self.comsumer,args=([task_queue]))
-            thread_comsumer.start()
+        # _dict = {"uuid":"12313","crtime":123,
+        #                 "json_list_group":'''
+        #                 [{"docid": 254681903, "shenpi_id": null, "type_id": "254681903", "page_time": "2022-07-25", "province": "云南", "city": null, "district": null, "tenderee": "景谷傣族彝族自治县永平镇人民政府", "tenderee_contact": "", "tenderee_phone": "0879-5311786", "agency": "云南赛林工程管理咨询有限公司", "project_code": "云赛招字2022-245", "project_name": "永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目", "doctitle": "云赛招字2022-245:景谷傣族彝族自治县永平镇人民政府永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目施工竞争性磋商成交公告", "docchannel": "101", "stage": "施工在建", "proportion": "全长1959.8m", "projectDigest": "标的信息,工程类:标段名称:永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目。工程类:名称:永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目。工程类:施工工期:90日历天", "projectAddress": null, "begin_time": null, "end_time": null, "project_name_refind": "永平镇芒腊村芒腊组组内道路建设及基础设施建设扶持项目", "industry": "司法建筑", "new_enough": 1, "follow_enough": 1}]
+        #                 '''}
+        # task_queue.put(proposedBuilding_tmp(_dict))
+
+        self.comsumer(task_queue)
 
 
     def scheduler(self):
         _scheduler = BlockingScheduler()
-        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/10")
+        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/1")
         _scheduler.start()
 
 def startSychro():
@@ -107,6 +118,8 @@ def startSychro():
 
 if __name__=="__main__":
     ds = DataSynchronization()
-    ds.scheduler()
+    # ds.scheduler()
+    ds.maxcompute2ots()
+
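
producer() pages the proposedBuilding_tmp index with next_token and stops after roughly 3000 rows so a single scheduler tick stays bounded. The same pagination shape as a reusable sketch; the tablestore calls are the real API, and getRow_ots is assumed to be the row-flattening helper from BaseDataMaintenance.common.Utils used throughout this repo:

    from tablestore import *
    from BaseDataMaintenance.common.Utils import getRow_ots

    def page_index(ots_client, table, index, bool_query, columns, cap=3000):
        # the first page carries the sort; follow-up pages only carry next_token
        rows, next_token, total_count, is_all_succeed = ots_client.search(
            table, index,
            SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("crtime", SortOrder.DESC)]),
                        limit=100, get_total_count=True),
            ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        while next_token and len(list_data) < cap:
            rows, next_token, total_count, is_all_succeed = ots_client.search(
                table, index,
                SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                ColumnsToGet(columns, return_type=ColumnReturnType.SPECIFIED))
            list_data.extend(getRow_ots(rows))
        return list_data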
 
 

+ 2 - 2
BaseDataMaintenance/model/ots/BaseModel.py

@@ -66,11 +66,11 @@ class BaseModel():
         # 客户端异常,一般为参数错误或者网络异常。
         except OTSClientError as e:
             traceback.print_exc()
-            log("get row failed, http_status:%d, error_message:%s" % (e.get_http_status(), e.get_error_message()))
+            log("%s get row failed, http_status:%s, error_message:%s" % (table_name,str(e.get_http_status()), str(e.get_error_message())))
         # 服务端异常,一般为参数错误或者流控错误。
         except OTSServiceError as e:
             traceback.print_exc()
-            log("get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (e.get_http_status(), e.get_error_code(), e.get_error_message(), e.get_request_id()))
+            log("get row failed, http_status:%d, error_code:%s, error_message:%s, request_id:%s" % (str(e.get_http_status()), e.get_error_code(), e.get_error_message(), e.get_request_id()))
 
     def fix_columns(self,ots_client,columns_to_fix,_flag):
         _dict = self.search(ots_client,self.table_name,self.getPrimaryKey_turple(),columns_to_fix)

+ 6 - 3
BaseDataMaintenance/model/ots/designed_project.py

@@ -14,10 +14,12 @@ class designed_project(BaseModel):
     def getPrimary_keys(self):
         return ["partitionkey","id"]
 
-    def search_by_docids(self,ots_client,docids):
+    def search_by_docids(self,ots_client,docids,spids):
         should_q = []
         for _docid in docids.split(","):
             should_q.append(TermQuery("docids",_docid))
+        for _spid in spids.split(","):
+            should_q.append(TermQuery("spids",_spid))
 
         bool_query = BoolQuery(should_queries=should_q)
         columns = ["docids"]
@@ -29,10 +31,11 @@ class designed_project(BaseModel):
 
 
     def update_project(self,ots_client):
-        docids = self.__dict__.get("docids")
+        docids = self.__dict__.get("docids","")
+        spids = self.__dict__.get("spids","")
 
         #check whether generated projects already exist; if so, update the first and delete the extras
-        list_dict = self.search_by_docids(ots_client,docids)
+        list_dict = self.search_by_docids(ots_client,docids,spids)
         if len(list_dict)>0:
             for _dict in list_dict[1:]:
                 _designed_delete = designed_project(_dict)
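
With spids added, update_project now dedupes against any designed_project row that shares a docid or spid: the first hit is updated in place and the extras are deleted. A hedged usage sketch (the id and field values below are made up):

    from BaseDataMaintenance.dataSource.source import getConnect_ots
    from BaseDataMaintenance.model.ots.designed_project import designed_project

    ots_client = getConnect_ots()
    _id = 123456789
    _project = designed_project({"partitionkey": _id % 200 + 1, "id": _id,
                                 "docids": "254681903", "spids": "12313",
                                 "project_name": "example"})
    # updates the first existing row keyed by these docids/spids, deletes any
    # duplicates, or inserts a fresh row when nothing matches
    _project.update_project(ots_client)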

+ 13 - 0
BaseDataMaintenance/model/ots/document_tmp.py

@@ -39,6 +39,19 @@ document_tmp_extract_json = "extract_json"
 document_tmp_industry_json = "industry_json"
 document_tmp_other_json = "other_json"
 
+document_tmp_time_bidclose = "time_bidclose"
+document_tmp_time_bidopen = "time_bidopen"
+document_tmp_time_completion = "time_completion"
+document_tmp_time_earnest_money_end = "time_earnest_money_end"
+document_tmp_time_earnest_money_start = "time_earnest_money_start"
+document_tmp_time_get_file_end = "time_get_file_end"
+document_tmp_time_get_file_start = "time_get_file_start"
+document_tmp_time_publicity_end = "time_publicity_end"
+document_tmp_time_publicity_start = "time_publicity_start"
+document_tmp_time_registration_end = "time_registration_end"
+document_tmp_time_registration_start = "time_registration_start"
+document_tmp_time_release = "time_release"
+
 class Document_tmp(BaseModel):
 
     def __init__(self,_dict):

+ 405 - 99
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py

@@ -3,7 +3,29 @@ from BaseDataMaintenance.model.ots.BaseModel import BaseModel
 from BaseDataMaintenance.common.Utils import *
 from BaseDataMaintenance.primarykey.startSnowflake import get_guid
 import json
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from tablestore import *
 
+from BaseDataMaintenance.maintenance.dataflow import Dataflow_dumplicate
+
+ots_client = getConnect_ots()
+
+stage_priority_dict = {
+    "立项阶段": "立项阶段",
+    "可研阶段": "可研阶段",
+    "环评阶段": "环评阶段",
+    "稳评阶段": "环评阶段",
+    "咨询阶段": "环评阶段",
+    "造价阶段": "环评阶段",
+    "设计阶段": "设计阶段",
+    "勘察阶段": "设计阶段",
+    "EPC总承包":"设计阶段",
+    "施工图审": "施工准备",
+    "施工许可": "施工准备",
+    "施工准备": "施工准备",
+    "施工在建": "施工在建",
+    "竣工阶段": "竣工阶段"
+}
 
 class proposedBuilding_tmp(BaseModel):
 
@@ -12,85 +34,363 @@ class proposedBuilding_tmp(BaseModel):
         for k,v in _dict.items():
             self.setValue(k,v,True)
         self.table_name = "proposedBuilding_tmp"
+        self.dict_enterprise = {}
+        self.dict_document = {}
+        self.data_flow = Dataflow_dumplicate()
 
     def getPrimary_keys(self):
         return ["uuid"]
 
 
-    def getFollows(self,list_group):
-        dict_stage = {}
-        for _group in list_group:
-            _stage = _group["stage"]
-            if _stage not in dict_stage:
-                dict_stage[_stage] = []
-            dict_stage[_stage].append(_group)
-        LIST_STEGES = ["施工在建","施工准备","环评阶段","设计阶段"]
-        for k,v in dict_stage.items():
-            v.sort(key=lambda x:x.get("page_time",""),reverse=True)
-        list_follows = []
-        last_stage = ""
-        last_pageTime = ""
-        for _STAGE_i in range(len(LIST_STEGES)):
-            _STAGE = LIST_STEGES[_STAGE_i]
-            for _group in dict_stage.get(_STAGE,[]):
-                current_stage = _group.get("stage","")
-                current_pageTime = _group.get("page_time","")
-                if last_stage=="":
-                    last_stage = current_stage
-                    last_pageTime = current_pageTime
-                if last_stage in LIST_STEGES[:_STAGE_i]:
-                    continue
-                if current_pageTime>last_pageTime:
-                    continue
-                last_stage = current_stage
-                last_pageTime = current_pageTime
-                list_follows.append(_group)
-        list_follows.reverse()
-        return list_follows
-
-
-    def getContacts(self,ots_client,_group):
-        _contacts = []
-        enterprise_dict = self.search(ots_client,"enterprise",[("name",_group["tenderee"])],["province","contacts","address"])
+    # def getFollows(self,list_group):
+    #
+    #
+    #
+    #     dict_stage = {}
+    #
+    #
+    #     #sort
+    #     list_group.sort(key=lambda x:x.get("page_time",""))
+    #
+    #
+    #     list_follows = []
+    #     #dumplicate
+    #     set_stage_page_time = set()
+    #     for _group in list_group:
+    #         current_stage = _group.get("stage","")
+    #         current_pageTime = _group.get("page_time","")
+    #         if current_stage=="" or current_pageTime=="":
+    #             continue
+    #         _key = "%s-%s"%(str(current_stage),str(current_pageTime))
+    #         if _key in set_stage_page_time:
+    #             continue
+    #         list_follows.append(_group)
+    #         set_stage_page_time.add(_key)
+    #     return list_follows
+
+
+        # for _group in list_group:
+        #     _stage = _group["stage"]
+        #     if _stage not in dict_stage:
+        #         dict_stage[_stage] = []
+        #     dict_stage[_stage].append(_group)
+        #
+        #
+        #
+        # LIST_STEGES = ["施工在建","施工准备","环评阶段","设计阶段"]
+        # # LIST_STEGES = ["立项","可研","环评","稳评","其他咨询服务","造价","勘察设计","施工图审查","施工许可证","施工准备","施工在建","竣工"]
+        # for k,v in dict_stage.items():
+        #     v.sort(key=lambda x:x.get("page_time",""),reverse=True)
+        # list_follows = []
+        # last_stage = ""
+        # last_pageTime = ""
+        # for _STAGE_i in range(len(LIST_STEGES)):
+        #     _STAGE = LIST_STEGES[_STAGE_i]
+        #     for _group in dict_stage.get(_STAGE,[]):
+        #         current_stage = _group.get("stage","")
+        #         current_pageTime = _group.get("page_time","")
+        #         if last_stage=="":
+        #             last_stage = current_stage
+        #             last_pageTime = current_pageTime
+        #         if last_stage in LIST_STEGES[:_STAGE_i]:
+        #             continue
+        #         if current_pageTime>last_pageTime:
+        #             continue
+        #         last_stage = current_stage
+        #         last_pageTime = current_pageTime
+        #         list_follows.append(_group)
+        # list_follows.reverse()
+        # return list_follows
+
+
+    def getEnterprise(self,enterprise_name,columns):
+        _time = time.time()
+        if enterprise_name in self.dict_enterprise:
+            return self.dict_enterprise[enterprise_name]
+        enterprise_dict = self.search(ots_client,"enterprise",[("name",str(enterprise_name))],columns)
+        self.dict_enterprise[enterprise_name] = enterprise_dict
+        log("search Enterprise %s takes %.2f"%(str(enterprise_name),time.time()-_time))
+        return enterprise_dict
+
+    def getStage(self,_stage):
 
+        return stage_priority_dict.get(_stage,_stage)
+
+    def addContact(self,_contacts,_type,enterprise_name):
+        #add contacts from enterprise table
+
+        enterprise_dict = self.getEnterprise(enterprise_name,["province","contacts","address","contacts1","contacts2","contacts3","contacts4","contacts5"])
+
+        _time = time.time()
+        if enterprise_dict is None:
+            enterprise_dict = {}
         cellphone = ""
-        phone = ""
-        if isCellphone(_group["tenderee_phone"]):
-            cellphone = _group["tenderee_phone"]
-        else:
-            phone = _group["tenderee_phone"]
-
-        _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,"phone":phone,
-                         "company_name":_group["tenderee"],"contact_name":_group["tenderee_contact"],
-                         "type":"业主单位","id":get_guid()}
-        _dict_contact = popNoneFromDict(_dict_contact)
-        _contacts.append(_dict_contact)
-        _docid = int(_group.get("docid"))
-        document_dict = self.search(ots_client,"document",[("partitionkey",_docid%500+1),("docid",_docid)],["sub_docs_json"])
-        win_tenderer = ""
-        if document_dict is not None:
-            sub_docs_json = document_dict.get("sub_docs_json",'[{}]')
-            for sub_docs in json.loads(sub_docs_json):
-                win_tenderer = sub_docs.get("win_tenderer","")
-        if win_tenderer!="":
-            enterprise_dict = self.search(ots_client,"enterprise",[("name",win_tenderer)],["province","contacts","address"])
-            if enterprise_dict is not None:
-                win_contacts = json.loads(enterprise_dict.get("contacts","[]"))
-                for _dict in win_contacts:
-                    cellphone = ""
-                    contact_name = ""
-                    phone = ""
-                    if _dict.get("contact_person","")!="" and (_dict.get("mobile_no","")!="" or _dict.get("phone_no","")!=""):
-                        contact_name = _dict.get("contact_person","")
-                        cellphone = _dict.get("mobile_no","")
-                        phone = _dict.get("phone_no","")
-                        _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,
-                                         "phone":phone,"company_name":win_tenderer,"contact_name":contact_name,
-                                         "type":"%s单位"%(_group.get("stage","")[:2]),"id":get_guid()}
+
+        json_contact = "".join([enterprise_dict.get("contacts%d"%_i,"") for _i in range(1,6)])
+        json_contact = enterprise_dict.get("contacts","")+json_contact
+        if json_contact!="":
+            tenderee_contacts = json.loads(json_contact)
+            tenderee_contacts.sort(key=lambda x:x.get("score",0),reverse=True)
+            _count = 0
+            for _dict in tenderee_contacts[:10]:
+                cellphone = ""
+                contact_name = ""
+                phone = ""
+                if _dict.get("contact_person","")!="" and (_dict.get("mobile_no","")!="" or _dict.get("phone_no","")!=""):
+                    contact_name = _dict.get("contact_person","")
+                    cellphone = _dict.get("mobile_no","")
+                    phone = _dict.get("phone_no","")
+                    _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,
+                                     "phone":phone,"company_name":enterprise_name,"contact_name":contact_name,
+                                     "type":_type,"id":get_guid()}
+                    _dict_contact = popNoneFromDict(_dict_contact)
+                    _count += 1
+                    _contacts.append(_dict_contact)
+                    if _count>=2:
+                        break
+            log("extract contact %s takes %.2f length:%d"%(enterprise_name,time.time()-_time,len(tenderee_contacts)))
+
+
+    def getContacts(self,ots_client,list_group,set_enterprise,_contacts):
+
+
+        set_contact = set()
+
+        for _group in list_group:
+            # add contacts of tenderee
+            _tenderee = _group.get("tenderee")
+            if _tenderee is not None and _tenderee not in set_enterprise:
+
+                set_enterprise.add(_tenderee)
+
+                enterprise_dict = self.getEnterprise(_tenderee,["province","contacts","address","contacts1","contacts2","contacts3","contacts4","contacts5"])
+
+                _time = time.time()
+                if enterprise_dict is None:
+                    enterprise_dict = {}
+                cellphone = ""
+                phone = ""
+                if isCellphone(_group.get("tenderee_phone","")):
+                    cellphone = _group.get("tenderee_phone","")
+                else:
+                    phone = _group.get("tenderee_phone","")
+
+                if cellphone!="" or phone!="":
+
+                    _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,"phone":phone,
+                                     "company_name":_group["tenderee"],"contact_name":_group["tenderee_contact"],
+                                     "type":"业主单位","id":get_guid()}
+                    _dict_contact = popNoneFromDict(_dict_contact)
+
+                    _contacts.append(_dict_contact)
+
+                self.addContact(_contacts,"业主单位",_tenderee)
+
+
+            _agency = _group.get("agency")
+            if _agency is not None and _agency!=_tenderee and _agency not in set_enterprise:
+
+                set_enterprise.add(_agency)
+                self.addContact(_contacts,"代理单位",_agency)
+
+            # add contacts of win_tenderer
+
+            _time = time.time()
+            _docid = _group.get("docid")
+            if _docid is not None:
+                _docid = int(_docid)
+                document_dict = self.getDocumentWithDocid(_docid,["sub_docs_json","tenderee","tenderee_contact","tenderee_phone","agency","agency_contact","agency_phone"])
+            else:
+                document_dict = None
+            win_tenderer = ""
+            log("search document  %s takes %.2f"%(str(_docid),time.time()-_time))
+
+            if document_dict is not None:
+                _tenderee = document_dict.get("tenderee","")
+                if _tenderee!="":
+                    enterprise_dict = self.getEnterprise(_tenderee,["province","contacts","address","contacts1","contacts2","contacts3","contacts4","contacts5"])
+                    if enterprise_dict is None:
+                        enterprise_dict = {}
+                    tenderee_contact = document_dict.get("tenderee_contact","")
+                    tenderee_phone = document_dict.get("tenderee_phone","")
+                    if tenderee_phone!="":
+                        cellphone = ""
+                        phone = ""
+                        if isCellphone(tenderee_phone):
+                            cellphone = tenderee_phone
+                        else:
+                            phone = tenderee_phone
+                        _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,"phone":phone,
+                                         "company_name":_tenderee,"contact_name":tenderee_contact,
+                                         "type":"业主单位","id":get_guid()}
+                        _dict_contact = popNoneFromDict(_dict_contact)
+                        _contacts.append(_dict_contact)
+
+                _agency = document_dict.get("agency","")
+                if _tenderee!="":
+                    enterprise_dict = self.getEnterprise(_tenderee,["province","contacts","address","contacts1","contacts2","contacts3","contacts4","contacts5"])
+                    if enterprise_dict is None:
+                        enterprise_dict = {}
+                    tenderee_contact = document_dict.get("agency_contact","")
+                    tenderee_phone = document_dict.get("agency_phone","")
+                    if tenderee_phone!="":
+                        cellphone = ""
+                        phone = ""
+                        if isCellphone(tenderee_phone):
+                            cellphone = tenderee_phone
+                        else:
+                            phone = tenderee_phone
+                        _dict_contact = {"address":enterprise_dict.get("address",""),"cellphone":cellphone,"phone":phone,
+                                         "company_name":_agency,"contact_name":tenderee_contact,
+                                         "type":"代理单位","id":get_guid()}
                         _dict_contact = popNoneFromDict(_dict_contact)
                         _contacts.append(_dict_contact)
-        return _contacts
 
+                    sub_docs_json = document_dict.get("sub_docs_json",'[{}]')
+                    for sub_docs in json.loads(sub_docs_json):
+                        if win_tenderer=="":
+                            win_tenderer = sub_docs.get("win_tenderer","")
+
+            if win_tenderer!="" and win_tenderer not in set_enterprise:
+                set_enterprise.add(win_tenderer)
+
+                self.addContact(_contacts,"%s单位"%(_group.get("stage","").replace("阶段","")),win_tenderer)
+        tenderee_count = 0
+        list_contact = []
+        for _c in _contacts:
+            _line = "%s-%s"%(_c.get("cellphone",""),_c.get("phone",""))
+            if _c.get("cellphone","")=="" and _c.get("phone","")=="":
+                continue
+            if _c.get("type","")=="业主单位":
+                tenderee_count += 1
+            if _line not in set_contact:
+                list_contact.append(_c)
+            set_contact.add(_line)
+        return list_contact
+
+    def getFollows(self,list_group,_contacts,set_enterprise):
+        set_follow = set()
+        _version_index = 0
+        set_project_uuid = set()
+        list_follow = []
+        for _group in list_group:
+            _docid = _group.get("docid")
+            _stage = _group["stage"]
+            follow_time = _group["page_time"]
+            progress_remark = ""
+            if _docid is not None and _docid!="":
+                document_dict = self.getDocumentWithDocid(_docid,["page_time","docchannel","status","sub_docs_json"])
+                win_tenderer = ""
+                project_flag = False
+
+                if document_dict is not None:
+
+                    for sub_docs in json.loads(document_dict.get("sub_docs_json","[{}]")):
+                        if win_tenderer=="":
+                            win_tenderer = sub_docs.get("win_tenderer","")
+
+                    _status = document_dict.get("status")
+                    remain_docid = _docid
+                    if not(_status>=201 and _status<=300):
+                        remain_docid = self.data_flow.getRemainDoc(remain_docid)
+                        log("remain_docid from %s to %s"%(str(_docid),str(remain_docid)))
+                    if remain_docid is not None:
+                        _project = self.getProjectWithDocid(remain_docid,["zhao_biao_page_time","zhong_biao_page_time","win_tenderer","tenderee"])
+
+                        if _project is not None:
+                            _uuid = _project.get("uuid")
+                            if _uuid in set_project_uuid:
+                                continue
+                            progress_remark = ""
+                            if _project.get("zhao_biao_page_time") is not None:
+                                follow_time = max(follow_time,_project.get("zhao_biao_page_time"))
+                                progress_remark += "%s,项目处于%s的招标阶段;"%(_project.get("zhao_biao_page_time"),_stage)
+                            if _project.get("zhong_biao_page_time") is not None:
+                                follow_time = max(follow_time,_project.get("zhong_biao_page_time"))
+                                progress_remark += "截止%s"%(_project.get("zhong_biao_page_time"))
+                            if _project.get("win_tenderer") is not None:
+                                progress_remark += "确定%s中标人为%s"%(_stage,_project.get("win_tenderer"))
+                            else:
+                                progress_remark += "暂未确定中标人。"
+
+                            if _project.get("win_tenderer") is not None:
+                                if _project.get("win_tenderer") not in set_enterprise:
+                                    self.addContact(_contacts,"%s单位"%(_group.get("stage","").replace("阶段","")),_project.get("win_tenderer"))
+                                    set_enterprise.add(_project.get("win_tenderer"))
+
+                            if _project.get("tenderee") is not None:
+                                if _project.get("tenderee") not in set_enterprise:
+                                    self.addContact(_contacts,"业主单位",_project.get("tenderee"))
+                                    set_enterprise.add(_project.get("tenderee"))
+
+                            project_flag = True
+                            set_project_uuid.add(_uuid)
+                if not project_flag:
+                    if document_dict is not None and document_dict.get("docchannel") in (101,118,119,120):
+                        progress_remark = "截止%s,项目处于%s;"%(_group["page_time"],str(_stage))
+                        if win_tenderer!="":
+                            progress_remark += "确定%s中标人为%s"%(_stage,win_tenderer)
+                    else:
+                        progress_remark = "截止%s,项目处于%s的招标阶段;"%(_group["page_time"],str(_stage))
+            else:
+                progress_remark = "截止%s,项目处于%s;"%(_group["page_time"],str(_stage))
+
+            _key = "%s-%s"%(_group["page_time"],str(_stage))
+            if _key in set_follow:
+                continue
+            set_follow.add(_key)
+
+
+            _version_index += 1
+            _follow = {"id":get_guid(),"crtime":getCurrent_date(),"mdate":"%s 00:00:00"%follow_time,
+             "progress":_stage,"progress_remark":progress_remark,
+             "version":"跟进%d"%(_version_index)}
+
+            list_follow.append(_follow)
+        list_follow.sort(key=lambda x:x.get("mdate",""))
+        _version_index = 0
+        for _follow in list_follow:
+            _version_index += 1
+            _follow["version"] = "跟进%d"%(_version_index)
+        return list_follow
+
+
+
+
+    def getId(self,list_docid,list_spid):
+
+        #
+        # list_should_query = []
+        # for _docid in list_docid:
+        #     _q = TermQuery("docids",_docid)
+        #     list_should_query.append(_q)
+        # for _spid in list_spid:
+        #     _q = TermQuery("spids",_spid)
+        #     list_should_query.append(_q)
+
+        return get_guid()
+
+    def getDocumentWithDocid(self,docid,columns=["page_time","docchannel","sub_docs_json"]):
+        if docid is None or docid=="":
+            return None
+        if docid in self.dict_document:
+            return self.dict_document.get(docid)
+        partitionkey = int(docid)%500+1
+        document_dict = self.search(ots_client,"document",[("partitionkey",partitionkey),("docid",int(docid))],columns)
+        self.dict_document[docid] = document_dict
+        return document_dict
+
+    def getProjectWithDocid(self,docid,columns):
+
+        bool_query = BoolQuery(must_queries=[TermQuery("docids",docid)])
+
+        rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
+                                                                       SearchQuery(bool_query,get_total_count=True,limit=10),
+                                                                       columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+        if len(rows)>0:
+            list_data = getRow_ots(rows)
+            return list_data[0]
+        return None
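
getDocumentWithDocid above memoizes point reads: the partition key is derived as docid % 500 + 1 and results, including misses, are cached per instance so repeated groups do not re-query. The caching shape in isolation; `search` here is a hypothetical stand-in for BaseModel.search:

    _cache = {}

    def get_document(search, docid, columns):
        # search(table, primary_key, columns) stands in for BaseModel.search
        if docid is None or docid == "":
            return None
        if docid in _cache:
            return _cache[docid]
        partitionkey = int(docid) % 500 + 1
        row = search("document", [("partitionkey", partitionkey), ("docid", int(docid))], columns)
        _cache[docid] = row   # cache None too, so misses are not retried
        return row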
 
 
     def toDesigned_project(self,ots_client):
@@ -101,7 +401,7 @@ class proposedBuilding_tmp(BaseModel):
             if "page_time" not in _group or _group["page_time"] is None:
                 _group["page_time"] = ""
         list_group.sort(key=lambda x:x["page_time"])
-        set_docid = set()
+
         set_contacts = set()
         list_contacts = []
         set_follows = set()
@@ -122,15 +422,23 @@ class proposedBuilding_tmp(BaseModel):
         progress = ""
         des_project_type = "30"
         status = "1"
-        id = get_guid()
-        partitionkey = id%200+1
+
         update_status = "0"
         update_time = getCurrent_date("%Y-%m-%d")
         project_code = ""
         project_type = ""
         area = ""
-        for _group in self.getFollows(list_group):
-            set_docid.add(str(_group["docid"]))
+
+        set_docid = set()
+        set_spid = set()
+        set_enterprise = set()
+        for _group in list_group:
+            _docid = _group.get("docid")
+            _spid = _group.get("shenpi_id")
+            if _docid is not None:
+                set_docid.add(str(_docid))
+            if _spid is not None:
+                set_spid.add(str(_spid))
             if floor_space=="":
                 floor_space = _group.get("proportion","")
                 covered_area = floor_space
@@ -153,43 +461,41 @@ class proposedBuilding_tmp(BaseModel):
             if area=="":
                 area = _group.get("city","")
 
-            list_contacts.extend(self.getContacts(ots_client,_group))
-
-            _follow = "%s-%s"%(_group["stage"],_group["page_time"])
-            if _follow not in set_follows:
-                list_follows.append({"id":get_guid(),"crtime":getCurrent_date(),"mdate":"%s 00:00:00"%_group["page_time"],
-                                     "progress":_group["stage"],"progress_remark":"截止%s,该项目处于%s阶段"%(_group["page_time"],_group["stage"]),
-                                     "version":"跟进%d"%(_version_index)})
-                set_follows.add(_follow)
-                project_follow = "跟进%d"%(_version_index)
-                _version_index += 1
-                progress = _group["stage"]
+            # _follow = "%s-%s"%(_group["stage"],_group["page_time"])
+            # if _follow not in set_follows:
+            #     list_follows.append({"id":get_guid(),"crtime":getCurrent_date(),"mdate":"%s 00:00:00"%_group["page_time"],
+            #                          "progress":_group["stage"],"progress_remark":"截止%s,该项目处于%s"%(_group["page_time"],str(self.getStage(_group.get("stage","")))),
+            #                          "version":"跟进%d"%(_version_index)})
+            #     set_follows.add(_follow)
+            #     project_follow = "跟进%d"%(_version_index)
+            #     _version_index += 1
+            progress = str(self.getStage(_group.get("stage","")))
+
         legal_contacts = []
-        for _c in list_contacts:
-            _line = "%s-%s-%s-%s"%(_c.get("company_name",""),_c.get("contact_name",""),_c.get("cellphone",""),_c.get("phone",""))
-            if _line not in set_follows:
-                legal_contacts.append(_c)
-            set_follows.add(_line)
-        project_dict = {"crtime":crtime,"floor_space":floor_space,"project_address":project_address,
+        list_follows = self.getFollows(list_group,legal_contacts,set_enterprise)
+        self.getContacts(ots_client,list_group,set_enterprise,legal_contacts)
+
+        # get the key: reuse the id of an existing project if one matches, otherwise generate one (getId currently always generates)
+        id = self.getId(list(set_docid),list(set_spid))
+        partitionkey = id%200+1
+        project_dict = {"partitionkey":partitionkey,"id":id,
+                        "crtime":crtime,"floor_space":floor_space,"project_address":project_address,
                         "begintime":begintime,"endtime":endtime,"project_description":project_description,
                         "project_name":project_name,"ordinary_name":ordinary_name,"high_project_name":high_project_name,
                         "project_follow":project_follow,"page_time":page_time,"progress":progress,"contacts":json.dumps(legal_contacts,ensure_ascii=False),
-                        "follows":json.dumps(list_follows,ensure_ascii=False),"partitionkey":partitionkey,"id":id,
-                        "docids":",".join(list(set_docid)),"des_project_type":des_project_type,"status":status,"covered_area":covered_area,
+                        "follows":json.dumps(list_follows,ensure_ascii=False),
+                        "docids":",".join(list(set_docid)),"spids":",".join(list(set_spid)),"des_project_type":des_project_type,"status":status,"covered_area":covered_area,
                         "update_status":update_status,"update_time":update_time,"project_code":project_code,
-                        "project_type":project_type,"area":area}
+                        "project_type":project_type,"area":area,
+                        "full_text":"%s%s"%(str(project_name),str(project_description))}
         return project_dict
 
 
 
 
 
-
-
-
-
-
 if __name__=="__main__":
+
     a = proposedBuilding_tmp("1",'2',"3")
     print(dir(a))
     print(a.getAttribute_keys())
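
getFollows above dedupes follow-ups on a (page_time, stage) key, sorts them by mdate and renumbers the 跟进N versions. The same step as a standalone helper, for reference:

    def dedup_and_version(list_follow):
        # drop repeated (mdate, progress) pairs, then renumber in time order
        seen = set()
        kept = []
        for _follow in list_follow:
            _key = "%s-%s" % (_follow.get("mdate", ""), _follow.get("progress", ""))
            if _key in seen:
                continue
            seen.add(_key)
            kept.append(_follow)
        kept.sort(key=lambda x: x.get("mdate", ""))
        for _i, _follow in enumerate(kept):
            _follow["version"] = "跟进%d" % (_i + 1)
        return kept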

+ 66 - 0
BaseDataMaintenance/model/ots/t_shen_pi_xiang_mu.py

@@ -0,0 +1,66 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+from BaseDataMaintenance.common.Utils import *
+import json
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from tablestore import *
+
+ots_client = getConnect_ots()
+
+class t_shen_pi_xiang_mu(BaseModel):
+
+    def __init__(self,_dict):
+
+        for k,v in _dict.items():
+            self.setValue(k,v,True)
+        self.table_name = "t_shen_pi_xiang_mu"
+
+    def getPrimary_keys(self):
+        return ["id"]
+
+
+
+def delete_dumplicate():
+    from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+
+    def produce(task_queue,ots_client):
+        bool_query = BoolQuery(must_queries=[TermQuery("web_source_no","to_delete")])
+        rows,next_token,total_count,is_all_succeed = ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                                                                       SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("id")]),get_total_count=True,limit=100),
+                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+        list_data = getRow_ots(rows)
+        _count = len(list_data)
+        for _data in list_data:
+            task_queue.put(_data)
+        log("total_count:%d"%(total_count))
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                                                                           SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                           columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            list_data = getRow_ots(rows)
+            _count += len(list_data)
+            for _data in list_data:
+                task_queue.put(_data)
+            if _count>=10000:
+                break
+
+    def comsume(item,result_queue):
+        a = t_shen_pi_xiang_mu(item)
+        a.delete_row(ots_client)
+
+
+    from queue import Queue
+    task_queue= Queue()
+
+    while 1:
+
+        produce(task_queue,ots_client)
+        if task_queue.qsize()>0:
+            mt = MultiThreadHandler(task_queue,comsume,None,30)
+            mt.run()
+        else:
+            #nothing left to delete, stop instead of busy-looping
+            break
+
+
+if __name__=="__main__":
+
+    delete_dumplicate()
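
MultiThreadHandler in this repo feeds queue items to a handler(item, result_queue) callable across a pool of threads, and keyword arguments passed at construction (for example ots_client=...) are forwarded into the handler, as _handle in DataSynchronization above shows. A minimal handler sketch under those assumptions:

    from queue import Queue

    def handle(item, result_queue, ots_client=None):
        # each worker receives one queued row plus the shared ots_client
        row = t_shen_pi_xiang_mu(item)
        row.delete_row(ots_client)

    task_queue = Queue()
    # mt = MultiThreadHandler(task_queue, handle, None, 30, ots_client=ots_client)
    # mt.run()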
+

+ 55 - 0
BaseDataMaintenance/readme/start.md

@@ -0,0 +1,55 @@
+
+
+#11022: start the data-sync scheduler
+#change to the working directory
+cd /data/python
+#activate the environment
+source activate py37
+#stop the old process first
+ps -ef | grep start_dataflow_init | grep -v grep | cut -c 9-16| xargs kill -9
+#start
+nohup python BaseDataMaintenance/start_dataflow_init.py >> flow_init.log &
+
+
+#11022: start the extraction scheduler
+#change to the working directory
+cd /data/python
+#activate the environment
+source activate py37
+#stop the old process first
+ps -ef | grep start_dataflow_extract | grep -v grep | cut -c 9-16| xargs kill -9
+#start
+nohup python BaseDataMaintenance/start_dataflow_extract.py >> flow_extract.log &
+
+#11022: start the monitor
+#change to the working directory
+cd /data/python
+#activate the environment
+source activate py37
+#stop the old process first
+ps -ef | grep start_dataflow_monitor | grep -v grep | cut -c 9-16| xargs kill -9
+#start
+nohup python BaseDataMaintenance/start_dataflow_monitor.py >> flow_monitor.log &
+
+
+===================================
+#19022: start the attachment monitor
+#change to the working directory
+cd /data/python
+#activate the environment
+source activate py37
+#stop the old process first
+ps -ef | grep start_dataflow_attach_monitor | grep -v grep | cut -c 9-16| xargs kill -9
+#start
+nohup python BaseDataMaintenance/start_dataflow_attach_monitor.py >> flow_monitor.log &
+
+#19022: start the attachment processor
+#change to the working directory
+cd /data/python
+#activate the environment
+source activate py37
+#stop the old process first
+ps -ef | grep start_dataflow_attachment | grep -v grep | cut -c 9-16| xargs kill -9
+#start
+nohup python BaseDataMaintenance/start_dataflow_attachment.py >> flow_attachment.log &
+

+ 10 - 0
BaseDataMaintenance/start_dataflow_attach_monitor.py

@@ -0,0 +1,10 @@
+
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+from BaseDataMaintenance.dataMonitor.data_monitor import *
+
+if __name__ == '__main__':
+    bdm = BaseDataMonitor()
+    bdm.start_attach_monitor()

+ 2 - 1
BaseDataMaintenance/start_dataflow_dumplicate.py

@@ -6,5 +6,6 @@ sys.path.append(os.path.dirname(__file__)+"/..")
 from BaseDataMaintenance.maintenance.dataflow import *
 
 if __name__ == '__main__':
-    flow = Dataflow()
+    # flow = Dataflow()
+    flow = Dataflow_dumplicate()
     flow.start_flow_dumplicate()

Some files were not shown because the diff is too large