
Fix an issue where deduplication sometimes hangs; optimize product matching; backfill contacts for periodic projects

luojiehua 1 year ago
parent
commit
297a9f1fde

+ 21 - 5
BaseDataMaintenance/common/multiThread.py

@@ -32,7 +32,7 @@ def stop_thread(thread):
 
 class _taskHandler(threading.Thread):
 
-    def __init__(self,task_queue,task_handler,result_queue,need_stop=True,*args,**kwargs):
+    def __init__(self,task_queue,task_handler,result_queue,need_stop=True,timeout=60,*args,**kwargs):
         threading.Thread.__init__(self)
         # Process.__init__(self)
         self.task_queue = task_queue
@@ -41,13 +41,28 @@ class _taskHandler(threading.Thread):
         self.need_stop = need_stop
         self.args = args
         self.kwargs = kwargs
+        self.timeout = timeout
 
     def run(self):
         while(True):
             try:
-                logging.debug("handler task queue size is %d need_stop %s"%(self.task_queue.qsize(),str(self.need_stop)))
-                item = self.task_queue.get(True,timeout=1)
+                logging.info("handler task queue size is %d need_stop %s thread_id:%d"%(self.task_queue.qsize(),str(self.need_stop),threading.get_ident()))
+                item = self.task_queue.get(True,timeout=5)
+
                 self.task_handler(item,self.result_queue,*self.args,**self.kwargs)
+
+                # t = threading.Thread(target=self.task_handler,args=(item,self.result_queue,*self.args),kwargs=self.kwargs)
+                # t.start()
+                # start_time = time.time()
+                # while 1:
+                #     if  not t.is_alive():
+                #         break
+                #     if time.time()-start_time>self.timeout:
+                #         logging.info("thread %d ran too long, killing it"%(threading.get_ident()))
+                #         stop_thread(t)
+                #         break
+                #     time.sleep(0.1)
+
                 # self.task_queue.task_done()
             except queue.Empty as e:
                 # logging.info("%s thread is done"%(self.name))
@@ -59,7 +74,7 @@ class _taskHandler(threading.Thread):
 
 class MultiThreadHandler(object):
 
-    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,need_stop=True,restart=False,*args,**kwargs):
+    def __init__(self,task_queue,task_handler,result_queue,thread_count=1,process_count=1,need_stop=True,restart=False,timeout=60,*args,**kwargs):
         self.task_queue = task_queue
         self.task_handler = task_handler
         self.result_queue = result_queue
@@ -70,6 +85,7 @@ class MultiThreadHandler(object):
         self.kwargs = kwargs
         self.restart = restart
         self.need_stop = need_stop
+        self.timeout = timeout
 
     def getThreadStatus(self):
         _count = 0
@@ -79,7 +95,7 @@ class MultiThreadHandler(object):
                 _count += 1
             else:
                 if self.restart:
-                    _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,self.need_stop,*self.args,**self.kwargs)
+                    _t = _taskHandler(self.task_queue,self.task_handler,self.result_queue,self.need_stop,self.timeout,*self.args,**self.kwargs)
                     _t.start()
                     restart += 1
         logging.debug("thread status alive:%d restart:%d total:%d need_stop %s"%(_count,restart,len(self.list_thread),str(self.need_stop)))

+ 2 - 1
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -43,7 +43,8 @@ class BaseDataMonitor():
         self.current_path = os.path.dirname(__file__)
 
     def cmd_execute(self,_cmd):
-        return os.popen(_cmd).read()
+        with os.popen(_cmd) as f:
+            return f.read()
 
     def get_last_tenmin_time(self,nums=15):
         current_time = getCurrent_date(format="%Y-%m-%d %H:%M:%S")
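
The `with os.popen(...)` form closes the pipe deterministically instead of leaving it to garbage collection, which matters in a long-lived monitor process that would otherwise leak file descriptors. An equivalent sketch using subprocess (an assumption of intent, not what the repo uses):

    import subprocess

    def cmd_execute(_cmd):
        # capture stdout as text; the pipe is closed when run() returns
        return subprocess.run(_cmd, shell=True, capture_output=True, text=True).stdout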

+ 0 - 14
BaseDataMaintenance/maintenance/3.py

@@ -1,14 +0,0 @@
-
-objectPath = "a//b/c"
-a = str(objectPath).replace("//","/")
-print(a)
-
-import pyautogui
-
-import time
-
-start_time =  time.time()
-for _ in range(10000):
-    pyautogui.size()
-print(time.time()-start_time)
-print(pyautogui.size())

+ 61 - 50
BaseDataMaintenance/maintenance/dataflow.py

@@ -1624,6 +1624,13 @@ class Dataflow():
         producer()
         comsumer()
 
+    def start_flow_dumplicate(self):
+        schedule = BlockingScheduler()
+        schedule.add_job(self.flow_remove,"cron",hour="20")
+        schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
+        schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
+        schedule.start()
+
     def flow_remove_project_tmp(self,process_count=flow_process_count):
 
         def producer():
@@ -1658,13 +1665,6 @@ class Dataflow():
         producer()
         comsumer()
 
-    def start_flow_dumplicate(self):
-        schedule = BlockingScheduler()
-        schedule.add_job(self.flow_remove,"cron",hour="20")
-        schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
-        schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
-        schedule.start()
-
     def start_flow_merge(self):
         schedule = BlockingScheduler()
         schedule.add_job(self.flow_merge,"cron",second="*/10")
@@ -2794,21 +2794,31 @@ class Dataflow_dumplicate(Dataflow):
 
         return list_rules,table_name,table_index
 
-
-    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
-        def producer(columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document]):
-            q_size = self.queue_dumplicate.qsize()
-            log("dumplicate queue size %d"%(q_size))
-            if q_size>flow_process_count//3:
-                return
-            bool_query = BoolQuery(must_queries=[
-                RangeQuery(document_tmp_status,*status_from,True,True),
-                # TermQuery("docid",271983871)
-            ])
+    def producer_flow_dumplicate(self,process_count,status_from,columns=[document_tmp_status,document_tmp_save,document_tmp_page_time,document_tmp_docchannel,document_tmp_tenderee,document_tmp_agency,document_tmp_doctitle,document_tmp_sub_docs_json,document_tmp_extract_json,document_attachment_extract_status,document_update_document]):
+        q_size = self.queue_dumplicate.qsize()
+        log("dumplicate queue size %d"%(q_size))
+        if q_size>process_count//3:
+            return
+        bool_query = BoolQuery(must_queries=[
+            RangeQuery(document_tmp_status,*status_from,True,True),
+            # TermQuery("docid",271983871)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+                                                                            ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        log("flow_dumplicate producer total_count:%d"%total_count)
+        list_dict = getRow_ots(rows)
+        for _dict in list_dict:
+            docid = _dict.get(document_tmp_docid)
+            if docid in self.dumplicate_set:
+                continue
+            self.dumplicate_set.add(docid)
+            self.queue_dumplicate.put(_dict)
+        _count = len(list_dict)
+        while next_token and _count<process_count:
             rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(document_update_document,SortOrder.DESC),FieldSort("docid",SortOrder.DESC)]),limit=100,get_total_count=True),
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
-            log("flow_dumplicate producer total_count:%d"%total_count)
             list_dict = getRow_ots(rows)
             for _dict in list_dict:
                 docid = _dict.get(document_tmp_docid)
@@ -2816,37 +2826,28 @@ class Dataflow_dumplicate(Dataflow):
                     continue
                 self.dumplicate_set.add(docid)
                 self.queue_dumplicate.put(_dict)
-            _count = len(list_dict)
-            while next_token and _count<flow_process_count:
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_tmp","document_tmp_index",
-                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                    ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
-                list_dict = getRow_ots(rows)
-                for _dict in list_dict:
-                    docid = _dict.get(document_tmp_docid)
-                    if docid in self.dumplicate_set:
-                        continue
-                    self.dumplicate_set.add(docid)
-                    self.queue_dumplicate.put(_dict)
-                _count += len(list_dict)
+            _count += len(list_dict)
 
-            _l = list(self.dumplicate_set)
-            _l.sort(key=lambda x:x,reverse=True)
-            self.dumplicate_set = set(_l[:flow_process_count]) | set(_l[-flow_process_count:])
-        def comsumer():
-            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
-            mt.run()
+        _l = list(self.dumplicate_set)
+        _l.sort(key=lambda x:x,reverse=True)
+        self.dumplicate_set = set(_l[:flow_process_count*2])
 
-        producer()
-        # comsumer()
+    def comsumer_flow_dumplicate(self):
+        mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,60,1,ots_client=self.ots_client)
+        mt.run()
+
+    def flow_dumplicate(self,process_count=flow_process_count,status_from=flow_dumplicate_status_from):
+
+        self.producer_flow_dumplicate(process_count=process_count,status_from=status_from)
+        # self.comsumer_flow_dumplicate()
 
     def flow_dumpcate_comsumer(self):
         from multiprocessing import Process
-        process_count = 2
-        thread_count = 30
+        process_count = 3
+        thread_count = 15
         list_process = []
         def start_thread():
-            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,ots_client=self.ots_client)
+            mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,restart=True,timeout=600,ots_client=self.ots_client)
             mt.run()
 
         for _ in range(process_count):
@@ -2854,8 +2855,15 @@ class Dataflow_dumplicate(Dataflow):
             list_process.append(p)
         for p in list_process:
             p.start()
-        for p in list_process:
-            p.join()
+
+        while 1:
+            for _i in range(len(list_process)):
+                p = list_process[_i]
+                if not p.is_alive():
+                    p = Process(target=start_thread)
+                    list_process[_i] = p
+                    p.start()
+            time.sleep(1)
 
         # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
         # mt.run()
@@ -3817,7 +3825,7 @@ class Dataflow_dumplicate(Dataflow):
 
             _time = time.time()
             dumplicate_document_in_merge(list_projects)
-            log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
+            # log("dumplicate document %d takes:%.3f"%(len(list_projects),time.time()-_time))
 
             _time = time.time()
             project_json = to_project_json(list_projects)
@@ -3950,6 +3958,7 @@ class Dataflow_dumplicate(Dataflow):
             else:
                 dtmp.setValue(document_tmp_projects,self.merge_document_real(item,list_docids,table_name,dtmp.getProperties().get(document_tmp_save),flow_dumplicate_status_to,b_log),True)
             log("upgrate %s save:%s:docid:%d,final_list:%d,rules:%d,best_docid:%s,dmp_docid:%s"%(str(upgrade),dtmp.getProperties().get(document_tmp_save),item.get(document_tmp_docid),len(final_list),len(list_rules),str(best_docid),dmp_docid))
+
             if upgrade:
                 if table_name=="document_tmp":
                     self.changeSaveStatus(remove_list)
@@ -4036,9 +4045,11 @@ class Dataflow_dumplicate(Dataflow):
 
     def start_flow_dumplicate(self):
         schedule = BlockingScheduler()
-        schedule.add_job(self.flow_dumplicate,"cron",second="*/40")
-        schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/10")
+        schedule.add_job(self.flow_dumplicate,"cron",second="*/10")
+        schedule.add_job(self.flow_dumpcate_comsumer,"cron",second="*/30")
         schedule.add_job(self.bdm.monitor_dumplicate,"cron",minute="*/10")
+        schedule.add_job(self.flow_remove,"cron",hour="20")
+        schedule.add_job(self.flow_remove_project_tmp,"cron",hour="20")
         # schedule.add_job(self.fix_doc_which_not_in_project,"cron",minute="55")
         schedule.start()
 
@@ -4157,7 +4168,7 @@ if __name__ == '__main__':
     df_dump = Dataflow_dumplicate(start_delete_listener=False)
     # df_dump.start_flow_dumplicate()
     a = time.time()
-    df_dump.test_dumplicate(335604520)
+    df_dump.test_dumplicate(350493205)
     # df_dump.test_merge([292315564],[287890754])
     # df_dump.flow_remove_project_tmp()
     print("takes",time.time()-a)

+ 280 - 280
BaseDataMaintenance/maintenance/major_project/unionDocument.py

@@ -351,301 +351,301 @@ class MajorUnion():
             _projectDigest = "。".join(_find.split("。")[0:3])
         return _projectDigest
 
-    def comsumer(self):
 
-        def _handle(item,result_queue):
-            # _major = MajorProject(item)
-            # _major.setValue(major_project_status,1,True)
-            # _major.update_row(self.ots_client)
-            # return
-
-            project_name = item.get(major_project_project_name,"")
-
-
-            province = item.get(major_project_province,"")
-            _major = MajorProject(item)
-            if project_name=="":
-
-                # update status
-                self.set_status_to_adult(_major)
-                _major.update_row(self.ots_client)
-                return
-
-            enterprise = Enterprise({"name":project_name})
-            if enterprise.exists_row(self.ots_client):
-                _major.setValue(major_project_industry,"",True)
-                _major.setValue(major_project_project_dynamics,"[]",True)
-                _major.setValue(major_project_project_dynamic_number,0,True)
-                _major.setValue(major_project_project_stage,"",True)
-                _major.setValue(major_project_latest_page_time,"",True)
-                _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
-                _major.setValue(major_project_all_project_dynamics,"[]]",True)
-                _major.setValue(major_project_all_project_dynamic_number,0,True)
-                _major.setValue(major_project_project_stages,",".join([]),True)
-                self.set_status_to_adult(_major)
-                _major.update_row(self.ots_client)
-                self.set_status_to_adult(_major)
-                _major.update_row(self.ots_client)
-                return
-
-
-            list_dynamics = []
-
-            if len(project_name)>6:
-                bool_query_sp = BoolQuery(must_queries=[
-                    MatchPhraseQuery("page_content",project_name),
-                    MatchPhraseQuery("province",province)
-                ])
-            else:
-                bool_query_sp = BoolQuery(must_queries=[
-                    MatchPhraseQuery("page_title",project_name),
-                    MatchPhraseQuery("province",province)
-                ])
-
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
-                                                                                SearchQuery(bool_query_sp,limit=100),
-                                                                                ColumnsToGet(["province","page_title","page_content","page_time","sp_type"],ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
-            dict_industry = {}
+    def _handle1(self,item,result_queue):
+        # _major = MajorProject(item)
+        # _major.setValue(major_project_status,1,True)
+        # _major.update_row(self.ots_client)
+        # return
 
-            list_area = []
-            list_construction = []
-            list_contact = []
-            dict_tenderee = {}
+        project_name = item.get(major_project_project_name,"")
 
-            for _data in list_data:
-                _content = _data.get("page_title","")+_data.get("page_content","")[:100]
-                _stage,_stage_keyword = extract_legal_stage(_content,self.stage_pattern,self.stage_priority_dict)
-                _dynamic = {"docid":str(_data.get("id")),
-                            "doctype":2,
-                            "doctitle":_data.get("page_title",""),
-                            "page_time":_data.get("page_time",""),
-                            "sp_type":str(_data.get("sp_type","")),
-                            "project_stage":_stage,
-                            "project_stage_keyword":_stage_keyword}
-                list_dynamics.append(_dynamic)
-                _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
-                if _industry not in dict_industry:
-                    dict_industry[_industry] = 0
-                dict_industry[_industry] += 1
-
-                _construction = self.extract_projectDigest(_content)
-                list_construction.append(_construction)
-
-                _area = _data.get(document_area,"")
-                _province = _data.get(document_province,"")
-                _city = _data.get(document_city,"")
-                _district = _data.get(document_district,"")
-
-                _score = 0
-                if _area!="":
-                    _score += 1
-                if _province!="":
-                    _score += 1
-                if _city!="":
-                    _score += 1
-                if _district!="":
-                    _score += 1
-                list_area.append({document_area:_area,
-                                  document_province:_province,
-                                  document_city:_city,
-                                  document_district:_district,
-                                  "score":_score})
-
-                _tenderee = _data.get(document_tenderee,"")
-                if _tenderee!="":
-                    if _tenderee not in dict_tenderee:
-                        dict_tenderee[_tenderee] = 0
-                    dict_tenderee[_tenderee] += 1
-
-                tenderee_contact = _data.get(document_tenderee_contact,"")
-                tenderee_phone = _data.get(document_tenderee_phone,"")
-                contact_score = 0
-                if tenderee_phone!="":
-                    if recog_likeType(tenderee_phone)=="mobile":
-                        contact_score += 2
-                    else:
-                        contact_score += 1
-                if tenderee_contact!="":
+
+        province = item.get(major_project_province,"")
+        _major = MajorProject(item)
+        if project_name=="":
+
+            # update status
+            self.set_status_to_adult(_major)
+            _major.update_row(self.ots_client)
+            return
+
+        enterprise = Enterprise({"name":project_name})
+        if enterprise.exists_row(self.ots_client):
+            _major.setValue(major_project_industry,"",True)
+            _major.setValue(major_project_project_dynamics,"[]",True)
+            _major.setValue(major_project_project_dynamic_number,0,True)
+            _major.setValue(major_project_project_stage,"",True)
+            _major.setValue(major_project_latest_page_time,"",True)
+            _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
+            _major.setValue(major_project_all_project_dynamics,"[]",True)
+            _major.setValue(major_project_all_project_dynamic_number,0,True)
+            _major.setValue(major_project_project_stages,",".join([]),True)
+            self.set_status_to_adult(_major)
+            _major.update_row(self.ots_client)
+            return
+
+
+        list_dynamics = []
+
+        if len(project_name)>6:
+            bool_query_sp = BoolQuery(must_queries=[
+                MatchPhraseQuery("page_content",project_name),
+                MatchPhraseQuery("province",province)
+            ])
+        else:
+            bool_query_sp = BoolQuery(must_queries=[
+                MatchPhraseQuery("page_title",project_name),
+                MatchPhraseQuery("province",province)
+            ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("t_shen_pi_xiang_mu","t_shen_pi_xiang_mu_index",
+                                                                            SearchQuery(bool_query_sp,limit=100),
+                                                                            ColumnsToGet(["province","page_title","page_content","page_time","sp_type"],ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        dict_industry = {}
+
+        list_area = []
+        list_construction = []
+        list_contact = []
+        dict_tenderee = {}
+
+        for _data in list_data:
+            _content = _data.get("page_title","")+_data.get("page_content","")[:100]
+            _stage,_stage_keyword = extract_legal_stage(_content,self.stage_pattern,self.stage_priority_dict)
+            _dynamic = {"docid":str(_data.get("id")),
+                        "doctype":2,
+                        "doctitle":_data.get("page_title",""),
+                        "page_time":_data.get("page_time",""),
+                        "sp_type":str(_data.get("sp_type","")),
+                        "project_stage":_stage,
+                        "project_stage_keyword":_stage_keyword}
+            list_dynamics.append(_dynamic)
+            _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
+            if _industry not in dict_industry:
+                dict_industry[_industry] = 0
+            dict_industry[_industry] += 1
+
+            _construction = self.extract_projectDigest(_content)
+            list_construction.append(_construction)
+
+            _area = _data.get(document_area,"")
+            _province = _data.get(document_province,"")
+            _city = _data.get(document_city,"")
+            _district = _data.get(document_district,"")
+
+            _score = 0
+            if _area!="":
+                _score += 1
+            if _province!="":
+                _score += 1
+            if _city!="":
+                _score += 1
+            if _district!="":
+                _score += 1
+            list_area.append({document_area:_area,
+                              document_province:_province,
+                              document_city:_city,
+                              document_district:_district,
+                              "score":_score})
+
+            _tenderee = _data.get(document_tenderee,"")
+            if _tenderee!="":
+                if _tenderee not in dict_tenderee:
+                    dict_tenderee[_tenderee] = 0
+                dict_tenderee[_tenderee] += 1
+
+            tenderee_contact = _data.get(document_tenderee_contact,"")
+            tenderee_phone = _data.get(document_tenderee_phone,"")
+            contact_score = 0
+            if tenderee_phone!="":
+                if recog_likeType(tenderee_phone)=="mobile":
+                    contact_score += 2
+                else:
                     contact_score += 1
-                if contact_score>0:
-                    list_contact.append({document_tenderee_contact:tenderee_contact,
-                                         document_tenderee_phone:tenderee_phone,
-                                         "score":contact_score})
+            if tenderee_contact!="":
+                contact_score += 1
+            if contact_score>0:
+                list_contact.append({document_tenderee_contact:tenderee_contact,
+                                     document_tenderee_phone:tenderee_phone,
+                                     "score":contact_score})
 
 
 
-            log("%s search sp %d"%(item.get("id"),len(list_data)))
+        log("%s search sp %d"%(item.get("id"),len(list_data)))
 
 
-            if len(project_name)>6:
-                bool_query_doc = BoolQuery(must_queries=[
-                    BoolQuery(should_queries=[
-                        MatchPhraseQuery("doctitle",project_name),
-                        TermQuery("project_name",project_name),
-                    ]),
-                    WildcardQuery("province","%s*"%province),
-                    RangeQuery("status",201,300,True,True)
+        if len(project_name)>6:
+            bool_query_doc = BoolQuery(must_queries=[
+                BoolQuery(should_queries=[
+                    MatchPhraseQuery("doctitle",project_name),
+                    TermQuery("project_name",project_name),
+                ]),
+                WildcardQuery("province","%s*"%province),
+                RangeQuery("status",201,300,True,True)
 
-                ],must_not_queries=[
-                    TermQuery("docchannel",106),
-                    TermQuery("docchannel",107)
-                ])
-            else:
-                bool_query_doc = BoolQuery(must_queries=[
-                    BoolQuery(should_queries=[
-                        MatchPhraseQuery("doctitle",project_name),
-                        TermQuery("project_name",project_name),
-                    ]),
-                    WildcardQuery("province","%s*"%province),
-                    RangeQuery("status",201,300,True,True)
-
-                ],must_not_queries=[
-                    TermQuery("docchannel",106),
-                    TermQuery("docchannel",107)
-                ])
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
-                                                                                SearchQuery(bool_query_doc,limit=100),
-                                                                                ColumnsToGet(["doctitle","doctextcon","attachmenttextcon","page_time","docchannel","bidway","sub_docs_json","project_name",document_tenderee,document_tenderee_contact,document_tenderee_phone,document_area,document_province,document_city,document_district,document_total_tenderee_money],ColumnReturnType.SPECIFIED))
-            list_data = getRow_ots(rows)
+            ],must_not_queries=[
+                TermQuery("docchannel",106),
+                TermQuery("docchannel",107)
+            ])
+        else:
+            bool_query_doc = BoolQuery(must_queries=[
+                BoolQuery(should_queries=[
+                    MatchPhraseQuery("doctitle",project_name),
+                    TermQuery("project_name",project_name),
+                ]),
+                WildcardQuery("province","%s*"%province),
+                RangeQuery("status",201,300,True,True)
+
+            ],must_not_queries=[
+                TermQuery("docchannel",106),
+                TermQuery("docchannel",107)
+            ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document","document_index",
+                                                                            SearchQuery(bool_query_doc,limit=100),
+                                                                            ColumnsToGet(["doctitle","doctextcon","attachmenttextcon","page_time","docchannel","bidway","sub_docs_json","project_name",document_tenderee,document_tenderee_contact,document_tenderee_phone,document_area,document_province,document_city,document_district,document_total_tenderee_money],ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
 
-            log("%s search doc %d"%(item.get("id"),len(list_data)))
-            for _data in list_data:
-                _content = _data.get("doctitle","")+_data.get("doctextcon","")+_data.get("attachmenttextcon","")
-
-
-                stage_content = _data.get("doctitle","")+_data.get("project_name","")
-                win_tenderer,win_tenderer_manager,win_tenderer_phone = getWinTenderer(_data.get("sub_docs_json"),self.ots_client)
-                _stage,_stage_keyword = extract_legal_stage(stage_content,self.stage_pattern,self.stage_priority_dict)
-                _dynamic = {"docid":str(_data.get("docid")),
-                            "doctype":1,
-                            "doctitle":_data.get("doctitle",""),
-                            "page_time":_data.get("page_time",""),
-                            "docchannel":_data.get("docchannel",""),
-                            "bidway":str(_data.get("bidway","")),
-                            "project_stage":_stage,
-                            "win_tenderer":win_tenderer,
-                            "win_tenderer_manager":win_tenderer_manager,
-                            "win_tenderer_phone":win_tenderer_phone,
-                            "project_stage_keyword":_stage_keyword}
-                list_dynamics.append(_dynamic)
-                _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
-                if _industry not in dict_industry:
-                    dict_industry[_industry] = 0
-                dict_industry[_industry] += 1
-
-                total_tenderee_money = _data.get(document_total_tenderee_money,0)
-                if total_tenderee_money>0:
-                    if _major.getProperties().get(major_project_total_investment,0)==0:
-                        _major.setValue(major_project_total_investment,total_tenderee_money/10000,True)
-
-                _construction = self.extract_projectDigest(_content)
-                list_construction.append(_construction)
-
-                _area = _data.get(document_area,"")
-                _province = _data.get(document_province,"")
-                _city = _data.get(document_city,"")
-                _district = _data.get(document_district,"")
-
-                _score = 0
-                if _area!="":
-                    _score += 1
-                if _province!="":
-                    _score += 1
-                if _city!="":
-                    _score += 1
-                if _district!="":
-                    _score += 1
-                list_area.append({document_area:_area,
-                                  document_province:_province,
-                                  document_city:_city,
-                                  document_district:_district,
-                                  "score":_score})
-
-                _tenderee = _data.get(document_tenderee,"")
-                if _tenderee!="":
-                    if _tenderee not in dict_tenderee:
-                        dict_tenderee[_tenderee] = 0
-                    dict_tenderee[_tenderee] += 1
-
-                tenderee_contact = _data.get(document_tenderee_contact,"")
-                tenderee_phone = _data.get(document_tenderee_phone,"")
-                contact_score = 0
-                if tenderee_phone!="":
-                    if recog_likeType(tenderee_phone)=="mobile":
-                        contact_score += 2
-                    else:
-                        contact_score += 1
-                if tenderee_contact!="":
+        log("%s search doc %d"%(item.get("id"),len(list_data)))
+        for _data in list_data:
+            _content = _data.get("doctitle","")+_data.get("doctextcon","")+_data.get("attachmenttextcon","")
+
+
+            stage_content = _data.get("doctitle","")+_data.get("project_name","")
+            win_tenderer,win_tenderer_manager,win_tenderer_phone = getWinTenderer(_data.get("sub_docs_json"),self.ots_client)
+            _stage,_stage_keyword = extract_legal_stage(stage_content,self.stage_pattern,self.stage_priority_dict)
+            _dynamic = {"docid":str(_data.get("docid")),
+                        "doctype":1,
+                        "doctitle":_data.get("doctitle",""),
+                        "page_time":_data.get("page_time",""),
+                        "docchannel":_data.get("docchannel",""),
+                        "bidway":str(_data.get("bidway","")),
+                        "project_stage":_stage,
+                        "win_tenderer":win_tenderer,
+                        "win_tenderer_manager":win_tenderer_manager,
+                        "win_tenderer_phone":win_tenderer_phone,
+                        "project_stage_keyword":_stage_keyword}
+            list_dynamics.append(_dynamic)
+            _industry = extract_industry(self.dict_keyword,self.keyword_pattern,_content)
+            if _industry not in dict_industry:
+                dict_industry[_industry] = 0
+            dict_industry[_industry] += 1
+
+            total_tenderee_money = _data.get(document_total_tenderee_money,0)
+            if total_tenderee_money>0:
+                if _major.getProperties().get(major_project_total_investment,0)==0:
+                    _major.setValue(major_project_total_investment,total_tenderee_money/10000,True)
+
+            _construction = self.extract_projectDigest(_content)
+            list_construction.append(_construction)
+
+            _area = _data.get(document_area,"")
+            _province = _data.get(document_province,"")
+            _city = _data.get(document_city,"")
+            _district = _data.get(document_district,"")
+
+            _score = 0
+            if _area!="":
+                _score += 1
+            if _province!="":
+                _score += 1
+            if _city!="":
+                _score += 1
+            if _district!="":
+                _score += 1
+            list_area.append({document_area:_area,
+                              document_province:_province,
+                              document_city:_city,
+                              document_district:_district,
+                              "score":_score})
+
+            _tenderee = _data.get(document_tenderee,"")
+            if _tenderee!="":
+                if _tenderee not in dict_tenderee:
+                    dict_tenderee[_tenderee] = 0
+                dict_tenderee[_tenderee] += 1
+
+            tenderee_contact = _data.get(document_tenderee_contact,"")
+            tenderee_phone = _data.get(document_tenderee_phone,"")
+            contact_score = 0
+            if tenderee_phone!="":
+                if recog_likeType(tenderee_phone)=="mobile":
+                    contact_score += 2
+                else:
                     contact_score += 1
-                if contact_score>0:
-                    list_contact.append({document_tenderee_contact:tenderee_contact,
-                                         document_tenderee_phone:tenderee_phone,
-                                         "score":contact_score})
-
-
-            # fill in supplementary attributes
-            if len(list_area)>0:
-                list_area.sort(key=lambda x:x.get("score"),reverse=True)
-                _dict = list_area[0]
-                _major.setValue(major_project_area,_dict.get(document_area,""),True)
-                _major.setValue(major_project_province,_dict.get(document_province,""),True)
-                _major.setValue(major_project_city,_dict.get(document_city,""),True)
-                _major.setValue(major_project_district,_dict.get(document_district,""),True)
-            if len(list_construction)>0:
-                list_construction.sort(key=lambda x:len(x),reverse=True)
-                _major.setValue(major_project_project_overview,list_construction[0],True)
-            if len(dict_tenderee.keys())>0:
-                _l = []
-                for k,v in dict_tenderee.items():
-                    _l.append([k,v])
-                _l.sort(key=lambda x:x[1],reverse=True)
-                _major.setValue(major_project_construction_enterprise,_l[0][0],True)
-
-            if len(list_contact)>0:
-                list_contact.sort(key=lambda x:x.get("score"),reverse=False)
-                _major.setValue(major_project_project_leader,list_contact[0].get(document_tenderee_contact,""),True)
-                _major.setValue(major_project_project_leader_phone,list_contact[0].get(document_tenderee_phone,""),True)
-            # print(list_data)
-            # print(list_dynamics)
-            list_dynamics_all =  dynamicDumplicate(list_dynamics)
-            industry = getIndustry(dict_industry)
-            all_project_dynamics = json.dumps(list_dynamics_all,ensure_ascii=False)
-            all_project_dynamic_number = len(list_dynamics_all)
-
-            list_dynamics,list_stage = dynamicDumplicate2(list_dynamics_all,self.stage_order)
+            if tenderee_contact!="":
+                contact_score += 1
+            if contact_score>0:
+                list_contact.append({document_tenderee_contact:tenderee_contact,
+                                     document_tenderee_phone:tenderee_phone,
+                                     "score":contact_score})
+
+
+        # fill in supplementary attributes
+        if len(list_area)>0:
+            list_area.sort(key=lambda x:x.get("score"),reverse=True)
+            _dict = list_area[0]
+            _major.setValue(major_project_area,_dict.get(document_area,""),True)
+            _major.setValue(major_project_province,_dict.get(document_province,""),True)
+            _major.setValue(major_project_city,_dict.get(document_city,""),True)
+            _major.setValue(major_project_district,_dict.get(document_district,""),True)
+        if len(list_construction)>0:
+            list_construction.sort(key=lambda x:len(x),reverse=True)
+            _major.setValue(major_project_project_overview,list_construction[0],True)
+        if len(dict_tenderee.keys())>0:
+            _l = []
+            for k,v in dict_tenderee.items():
+                _l.append([k,v])
+            _l.sort(key=lambda x:x[1],reverse=True)
+            _major.setValue(major_project_construction_enterprise,_l[0][0],True)
+
+        if len(list_contact)>0:
+            list_contact.sort(key=lambda x:x.get("score"),reverse=True)
+            _major.setValue(major_project_project_leader,list_contact[0].get(document_tenderee_contact,""),True)
+            _major.setValue(major_project_project_leader_phone,list_contact[0].get(document_tenderee_phone,""),True)
+        # print(list_data)
+        # print(list_dynamics)
+        list_dynamics_all =  dynamicDumplicate(list_dynamics)
+        industry = getIndustry(dict_industry)
+        all_project_dynamics = json.dumps(list_dynamics_all,ensure_ascii=False)
+        all_project_dynamic_number = len(list_dynamics_all)
+
+        list_dynamics,list_stage = dynamicDumplicate2(list_dynamics_all,self.stage_order)
+        list_dynamics.sort(key=lambda x:x.get("page_time",""),reverse=True)
+        project_dynamic_number = len(list_dynamics)
+        project_dynamics = json.dumps(list_dynamics,ensure_ascii=False)
+        # project_stage = getMaxStage(list_dynamics,self.stage_order)
+        current_stage = item.get(major_project_project_stage,"")
+        project_stage = "未知"
+        latest_page_time = ""
+
+        if len(list_dynamics)>0:
             list_dynamics.sort(key=lambda x:x.get("page_time",""),reverse=True)
-            project_dynamic_number = len(list_dynamics)
-            project_dynamics = json.dumps(list_dynamics,ensure_ascii=False)
-            # project_stage = getMaxStage(list_dynamics,self.stage_order)
-            current_stage = item.get(major_project_project_stage,"")
-            project_stage = "未知"
-            latest_page_time = ""
-
-            if len(list_dynamics)>0:
-                list_dynamics.sort(key=lambda x:x.get("page_time",""),reverse=True)
-                latest_page_time = list_dynamics[0].get("page_time","")
-                project_stage = list_dynamics[0].get("project_stage","未知")
-            current_date = getCurrent_date(format="%Y")
-            plan_start_time = item.get(major_project_plan_start_time,"")
-            if (re.search("储备|前期|预备",current_stage) is not None or current_stage=="" or current_stage=="未知") and project_stage=="未知" and plan_start_time>=current_date:
-                project_stage = "预备阶段"
-
-            _major.setValue(major_project_industry,industry,True)
-            _major.setValue(major_project_project_dynamics,project_dynamics,True)
-            _major.setValue(major_project_project_dynamic_number,project_dynamic_number,True)
-            _major.setValue(major_project_project_stage,project_stage,True)
-            _major.setValue(major_project_latest_page_time,latest_page_time,True)
-            _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
-            _major.setValue(major_project_all_project_dynamics,all_project_dynamics,True)
-            _major.setValue(major_project_all_project_dynamic_number,all_project_dynamic_number,True)
-            _major.setValue(major_project_project_stages,",".join(list_stage),True)
-            self.set_status_to_adult(_major)
-            _major.update_row(self.ots_client)
+            latest_page_time = list_dynamics[0].get("page_time","")
+            project_stage = list_dynamics[0].get("project_stage","未知")
+        current_date = getCurrent_date(format="%Y")
+        plan_start_time = item.get(major_project_plan_start_time,"")
+        if (re.search("储备|前期|预备",current_stage) is not None or current_stage=="" or current_stage=="未知") and project_stage=="未知" and plan_start_time>=current_date:
+            project_stage = "预备阶段"
+
+        _major.setValue(major_project_industry,industry,True)
+        _major.setValue(major_project_project_dynamics,project_dynamics,True)
+        _major.setValue(major_project_project_dynamic_number,project_dynamic_number,True)
+        _major.setValue(major_project_project_stage,project_stage,True)
+        _major.setValue(major_project_latest_page_time,latest_page_time,True)
+        _major.setValue(major_project_update_time,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
+        _major.setValue(major_project_all_project_dynamics,all_project_dynamics,True)
+        _major.setValue(major_project_all_project_dynamic_number,all_project_dynamic_number,True)
+        _major.setValue(major_project_project_stages,",".join(list_stage),True)
+        self.set_status_to_adult(_major)
+        _major.update_row(self.ots_client)
 
-        task_queue = self.producer()
+    def comsumer(self):
 
-        mt = MultiThreadHandler(task_queue,_handle,None,30)
+        task_queue = self.producer()
+        mt = MultiThreadHandler(task_queue,self._handle1,None,30)
         mt.run()
 
     def start_union(self):

+ 122 - 3
BaseDataMaintenance/maintenance/preproject/fillColumns.py

@@ -14,7 +14,7 @@ class PreprojectFill():
     def __init__(self):
         self.ots_client = getConnect_ots()
         self.task_queue = Queue(3000)
-
+        self.fill_concat_queue = Queue(10000)
 
     def fill_producer(self):
         q1 = BoolQuery(should_queries=[WildcardQuery("uuid","*"),
@@ -24,7 +24,7 @@ class PreprojectFill():
         rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
                                                                             SearchQuery(query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
                                                                             ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
-        while True:
+        while not self.task_queue.full():
             if len(rows)>0:
                 dict_rows = getRow_ots(rows)
                 for _row in dict_rows:
@@ -39,7 +39,6 @@ class PreprojectFill():
 
     def fill_comsumer(self):
 
-
         def comsumer_handle(_row,result_queue):
             if _row.get(preproject_uuid) is None:
                 _row[preproject_uuid] = uuid4().hex
@@ -66,10 +65,130 @@ class PreprojectFill():
         _mul = MultiThreadHandler(self.task_queue,comsumer_handle,None,10)
         _mul.run()
 
+    def fill_contact_producer(self):
+        q1 = BoolQuery(must_queries=[TermQuery("status",1),
+                                       ])
+        columns = [preproject_tenderee,preproject_last_tenderee_contact,preproject_last_tenderee_phone,preproject_last_win_tenderer,preproject_last_win_tenderer_contact,preproject_last_win_tenderer_phone]
+        query = q1
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
+                                                                            SearchQuery(query,sort=Sort(sorters=[FieldSort("uuid")]),get_total_count=True,limit=100),
+                                                                            ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+        dict_rows = getRow_ots(rows)
+        for _row in dict_rows:
+            self.fill_concat_queue.put(_row)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("preproject","preproject_index",
+                                                                                SearchQuery(query,next_token=next_token,get_total_count=True,limit=100),
+                                                                                ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+            dict_rows = getRow_ots(rows)
+            for _row in dict_rows:
+                self.fill_concat_queue.put(_row)
+
+    def fill_contact_comsumer(self):
+
+        def comsumer_handle(_row,result_queue):
+
+            product = _row.get(preproject_product)
+
+            tenderee = _row.get(preproject_tenderee)
+            tenderee_concat = _row.get(preproject_last_tenderee_contact)
+            tenderee_phone = _row.get(preproject_last_tenderee_phone)
+
+            win_tenderer = _row.get(preproject_last_win_tenderer)
+            win_tenderer_concat = _row.get(preproject_last_win_tenderer_contact)
+            win_tenderer_phone = _row.get(preproject_last_win_tenderer_phone)
+
+
+            if tenderee is not None and tenderee!="":
+                if (tenderee_concat is None or tenderee_concat=="") and (tenderee_phone is None or tenderee_phone==""):
+                    #fill tenderee concat and phone
+                    bool_query = BoolQuery(must_queries=[
+                        TermQuery(preproject_tenderee,tenderee),
+                        BoolQuery(should_queries=[
+                            MatchPhraseQuery("doctextcon",product),
+                            MatchPhraseQuery("attachmenttextcon",product)
+                        ])
+                    ])
+
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
+                                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100),
+                                                                                        ColumnsToGet(["tenderee_contact","tenderee_phone"],return_type=ColumnReturnType.SPECIFIED))
+                    list_data = getRow_ots(rows)
+                    _find = False
+                    for _data in list_data:
+                        tenderee_contact = _data.get("tenderee_contact")
+                        tenderee_phone = _data.get("tenderee_phone")
+                        if (tenderee_contact is not None and tenderee_contact!="") or (tenderee_phone is not None and tenderee_phone!=""):
+                            _find = True
+                            _row[preproject_last_tenderee_contact] = tenderee_contact
+                            _row[preproject_last_tenderee_phone] = tenderee_phone
+                    if not _find:
+                        bool_query = BoolQuery(must_queries=[
+                            TermQuery("enterprise_name",tenderee)
+                        ])
+                        rows,next_token,total_count,is_all_succeed = self.ots_client.search("enterprise_contact","enterprise_contact_index",
+                                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=1),
+                                                                                            ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED))
+                        list_data = getRow_ots(rows)
+                        if len(list_data)>0:
+                            _data = list_data[0]
+                            tenderee_contact = _data.get("contact_person")
+                            tenderee_phone = _data.get("phone_no")
+                            _row[preproject_last_tenderee_contact] = tenderee_contact
+                            _row[preproject_last_tenderee_phone] = tenderee_phone
+
+
+            if win_tenderer is not None and win_tenderer!="":
+            if (win_tenderer_concat is None or win_tenderer_concat=="") and (win_tenderer_phone is None or win_tenderer_phone==""):
+                    # fill win_tenderer concat and phone
+                    bool_query = BoolQuery(must_queries=[
+                        TermQuery("win_tenderer",win_tenderer),
+                        BoolQuery(should_queries=[
+                            MatchPhraseQuery("doctextcon",product),
+                            MatchPhraseQuery("attachmenttextcon",product)
+                        ])
+                    ])
+
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
+                                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("page_time",SortOrder.DESC)]),limit=100),
+                                                                                        ColumnsToGet(["win_tenderer_manager","win_tenderer_phone"],return_type=ColumnReturnType.SPECIFIED))
+                    list_data = getRow_ots(rows)
+                    _find = False
+                    for _data in list_data:
+                        _contact = _data.get("win_tenderer_manager")
+                        _phone = _data.get("win_tenderer_phone")
+                        if (_contact is not None and _contact!="") or (_phone is not None and _phone!=""):
+                            _find = True
+                            _row[preproject_last_win_tenderer_contact] = _contact
+                            _row[preproject_last_win_tenderer_phone] = _phone
+                    if not _find:
+                        bool_query = BoolQuery(must_queries=[
+                            TermQuery("enterprise_name",win_tenderer)
+                        ])
+                        rows,next_token,total_count,is_all_succeed = self.ots_client.search("enterprise_contact","enterprise_contact_index",
+                                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("score",SortOrder.DESC)]),limit=1),
+                                                                                            ColumnsToGet(["contact_person","phone_no"],return_type=ColumnReturnType.SPECIFIED))
+                        list_data = getRow_ots(rows)
+                        if len(list_data)>0:
+                            _data = list_data[0]
+                            _contact = _data.get("contact_person")
+                            _phone = _data.get("phone_no")
+                            _row[preproject_last_win_tenderer_contact] = _contact
+                            _row[preproject_last_win_tenderer_phone] = _phone
+
+
+            _preproject = Preproject(_row)
+            _preproject.update_row(self.ots_client)
+
+        _mul = MultiThreadHandler(self.fill_concat_queue,comsumer_handle,None,10)
+        _mul.run()
+
     def schedule(self):
         _scheduler = BlockingScheduler()
         _scheduler.add_job(self.fill_producer,"cron",minute="*/10")
         _scheduler.add_job(self.fill_comsumer,"cron",minute="*/10")
+        _scheduler.add_job(self.fill_contact_producer,"cron",minute="*/1")
+        _scheduler.add_job(self.fill_contact_comsumer,"cron",minute="*/1")
         _scheduler.start()
 
 def start_fill_preproject():
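
The contact back-fill added here follows a two-tier lookup: take tenderee/win-tenderer contacts from the newest matching rows in project2, and only fall back to the best-scored enterprise_contact record when none are found. The shape of that fallback as a sketch (both callables are hypothetical stand-ins for the OTS searches above):

    def backfill_contact(find_in_projects, find_in_enterprise, name, product):
        # tier 1: newest historical projects mentioning the product
        for contact, phone in find_in_projects(name, product):
            if contact or phone:
                return contact, phone
        # tier 2: highest-scored enterprise_contact record
        return find_in_enterprise(name)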

+ 2 - 1
BaseDataMaintenance/maintenance/product/make_brand_pattern.py

@@ -3270,4 +3270,5 @@ def get_area_set():
     return _set
 
 if __name__ == '__main__':
-    get_area_set()
+    area_set = get_area_set()
+    print("日本" in area_set)

+ 18 - 2
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -27,6 +27,7 @@ from BaseDataMaintenance.maintenance.product.make_brand_pattern import get_area_
 area_set = get_area_set()
 import jieba
 
+ots_client = getConnect_ots()
 def get_intellect_search(coll,index_name,name,grade,search_params,output_fields,limit,max_steps=5):
 
     vector = []
@@ -98,7 +99,7 @@ def get_embedding_search(coll,index_name,name,grade,vector,search_params,output_
             final_list.sort(key=lambda x:x.get("level",1))
             try:
                 db.set(_md5,json.dumps(final_list))
-                db.expire(_md5,2*60)
+                db.expire(_md5,PRODUCT_REDIS_CACHE_TIME)
             except Exception as e:
                 log("set redis data error")
             return final_list
@@ -180,6 +181,8 @@ def is_area_brand(brand,area_set):
     brand = re.sub("[省市区县等]","",brand)
     if len(brand)>12:
         return 0
+    if brand in area_set:
+        return 2
     for _i in range(2,len(brand)):
         ns = brand[:_i]
         ns1 = brand[_i:]
@@ -520,6 +523,18 @@ def clean_product_brand(product_brand):
     if _search is not None:
         product_brand = _search.groupdict().get("brand")
     brand = re.sub("[/\\,,、.|等]|一批|/无|品牌|^[/.]+",'',product_brand)
+    for i in range(min(len(brand)-2,8)):
+        _n = brand[:i+1]
+        if _n in area_set:
+            n_name = re.sub("^[省市区]",'',brand[i+1:])
+            face_id = get_document_product_dict_interface_base_id(n_name,BRAND_GRADE)
+            _interface_d = {
+                DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id
+            }
+            _dpdi = Document_product_dict_interface(_interface_d)
+            if _dpdi.exists_row(ots_client):
+                brand = n_name
+                break
     return brand
 
 
@@ -573,5 +588,6 @@ if __name__ == '__main__':
     # import Levenshtein
     # print(Levenshtein.ratio('助听器','助行器'))
     # print(clean_product_specs("//4008SverssionV10"))
-    print(is_legal_brand(getConnect_ots(),"保健"))
+    print(is_legal_brand(getConnect_ots(),"产地:中国品牌:天津迈达型号:ODM-2100S"))
+    print(clean_product_brand("产地:中国品牌:天津迈达型号:ODM-2100S"))
     # print(check_specs("500ml","3500ml"))
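
The new loop in clean_product_brand strips a leading administrative-area name (up to 8 characters) from a brand when the remainder is itself a known brand record, e.g. "天津迈达" -> "迈达". An illustration with hypothetical data in place of get_area_set() and the dictionary lookup:

    import re

    brand = "天津迈达"
    area_set = {"天津", "日本"}
    for i in range(min(len(brand) - 2, 8)):
        prefix = brand[:i + 1]
        if prefix in area_set:
            candidate = re.sub("^[省市区]", "", brand[i + 1:])
            # kept only if `candidate` exists in document_product_dict_interface
            print(prefix, "->", candidate)   # 天津 -> 迈达
            break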

+ 411 - 51
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -143,7 +143,7 @@ class Product_Dict_Manager():
                 remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
                 level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL,1)
 
-                if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level):
+                if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level,wait_sync=False):
 
                     _pd = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_ID:_id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
                     _pd.update_row(self.ots_client)
@@ -250,6 +250,7 @@ class Product_Dict_Manager():
                                                                                     SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                     columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
+                log("insert brand %s %d counts"%(name,total_count))
                 while next_token:
                     rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
@@ -361,25 +362,24 @@ class Product_Dict_Manager():
                         ])
                         rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                             SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
-                                                                                            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
+                                                                                            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
                         list_data = getRow_ots(rows)
                         while next_token:
                             rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                                 SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                                columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
+                                                                                                columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
                             list_data.extend(getRow_ots(rows))
                         for _d in list_data:
-                            dict_name_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
-                            if dict_name_id is not None and dict_name_id!="":
+                            dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
+                            if dict_brand_id is not None and dict_brand_id!="":
                                 _query = BoolQuery(must_queries=[
-                                    TermQuery(DOCUMENT_PRODUCT_DICT_NAME_ID,dict_name_id)
+                                    TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
                                 ])
                                 rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                                     SearchQuery(_query,get_total_count=True))
                                 if total_count==1:
-                                    dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_name_id})
-                                    self.recurse_delete_dict(dict_name_id)
-                                    dpd.delete_row(self.ots_client)
+                                    dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
+                                    self.recurse_delete_dict(dict_brand_id)
 
                             _id = _d.get(DOCUMENT_PRODUCT_ID)
                             original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
@@ -519,28 +519,27 @@ class Product_Dict_Manager():
                         ])
                         rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                             SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
-                                                                                            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
+                                                                                            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
                         list_data = getRow_ots(rows)
 
 
                         while next_token:
                             rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                                 SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                                columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
+                                                                                                columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
                             list_data.extend(getRow_ots(rows))
 
                         for _d in list_data:
-                            dict_name_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
-                            if dict_name_id is not None and dict_name_id!="":
+                            dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
+                            if dict_brand_id is not None and dict_brand_id!="":
                                 _query = BoolQuery(must_queries=[
-                                    TermQuery(DOCUMENT_PRODUCT_DICT_NAME_ID,dict_name_id)
+                                    TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
                                 ])
                                 rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                                     SearchQuery(_query,get_total_count=True))
                                 if total_count==1:
-                                    dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_name_id})
-                                    self.recurse_delete_dict(dict_name_id)
-                                    dpd.delete_row(self.ots_client)
+                                    dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
+                                    self.recurse_delete_dict(dict_brand_id)
 
                             _id = _d.get(DOCUMENT_PRODUCT_ID)
                             original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
@@ -671,7 +670,7 @@ class Product_Dict_Manager():
 
         # search interface if name and grade exists then update document_product_dict and return
 
-        interface_id = get_milvus_product_dict_id(name)
+        interface_id = get_document_product_dict_interface_base_id(name,grade)
         _interface_d = {
             DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
             DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:alias,
@@ -705,7 +704,7 @@ class Product_Dict_Manager():
                     if _alias==name:
                         continue
                     list_name.append(_alias)
-            time.sleep(1)
+            time.sleep(PRODUCT_REDIS_CACHE_TIME)
 
 
         #judge whether there exists records before this record created,if not process the history data
@@ -847,7 +846,7 @@ class Product_Dict_Manager():
     def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
         # check whether there are change variable
 
-        _interface_id = get_milvus_product_dict_id(name)
+        _interface_id = get_document_product_dict_interface_base_id(name,grade)
         _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:_interface_id}
         _dpdi = Document_product_dict_interface(_d)
         if not _dpdi.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL],True):
@@ -863,7 +862,7 @@ class Product_Dict_Manager():
         if not update_flag:
             return
 
-        interface_id = get_milvus_product_dict_id(name)
+        interface_id = get_document_product_dict_interface_base_id(name,grade)
         final_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_alias_set))
         final_standard_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_standard_alias_set))
         final_remove_words = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_remove_words_set))
@@ -888,6 +887,8 @@ class Product_Dict_Manager():
             for _name in delete_standard_names:
                 delete_record_from_milvus(Coll,_name,"")
 
+        time.sleep(PRODUCT_REDIS_CACHE_TIME)
+
         # update document_product_dict
         # update alias
         if len(new_alias_set&original_alias_set)!=len(new_alias_set):
@@ -934,6 +935,14 @@ class Product_Dict_Manager():
                 log("delete id:%s"%(_id))
                 self.recurse_delete_dict(_id)
                 dpd.delete_row(self.ots_client)
+            face_id = get_document_product_dict_interface_base_id(_name,grade)
+            _interface_d = {
+                DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id,
+                DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(401,451)
+            }
+            _dpdi = Document_product_dict_interface(_interface_d)
+            if _dpdi.exists_row(self.ots_client):
+                _dpdi.update_row(self.ots_client)
 
         # process history
         if len(delete_standard_names)>0:
@@ -1012,7 +1021,7 @@ class Product_Dict_Manager():
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
             list_data.extend(getRow_ots(rows))
 
-        interface_id = get_milvus_product_dict_id(name)
+        interface_id = get_document_product_dict_interface_base_id(name,grade)
 
 
 
@@ -1020,7 +1029,7 @@ class Product_Dict_Manager():
         Coll,_ = self.get_collection(grade)
 
         delete_record_from_milvus(Coll,name,standard_alias)
-        time.sleep(1)
+        time.sleep(PRODUCT_REDIS_CACHE_TIME)
 
         #process_history data
         self.process_history_by_name([name],grade,"delete")
@@ -1329,7 +1338,7 @@ def clean_similar():
 
 
 
-def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words="",level=1):
+def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words="",level=1,wait_sync=True):
 
     n_name = get_milvus_standard_name(name)
     name_id = get_milvus_product_dict_id(n_name)
@@ -1379,25 +1388,28 @@ def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_
                         [level]
                         ]
                 insert_embedding(Coll,data)
-        while 1:
-            try:
-                log("milvus insert wait for done")
-                list_result = Coll.query(expr=expr,output_fields=["standard_name"])
-                log("list_result"+str(list_result)+str(type(list_result[0])))
-                if len(list_result)==1:
-                    if list_result[0].get("standard_name","")==name:
-                        log("milvus insert done")
-                        return True
-                time.sleep(1)
-            except Exception as e:
-                traceback.print_exc()
+        if wait_sync:
+            while 1:
+                try:
+                    log("milvus insert wait for done")
+                    list_result = Coll.query(expr=expr,output_fields=["standard_name"])
+                    log("list_result"+str(list_result)+str(type(list_result[0])))
+                    if len(list_result)==1:
+                        if list_result[0].get("standard_name","")==name:
+                            log("milvus insert done")
+                            return True
+                    time.sleep(1)
+                except Exception as e:
+                    traceback.print_exc()
+        else:
+            return True
 
 def delete_record_from_milvus(Coll,name,standard_alias):
 
     n_name = get_milvus_standard_name(name)
     name_id = get_milvus_product_dict_id(n_name)
 
-    log("delete name %s grade %s"%(str(name),str(standard_alias)))
+    log("delete name %s standard_alias %s"%(str(name),str(standard_alias)))
 
     expr = " ots_id in ['%s']"%name_id
     Coll.delete(expr)
@@ -1435,13 +1447,44 @@ def dict_interface_delete(name,grade,ots_client = getConnect_ots()):
     dpdi.update_row(ots_client)
 
 
-def interface_deletes():
+def interface_insert():
+    from uuid import uuid4
     a = '''
-    按采购需求执行
     '''
     grade = 4
+
+    new_standard_alias = ""
+    new_remove_words = ""
+
+    list_brand = []
     ots_client=getConnect_ots()
     for s in re.split("[\n\s,.,。、]",a):
+        s = s.strip()
+        if s=="":
+            continue
+        list_brand.append(s)
+    grade = 4
+
+    for brand in list_brand:
+        print(brand)
+        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
+              DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:new_remove_words,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
+        dpdi = Document_product_dict_interface(_d)
+        dpdi.update_row(ots_client)
+
+def interface_deletes():
+    a = '''
+    株式会社
+    '''
+    grade = 4
+    ots_client=getConnect_ots()
+    for s in re.split("[\n\s]",a):
         s = s.strip()
         if s=="":
             continue
@@ -1449,9 +1492,9 @@ def interface_deletes():
         dict_interface_delete(s,grade,ots_client)
 
 def interface_update():
-    name = "保健"
-    new_standard_alias = ""
-    new_remove_words = "+设备"
+    name = "万东"
+    new_standard_alias = "+万东康源|北京万东"
+    new_remove_words = ""
     grade = 4
     ots_client = getConnect_ots()
 
@@ -1468,6 +1511,68 @@ def interface_update():
     dpdi = Document_product_dict_interface(_d)
     dpdi.update_row(ots_client)
 
+def interface_brand_update_by_file():
+    import pandas as pd
+    import re
+    filename = "../../test/品牌合并.xlsx"
+    df0 = pd.read_excel(filename,0)
+    df1 = pd.read_excel(filename,1)
+    set_source_brand = set()
+    for b in df0["brands"]:
+        if b is None or b=="":
+            continue
+        list_brand = b.split(",")
+        for brand in list_brand:
+            brand = brand.strip()
+            if brand=="":
+                continue
+            set_source_brand.add(brand)
+    target_brand = df1["brand"]
+    target_standard_alias = df1["standard_alias"]
+    _check_flag = True
+    list_target = []
+    for tbrand,standard_alias in zip(target_brand,target_standard_alias):
+        brand = tbrand.strip()
+        if brand not in set_source_brand:
+            print("not in source:%s"%(brand))
+            _check_flag = False
+        if standard_alias is None or standard_alias=="" or str(standard_alias)=="nan":
+            continue
+        list_brand = re.split("[,,]",standard_alias)
+        set_alias = set()
+        for brand in list_brand:
+            brand = brand.strip()
+            if brand=="":
+                continue
+            if brand not in set_source_brand:
+                print("not in source:%s"%(brand))
+                _check_flag = False
+            set_alias.add(brand)
+        _d = {"brand":tbrand.strip(),
+              "standard_alias":"+"+"|".join(list(set_alias))}
+        list_target.append(_d)
+
+
+    if _check_flag or 1:  # NOTE: "or 1" deliberately bypasses the source check; drop it to enforce validation
+        grade = 4
+        ots_client = getConnect_ots()
+        from uuid import uuid4
+        for target in list_target:
+            name = target["brand"]
+            new_standard_alias = target["standard_alias"]
+            _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:"",
+                  DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
+            dpdi = Document_product_dict_interface(_d)
+            dpdi.update_row(ots_client)
+        print(list_target)
+
+
 def clean_brands():
     from queue import Queue as TQueue
     task_queue = TQueue()
@@ -1475,13 +1580,17 @@ def clean_brands():
 
     list_data = []
 
-    columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE]
+    table_name = Document_product_dict_interface_table_name
+    table_index = table_name+"_index"
+
+    columns=[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE]
 
     bool_query = BoolQuery(must_queries=[
-        RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,4,4,True,True),
+        TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
+        RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,4,4,True,True),
     ])
 
-    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
+    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                    columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
 
@@ -1490,7 +1599,7 @@ def clean_brands():
         list_data.append(_d)
 
     while next_token:
-        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
+        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                        SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                        columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
         list_dict = getRow_ots(rows)
@@ -1502,6 +1611,7 @@ def clean_brands():
 
     set_key = set()
     list_process_data = []
+    set_brand = set()
     for _d in list_data:
         name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
         grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
@@ -1511,21 +1621,36 @@ def clean_brands():
         set_key.add(_key)
         task_queue.put(_d)
         list_process_data.append(_d)
+        if grade==BRAND_GRADE:
+            set_brand.add(name)
     def _handle(item,result_queue):
         name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
 
+        _legal = is_legal_brand(ots_client,name)
-        if is_legal_brand(ots_client,name):
+        if _legal:
             item["legal"] = 1
+        elif _legal==False:
+            item["legal"] = 0
         else:
+            item["legal"] = 0
             bool_query = BoolQuery(must_queries=[
                 TermQuery("brand",name)
             ])
             rows,next_token,total_count,is_all_succeed = ots_client.search("document_product","document_product_index",
                                                                            SearchQuery(bool_query,get_total_count=True))
-            if total_count>0:
+            if total_count>=2:
                 item["legal"] = 1
             else:
-                item["legal"] = 0
+                bool_query = BoolQuery(must_queries=[
+                    NestedQuery("products",WildcardQuery("products.brand",name)),
+                ])
+                rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
+                                                                               SearchQuery(bool_query,get_total_count=True))
+                if total_count>=1:
+                    item["legal"] = 1
+                else:
+                    item["legal"] = 0
+
     mt = MultiThreadHandler(task_queue,_handle,None,30)
     mt.run()
 
@@ -1545,10 +1670,203 @@ def clean_brands():
         for _name in list_illegal:
             f.write("%s\n"%(_name))
 
+def merge_brands():
+    from queue import Queue as TQueue
+    import pandas as pd
+    task_queue = TQueue()
+    ots_client = getConnect_ots()
+
+    list_data = []
+
+    table_name = Document_product_dict_interface_table_name
+    table_index = table_name+"_index"
+
+    columns=[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS]
+
+    bool_query = BoolQuery(must_queries=[
+        TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
+        RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,4,4,True,True),
+    ])
+
+    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
+                                                                   columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+
+    list_dict = getRow_ots(rows)
+    for _d in list_dict:
+        list_data.append(_d)
+
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
+        list_dict = getRow_ots(rows)
+        for _d in list_dict:
+            list_data.append(_d)
+        # if len(list_data)>=1000:
+        #     break
+    log("product_dict embedding total_count:%d"%total_count)
+
+    set_key = set()
+    list_process_data = []
+    set_brand = set()
+    for _d in list_data:
+        name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
+        grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
+        _key = "%s-%d"%(name,grade)
+        if _key in set_key:
+            continue
+        set_key.add(_key)
+        task_queue.put(_d)
+        list_process_data.append(_d)
+        if grade==BRAND_GRADE:
+            set_brand.add(name)
+
+    area_set = get_area_set()
+    def _handle(item,result_queue):
+        name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
+        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
+        for i in range(min(len(name)-2,8)):
+            _n = name[:i+1]
+            if _n in area_set:
+                n_name = re.sub("^[省市区]]",'',name[i+1:])
+                if n_name in set_brand:
+                    item["belongs_to"] = n_name
+        standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
+        if standard_alias is not None and standard_alias!="":
+            for salias in standard_alias.split("|"):
+                face_id = get_document_product_dict_interface_base_id(salias,grade)
+                _interface_d = {
+                    DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id,
+                    DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(401,451)
+                }
+                _dpdi = Document_product_dict_interface(_interface_d)
+                if _dpdi.exists_row(ots_client):
+                    _dpdi.update_row(ots_client)
+
+
+    mt = MultiThreadHandler(task_queue,_handle,None,20)
+    mt.run()
+    dict_belongs_alias = {}
+    for _data in list_process_data:
+        name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
+        belongs_to = _data.get("belongs_to")
+        if belongs_to is not None:
+            if belongs_to not in dict_belongs_alias:
+                dict_belongs_alias[belongs_to] = []
+            dict_belongs_alias[belongs_to].append(name)
+    df_data = {"brand":[],"standard_alias":[]}
+    for k,v in dict_belongs_alias.items():
+        df_data["brand"].append(k)
+        df_data["standard_alias"].append("|".join(v))
+    df = pd.DataFrame(df_data)
+    df.to_excel("../../merge.xlsx",columns=["brand","standard_alias"])
+
+
+    # grade = 4
+    # ots_client = getConnect_ots()
+    # from uuid import uuid4
+    # for k,v in dict_belongs_alias.items():
+    #     name = k
+    #     new_standard_alias = "+%s"%("|".join(v))
+    #     print(k,new_standard_alias)
+    #     _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:"",
+    #           DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
+    #     dpdi = Document_product_dict_interface(_d)
+    #     dpdi.update_row(ots_client)
+
+
+def interface_delete_brands():
+    from uuid import uuid4
+    ots_client=getConnect_ots()
+    list_brand = []
+
+    a = '''
+    日本
+    '''
+    grade = 4
+
+    for s in re.split("[\n\s,.,。、]",a):
+        s = s.strip()
+        if s=="":
+            continue
+        list_brand.append(s)
+
+    with open("../../test/illegal_brand.txt","r",encoding="utf8") as f:
+        while 1:
+            brand = f.readline()
+            if not brand:
+                break
+            brand = brand.strip()
+            if brand!="":
+                list_brand.append(brand)
+
+    for brand in list_brand:
+        print(brand)
+        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+              DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"delete",
+              DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+              "is_temp":1
+              }
+        dpdi = Document_product_dict_interface(_d)
+        dpdi.update_row(ots_client)
+
+def clean_interface_delete_temp():
+    ots_client = getConnect_ots()
+
+    table_name = Document_product_dict_interface_table_name
+    table_index = table_name+"_index"
+    columns = ["is_temp","status","name"]
+
+    task_queue = Queue()
+    bool_query = BoolQuery(must_queries=[
+        TermQuery("action","delete")
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
+                                                                   columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    for _data in list_data:
+        task_queue.put(_data)
+    print("%d/%d"%(task_queue.qsize(),total_count))
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
+                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                       columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            task_queue.put(_data)
+        print("%d/%d"%(task_queue.qsize(),total_count))
+    def _handle(item,result_queue):
+        is_temp = item.get("is_temp",0)
+        status = item.get("status",0)
+        name = item.get("name")
+        dpdi = Document_product_dict_interface(item)
+        if is_temp==1 and status>=201:
+            dpdi.delete_row(ots_client)
+        else:
+            pass
+            # dpdi.setValue("status",1,True)
+            # dpdi.update_row(ots_client)
+    mt = MultiThreadHandler(task_queue,_handle,None,30)
+    mt.run()
+
+
+
+
 def clean_product_dict():
     ots_client = getConnect_ots()
     bool_query = BoolQuery(must_queries=[
-        RangeQuery("status",0)
+        RangeQuery("grade",3)
     ])
     task_queue = Queue()
     rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
@@ -1604,11 +1922,53 @@ def clean_product_dict_interface():
     mt = MultiThreadHandler(task_queue,_handle,None,30)
     mt.run()
 
+def rerun_interface_deletes():
+    ots_client = getConnect_ots()
+
+    table_name = Document_product_dict_interface_table_name
+    table_index = table_name+"_index"
+    columns = ["is_temp","status","name"]
+
+    task_queue = Queue()
+    bool_query = BoolQuery(must_queries=[
+        TermQuery("action","delete")
+    ])
+    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
+                                                                   columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+    list_data = getRow_ots(rows)
+    for _data in list_data:
+        task_queue.put(_data)
+    print("%d/%d"%(task_queue.qsize(),total_count))
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
+                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
+                                                                       columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            task_queue.put(_data)
+        print("%d/%d"%(task_queue.qsize(),total_count))
+    def _handle(item,result_queue):
+        status = item.get("status",0)
+        dpdi = Document_product_dict_interface(item)
+        dpdi.setValue("status",1,True)
+        dpdi.update_row(ots_client)
+    mt = MultiThreadHandler(task_queue,_handle,None,30)
+    mt.run()
+
+
+
 if __name__ == '__main__':
     # start_embedding_product_dict()
-    # interface_deletes()
+    interface_deletes()
+    interface_insert()
     # interface_update()
+    # interface_brand_update_by_file()
     # clean_similar()
     # clean_brands()
+    # merge_brands()
+    # interface_delete_brands()
+    # clean_interface_delete_temp()
     # clean_product_dict()
-    clean_product_dict_interface()
+    # clean_product_dict_interface()
+    # rerun_interface_deletes()
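The wait_sync=False path added above lets the batch synchronizer skip the read-back loop that previously blocked after every Milvus insert. A simplified sketch of the pattern, with a fake collection standing in for pymilvus (these names are illustrative, not the project's API):

import time

class FakeColl:
    def __init__(self):
        self.rows = {}
    def insert(self, _id, name):
        self.rows[_id] = name
    def query(self, _id):
        return [self.rows[_id]] if _id in self.rows else []

def insert_record(coll, _id, name, wait_sync=True):
    coll.insert(_id, name)
    if not wait_sync:
        return True      # bulk path: trust the insert, verify later in one pass
    while True:          # interactive path: block until the row is readable
        if coll.query(_id) == [name]:
            return True
        time.sleep(1)

insert_record(FakeColl(), "id-1", "万东", wait_sync=False)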

+ 2 - 0
BaseDataMaintenance/maintenance/product/product_setting.py

@@ -22,3 +22,5 @@ COLLECTION_NAME_SPECS = "product_dict_embedding_specs_single"
 NAME_GRADE = 3
 BRAND_GRADE = 4
 SPECS_GRADE = 5
+
+PRODUCT_REDIS_CACHE_TIME = 30
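PRODUCT_REDIS_CACHE_TIME couples the Redis TTL on cached match results to the sleep after a dictionary change, so a stale cached match expires before history reprocessing starts. A sketch of both halves, assuming a redis-py-style client:

import json
import time

PRODUCT_REDIS_CACHE_TIME = 30  # seconds

def cache_match_result(db, key, result):
    # db is assumed to be a redis-py client: cache under a bounded TTL
    db.set(key, json.dumps(result))
    db.expire(key, PRODUCT_REDIS_CACHE_TIME)

def after_dict_change():
    # wait out the TTL so no reader can still see a pre-change match
    time.sleep(PRODUCT_REDIS_CACHE_TIME)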

+ 28 - 19
BaseDataMaintenance/maintenance/product/products.py

@@ -99,8 +99,8 @@ class Product_Manager(Product_Dict_Manager):
             mt = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,thread_count,1,False,True)
             mt.run()
 
-        process_count = 4
-        thread_count = 10
+        process_count = 6
+        thread_count = 6
         list_process = []
         for _i in range(process_count):
             p = Process(target=start_thread,args=(thread_count,))
@@ -255,6 +255,8 @@ class Product_Manager(Product_Dict_Manager):
                                 continue
                             original_brand = brand
                             if original_brand==original_name:
+                                if len(new_name)+len(ots_name)>len(original_name):
+                                    continue
                                 if original_brand.find(ots_name)>=1:
                                     continue
                                 if len(original_brand)<=3:
@@ -303,18 +305,22 @@ class Product_Manager(Product_Dict_Manager):
                     for brand in l_brand:
                         if len(brand)>100:
                             continue
-                        if self.check_new_brand(brand):
-                            new_brand = clean_product_brand(brand)
-                            if new_brand=="":
+                        c_brand = clean_product_brand(brand)
+                        if self.check_new_brand(c_brand):
+                            if c_brand=="":
                                 continue
                             original_brand = brand
                             if original_brand==original_name:
+                                if len(new_name)+len(c_brand)>len(original_name):
+                                    continue
                                 if new_name==original_brand:
                                     continue
-                                if original_brand.find(new_brand)>=1:
+                                if original_brand.find(c_brand)>=1:
                                     continue
                                 if len(original_brand)<=3:
                                     continue
+                            new_brand = c_brand
+
                             log("adding new brand %s"%(str(new_brand)))
                             _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                                         DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_brand,
@@ -324,7 +330,8 @@ class Product_Manager(Product_Dict_Manager):
                                         DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
                                         DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                         DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
+                                        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
+                                        DOCUMENT_PRODUCT_ORIGINAL_BRAND:brand
                                         }
                             dpdi = Document_product_dict_interface(_d_brand)
                             dpdi.update_row(self.ots_client)
@@ -362,6 +369,8 @@ class Product_Manager(Product_Dict_Manager):
 
                                 original_brand = brand
                                 if original_brand==original_name:
+                                    if len(new_name)+len(ots_name)>len(original_name):
+                                        continue
                                     if original_brand.find(ots_name)>=1:
                                         continue
                                     if len(original_brand)<=3:
@@ -438,7 +447,7 @@ class Product_Manager(Product_Dict_Manager):
                                 if check_specs(c_specs,ots_name):
                                     break_flag = True
                                     original_specs = c_specs
-                                    if standard_name==new_name:
+                                    if standard_name==new_name or standard_name==new_brand:
                                         continue
                                     new_specs = standard_name
                                     log("check_specs %s succeed %s"%(specs,new_specs))
@@ -486,7 +495,7 @@ class Product_Manager(Product_Dict_Manager):
                             original_specs = specs
 
                             new_specs = clean_product_specs(specs)
-                            if new_specs==new_name:
+                            if new_specs==new_name or new_specs==new_brand:
                                 new_specs = ""
                                 continue
                             # insert into document_product_dict a new record
@@ -564,7 +573,7 @@ class Product_Manager(Product_Dict_Manager):
                                     break_flag = True
                                     original_specs = c_specs
                                     new_specs = standard_name
-                                    if new_specs==new_name:
+                                    if new_specs==new_name or new_specs==new_brand:
                                         new_specs = ""
                                         continue
                                     if brand_ots_id is not None:
@@ -977,9 +986,9 @@ def fix_product_data():
     columns = [DOCUMENT_PRODUCT_TMP_NEW_ID,DOCUMENT_PRODUCT_TMP_STATUS]
 
 
-    table_name = Document_product_table_name
-    table_index = Document_product_table_name+"_index"
-    columns = [DOCUMENT_PRODUCT_ORIGINAL_ID]
+    # table_name = Document_product_table_name
+    # table_index = Document_product_table_name+"_index"
+    # columns = [DOCUMENT_PRODUCT_ORIGINAL_ID]
 
 
     ots_client = getConnect_ots()
@@ -987,7 +996,7 @@ def fix_product_data():
         # RangeQuery("status",501),
         # TermQuery("docid",246032980)
 
-        RangeQuery("status",401,501),
+        RangeQuery("status",201,501),
         # RangeQuery("status",401,451)
         # WildcardQuery(DOCUMENT_PRODUCT_ORIGINAL_SPECS,"MFUSOne")
         # TermQuery(DOCUMENT_PRODUCT_SPECS,"MFUSOne")
@@ -1049,8 +1058,8 @@ def fix_product_data():
         original_id = item.get(DOCUMENT_PRODUCT_TMP_ID)
         new_id = item.get(DOCUMENT_PRODUCT_TMP_NEW_ID)
 
-        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
-        new_id = item.get(DOCUMENT_PRODUCT_ID)
+        # original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
+        # new_id = item.get(DOCUMENT_PRODUCT_ID)
 
         print("original_id",original_id,"id",item.get(DOCUMENT_PRODUCT_ID))
         # delete data and rerun
@@ -1126,7 +1135,7 @@ def test_check_brand():
             f.write(b+"\n")
 
 def test_match():
-    a = "桂林市啄木鸟医疗器械有限公司"
+    a = "迈瑞晟"
 
     # vector = request_embedding(get_milvus_standard_name(a))
     # vector = [get_embedding_request(b) for b in a]
@@ -1479,7 +1488,7 @@ def clean_product_dict_interface():
 
 if __name__ == '__main__':
 
-    # test()
+    test()
     # start_process_product()
     # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
     # print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
@@ -1487,4 +1496,4 @@ if __name__ == '__main__':
     # ots_name = "一氧化碳分析仪"
     # print(is_similar(name,ots_name),check_product(name,ots_name))
     # print(is_legal_specs('SCM-A/SB(0.18D)'))
-    clean_product_dict_interface()
+    # clean_product_dict_interface()
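The repeated len(new_name)+len(...)>len(original_name) guard added in this file applies when the raw brand field merely repeats the product name: a dictionary brand is only accepted if it and the standard name could both have come from that single string. A sketch of the check in isolation (the function name is ours, not the project's):

def accept_brand_match(original_name, new_name, dict_brand):
    # only used when the raw brand field equals the raw product name
    if len(new_name) + len(dict_brand) > len(original_name):
        return False  # name and brand cannot both fit in the original text
    if original_name.find(dict_brand) >= 1:
        return False  # brand buried mid-string, likely a false hit
    if len(original_name) <= 3:
        return False  # too short to carry both a name and a brand
    return True

print(accept_brand_match("迈达ODM-2100S", "ODM-2100S", "迈达"))  # True
print(accept_brand_match("迈达", "迈达", "迈达"))                # False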

+ 26 - 29
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -24,6 +24,8 @@ class DataSynchronization():
         self.designed_project_table_index = "designed_project_index"
         self.ots_client = getConnect_ots()
 
+        self.task_queue = queue.Queue()
+
     def producer(self,task_queue):
         '''
         :return: produce data
@@ -55,37 +57,33 @@ class DataSynchronization():
                 break
 
 
-    def comsumer(self,task_queue):
+    def comsumer_handle(self,_proposed,result_queue):
 
-        def _handle(_proposed,result_queue,ots_client):
 
+        # update the designed_project record
+        _time = time.time()
 
-            #修改designed_project
+        try:
+            _project_dict = _proposed.toDesigned_project(self.ots_client)
+            log("toDesigned_project takes %.2fs"%(time.time()-_time))
             _time = time.time()
+            if _project_dict is not None:
+                # update the data
+                # log("project_dict:"+json.dumps(_project_dict))
+                _designed_project = designed_project(_project_dict)
+                _designed_project.update_project(self.ots_client)
+                log("update desined takes %.2fs"%(time.time()-_time))
+
+            # delete the tmp row
+            _proposed.delete_row(self.ots_client)
+            log("delete tmp takes %.2fs"%(time.time()-_time))
+        except Exception as e:
+            log("comsumer failed cause of %s"%(str(e)))
+            log("proposed:%s"%(str(_proposed)))
+            log(traceback.format_exc())
 
-            try:
-                _project_dict = _proposed.toDesigned_project(ots_client)
-                log("toDesigned_project takes %.2fs"%(time.time()-_time))
-                _time = time.time()
-                if _project_dict is not None:
-                    #更新数据
-                    # log("project_dict:"+json.dumps(_project_dict))
-                    _designed_project = designed_project(_project_dict)
-                    _designed_project.update_project(ots_client)
-                    log("update desined takes %.2fs"%(time.time()-_time))
-
-                #删除tmp
-                _proposed.delete_row(ots_client)
-                log("update designed takes %.2fs"%(time.time()-_time))
-            except Exception as e:
-                log("comsumer failed cause of %s"%(str(e)))
-                log("proposed:%s"%(str(_proposed)))
-                log(traceback.format_exc())
-
-
-        result_queue = queue.Queue()
-
-        mt = MultiThreadHandler(task_queue,_handle,result_queue,thread_count=30,ots_client=self.ots_client)
+    def comsumer(self,task_queue):
+        mt = MultiThreadHandler(task_queue,self.comsumer_handle,None,thread_count=30)
         mt.run()
 
 
@@ -99,9 +97,8 @@ class DataSynchronization():
 
     def maxcompute2ots(self):
 
-        task_queue = queue.Queue()
 
-        self.producer(task_queue)
+        self.producer(self.task_queue)
 
         # _dict = {"uuid":"12313","crtime":123,
         #                 "json_list_group":'''
@@ -109,7 +106,7 @@ class DataSynchronization():
         #                 '''}
         # task_queue.put(proposedBuilding_tmp(_dict))
 
-        self.comsumer(task_queue)
+        self.comsumer(self.task_queue)
 
     def turn_stage(self):
         '''
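The refactor above promotes the nested _handle closure to a bound method, so the shared OTS client is reached through self instead of being threaded through MultiThreadHandler's extra arguments. A generic sketch of the same pattern, with a stdlib thread pool and a hypothetical client.process() standing in for the project's MultiThreadHandler and OTS calls:

import queue
import threading

class Synchronizer:
    def __init__(self, client):
        self.client = client            # shared client lives on the instance
        self.task_queue = queue.Queue()

    def comsumer_handle(self, item, result_queue):
        # bound method: no client argument needs to be passed around
        self.client.process(item)

    def comsumer(self, thread_count=4):
        def worker():
            while True:
                try:
                    item = self.task_queue.get_nowait()
                except queue.Empty:
                    return
                self.comsumer_handle(item, None)
        threads = [threading.Thread(target=worker) for _ in range(thread_count)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()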

+ 6 - 2
BaseDataMaintenance/maxcompute/documentMerge.py

@@ -2108,14 +2108,18 @@ def dumplicate_projects(list_projects,b_log=False):
     appendKeyvalueCount(list_projects)
     list_projects.sort(key=lambda x:x.get(project_page_time,""))
     list_projects.sort(key=lambda x:x.get("keyvaluecount",0),reverse=True)
-    cluster_projects = list_projects
-    while 1:
+    # bound the working set and the pass count so the merge loop cannot hang
+    cluster_projects = list_projects[:50]
+    _count = 10
+    while _count>0:
+        _count -= 1
         _update = False
         list_p = []
         # log("================")
         # for _p in cluster_projects:
         #     log("docids:%s"%(_p.get(project_docids,"")))
+        _c = 0
         for _pp in cluster_projects:
+            _c += 1
             _find = False
             list_prob = []
             for _p in list_p:
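This hunk is the dedup hang fix from the commit message: the fixed-point merge now works on at most 50 projects and runs at most 10 passes, so it terminates even when two clusters keep trading members. The structure in isolation, with merge_pass as a stand-in for one pairwise sweep:

def merge_pass(clusters):
    # stand-in for one pairwise-merge sweep; returns (clusters, changed)
    return clusters, False

def merge_until_stable(projects, max_items=50, max_rounds=10):
    clusters = projects[:max_items]   # cap the working set
    for _ in range(max_rounds):       # cap the number of passes
        clusters, changed = merge_pass(clusters)
        if not changed:
            break
    return clusters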

+ 4 - 0
BaseDataMaintenance/model/ots/Preproject.py

@@ -30,6 +30,10 @@ preproject_last_tenderee_phone = "last_tenderee_phone"
 preproject_uuid = "uuid"
 preproject_has_bidfile="has_bidfile"
 
+preproject_last_doctitle = "last_doctitle"
+preproject_last_win_tenderer_contact = "last_win_tenderer_contact"
+preproject_last_win_tenderer_phone = "last_win_tenderer_phone"
+
 
 
 class Preproject(BaseModel):
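The three new columns let the periodic-project job carry contact details over from the most recent matching announcement. A sketch of the fill step, assuming a dict-shaped latest document (the source field names on the right are guesses at the document schema):

def fill_last_contact(preproject_row, latest_doc):
    preproject_row["last_doctitle"] = latest_doc.get("doctitle", "")
    preproject_row["last_win_tenderer_contact"] = latest_doc.get("win_tenderer_contact", "")
    preproject_row["last_win_tenderer_phone"] = latest_doc.get("win_tenderer_phone", "")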

+ 2 - 2
BaseDataMaintenance/model/ots/document_product_dict_interface.py

@@ -42,5 +42,5 @@ class Document_product_dict_interface(BaseModel):
 
 
 from BaseDataMaintenance.common.documentFingerprint import getMD5
-def get_document_product_dict_interface_base_id(name):
-    return "mdd5="+getMD5(name)
+def get_document_product_dict_interface_base_id(name,grade):
+    return "mdd5="+getMD5("%s-%d"%(name,grade))