Эх сурвалжийг харах

参数接口表逻辑修正

luojiehua 1 жил өмнө
parent
commit
be9e008e99

+ 2 - 0
BaseDataMaintenance/dataMonitor/data_monitor.py

@@ -493,6 +493,8 @@ class BaseDataMonitor():
             _cmd = 'cat %s | grep -c "%s.*upgrate True save"'%(flow_dumplicate_log_path,self.get_last_tenmin_time())
             process_count = self.cmd_execute(_cmd)
             atAll = False
+            if process_count=="":
+                process_count = 0
             if int(process_count)==0:
                 atAll = True
             _msg = "数据流报警:待去重公告数为:%d,最近十分钟去重数为:%s"%(total_count,str(process_count))

+ 3 - 0
BaseDataMaintenance/maintenance/dataflow.py

@@ -2857,6 +2857,9 @@ class Dataflow_dumplicate(Dataflow):
         for p in list_process:
             p.join()
 
+        # mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,40,1,ots_client=self.ots_client)
+        # mt.run()
+
 
     def search_docs(self,list_docids,columns_to_get = [document_doctitle,document_tmp_save,document_bidway,document_status,document_page_time,document_info_source,document_fingerprint,document_docchannel,document_life_docchannel,document_area,document_province,document_city,document_district,document_tmp_sub_docs_json,document_industry,document_info_type,document_qcodes,document_project_name,document_project_code,document_tenderee,document_tenderee_addr,document_tenderee_phone,document_tenderee_contact,document_agency,document_agency_phone,document_agency_contact,project_procurement_system,document_project_codes,document_product,document_moneysource,document_time_bidclose,document_time_bidopen,document_time_bidstart,document_time_commencement,document_time_completion,document_time_earnest_money_start,document_time_earnest_money_end,document_time_get_file_end,document_time_get_file_start,document_time_publicity_end,document_time_publicity_start,document_time_registration_end,document_time_registration_start,document_time_release,document_tmp_extract_count,document_nlp_enterprise,document_nlp_enterprise_attachment]):
         '''

+ 8 - 2
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -220,7 +220,11 @@ def clean_product_brand(product_brand):
     :param product_brand:
     :return:
     '''
-    return product_brand
+    _search = re.search("品牌[::;;](?P<brand>.{2,8}?)([.。、;::]|规格|型号|生产厂家|厂家)",product_brand)
+    if _search is not None:
+        product_brand = _search.groupdict().get("brand")
+    brand = re.sub("[/\\,,、.|等]|一批|/无|品牌",'',product_brand)
+    return brand
 
 
 def clean_product_specs(product_specs):
@@ -269,4 +273,6 @@ if __name__ == '__main__':
     print(is_similar('128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10','VolusonE10'))
     print(re.split("[^\u4e00-\u9fff]",'128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10'))
     import Levenshtein
-    print(Levenshtein.ratio('助听器','助行器'))
+    print(Levenshtein.ratio('助听器','助行器'))
+    a = "无锡贝尔森品牌"
+    print(clean_product_brand(a))

+ 98 - 36
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -176,28 +176,6 @@ class Product_Dict_Manager():
         mt.run()
         log("process embedding %d records cost %.2f s"%(q_size,time.time()-start_time))
 
-    def embedding_interface_producer(self):
-
-        bool_query = BoolQuery(must_queries=[RangeQuery("status",1,50,True,True)])
-
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
-                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)]),limit=100,get_total_count=True),
-                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
-        list_data = getRow_ots(rows)
-        for _data in list_data:
-            self.queue_product_interface.put(_data)
-
-        while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
-                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
-            list_data = getRow_ots(rows)
-            for _data in list_data:
-                self.queue_product_interface.put(_data)
-            if self.queue_product_dict.qsize()>1000:
-                break
-        log("embedding interface total_count %d"%(total_count))
-
     def process_history_name(self,list_name,action):
         if action=="insert":
             # search document_product_temp and update status
@@ -280,6 +258,7 @@ class Product_Dict_Manager():
         # search document_product and rerun
         for name in list_brand:
             if action=="insert":
+                name = re.sub("有限|责任|公司",'',name)
                 bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,4,5)
             else:
                 bool_query = self.make_query(name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(name),5)
@@ -379,11 +358,14 @@ class Product_Dict_Manager():
 
 
     def act_insert(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
+
+
         #update document_product_dict
         if original_id is None or original_id=="":
             original_id = get_document_product_dict_id(parent_id,name)
         _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
               DOCUMENT_PRODUCT_DICT_ALIAS:alias,
+              DOCUMENT_PRODUCT_DICT_NAME:name,
               DOCUMENT_PRODUCT_DICT_GRADE:grade,
               DOCUMENT_PRODUCT_DICT_PARENT_ID:parent_id,
               DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias,
@@ -393,6 +375,17 @@ class Product_Dict_Manager():
         _dpd = Document_product_dict(_d)
         _dpd.update_row(self.ots_client)
 
+        # search interface if name and grade exists then update document_product_dict and return
+        bool_query = BoolQuery(must_queries=[
+            TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,name),
+            TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,grade),
+            RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS,201,301)
+        ])
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+                                                                            SearchQuery(bool_query,get_total_count=True))
+        if total_count>0:
+            return
+
         list_name = []
         #update milvus
         vector = request_embedding(name)
@@ -436,11 +429,15 @@ class Product_Dict_Manager():
 
     def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
         # check whether there are change variable
+        if original_id is None or original_id=="":
+            return
         _d = {DOCUMENT_PRODUCT_DICT_ID:original_id}
         dpd = Document_product_dict(_d)
-        if not dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],True):
+        if not dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_ALIAS,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_PARENT_ID],True):
             return
 
+        if parent_id is None or parent_id=="":
+            parent_id = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
         old_name = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_NAME)
         old_name_set = set([old_name])
         _alias = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS)
@@ -476,10 +473,15 @@ class Product_Dict_Manager():
                 expr = " ots_id in ['%s']"%o_id
                 Coll.delete(expr)
 
+        if old_name!=name:
+            new_id = get_document_product_dict_id(parent_id, name)
+        else:
+            new_id = original_id
+
         list_name = []
         vector = request_embedding(name)
         if vector is not None and Coll is not None:
-            id = original_id
+            id = new_id
             data = [[id],
                     [name],
                     [name],
@@ -511,12 +513,12 @@ class Product_Dict_Manager():
                     list_name.append(_alias)
 
         # process history
-        insert_names = list(new_name_set-old_name_set)
-        if len(insert_names)>0:
-            self.process_history(insert_names,grade,"insert")
         delete_names = list(old_name_set-new_name_set)
         if len(delete_names)>0:
             self.process_history([old_name],grade,"update")
+        insert_names = list(new_name_set-old_name_set)
+        if len(insert_names)>0:
+            self.process_history(insert_names,grade,"insert")
 
 
         # update document_product_dict
@@ -524,9 +526,43 @@ class Product_Dict_Manager():
               DOCUMENT_PRODUCT_DICT_NAME:name,
               DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
               DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias}
+
+        if alias is not None and alias!="":
+            _d[DOCUMENT_PRODUCT_DICT_ALIAS] = alias
+        if parent_id is not None and parent_id!="":
+            _d[DOCUMENT_PRODUCT_DICT_PARENT_ID] = parent_id
+
         dpd = Document_product_dict(_d)
         dpd.update_row(self.ots_client)
 
+        if old_name!=name:
+            # in the case of name changed ,delete the old name row
+            _d = {DOCUMENT_PRODUCT_DICT_ID:original_id}
+            dpd = Document_product_dict(_d)
+            dpd.delete_row(self.ots_client)
+
+            # change the next level parent_id
+
+            bool_query = BoolQuery(must_queries=[
+                TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,original_id)
+            ])
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+            list_data = getRow_ots(rows)
+            while next_token:
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                    columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+
+                list_data.extend(getRow_ots(rows))
+            for _data in list_data:
+                dpd = Document_product_dict(_data)
+                dpd.setValue(DOCUMENT_PRODUCT_DICT_PARENT_ID,new_id,True)
+                dpd.update_row(self.ots_client)
+
+
 
     def act_delete(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
         #search records which name=name and grade=grade
@@ -558,12 +594,37 @@ class Product_Dict_Manager():
         self.process_history([name],grade,"delete")
 
         #delete document_product_dict
+        log("delete document_product_dict name:%s grade:%s count:%s"%(str(name),str(grade),str(len(list_data))))
         for _data in list_data:
             id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
             _d = {DOCUMENT_PRODUCT_DICT_ID:id}
-            dpdi = Document_product_dict_interface(_d)
-            dpdi.delete_row(self.ots_client)
+            dpd = Document_product_dict(_d)
+            dpd.delete_row(self.ots_client)
+
+    def embedding_interface_producer(self):
+
+        bool_query = BoolQuery(must_queries=[
+            # TermQuery("name",'济南鑫驰'),
+            RangeQuery("status",1,50,True,True)
+        ])
 
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            self.queue_product_interface.put(_data)
+
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                self.queue_product_interface.put(_data)
+            if self.queue_product_dict.qsize()>1000:
+                break
+        log("embedding interface total_count %d"%(total_count))
 
     def embedding_interface_comsumer(self):
         def _handle(item,result_queue):
@@ -577,12 +638,13 @@ class Product_Dict_Manager():
             standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
             create_time = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)
 
-            if action=="insert":
-                self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time)
-            elif action=="update":
-                self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time)
-            elif action=="delete":
-                self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)
+            if name is not None and len(name)>1 and len(name)<MAX_NAME_LENGTH:
+                if action=="insert":
+                    self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time)
+                elif action=="update":
+                    self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time)
+                elif action=="delete":
+                    self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)
 
             _pdi = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_INTERFACE_ID:id,
                                                     DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300)})
@@ -590,7 +652,7 @@ class Product_Dict_Manager():
 
         self.embedding_interface_producer()
 
-        mt = MultiThreadHandler(self.queue_product_interface,_handle,None,5,1)
+        mt = MultiThreadHandler(self.queue_product_interface,_handle,None,20,1)
         mt.run()
 
 

+ 245 - 205
BaseDataMaintenance/maintenance/product/products.py

@@ -24,6 +24,7 @@ from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Ma
 from apscheduler.schedulers.blocking import BlockingScheduler
 
 from BaseDataMaintenance.maintenance.product.make_brand_pattern import *
+from BaseDataMaintenance.maintenance.product.product_dict import IS_SYNCHONIZED
 import logging
 
 root = logging.getLogger()
@@ -177,227 +178,198 @@ class Product_Manager(Product_Dict_Manager):
                         #     _dpd.update_row(self.ots_client)
                         break
         if name_ots_id is not None:
+
             if brand is not None and brand!="":
-                s_brand = brand
-                l_brand = [brand]
-                brand_ch = get_chinese_string(brand)
-                l_brand.extend(brand_ch)
-
-                _find = False
-                for brand in l_brand:
-
-                    brand_vector = request_embedding(brand)
-                    if brand_vector is not None:
-                        Coll,_ = self.get_collection(BRAND_GRADE)
-                        search_list = search_embedding(Coll,embedding_index_name,[brand_vector],self.search_params,output_fields,limit=60)
-
-                        # log("search brand %s"%(brand))
-                        for _search in search_list:
-
-                            ots_id = _search.entity.get("standard_name_id")
-                            ots_name = _search.entity.get("standard_name")
-                            ots_parent_id = _search.entity.get("ots_parent_id")
-
-                            # log("check brand %s and %s"%(brand,ots_name))
-                            if is_similar(brand,ots_name) or check_brand(brand,ots_name):
-
-                                # log("check brand similar succeed:%s and %s"%(brand,ots_name))
-                                new_brand = ots_name
-
-                                # judge if the brand which parent_id is name_ots_id exists,if not insert one else update alias
-
-                                if name_ots_id is not None:
-                                    brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
-
-                                    _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
-                                                DOCUMENT_PRODUCT_DICT_NAME:new_brand,
-                                                DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(brand).lower()),
-                                                DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
-                                                DOCUMENT_PRODUCT_DICT_STATUS:1,
-                                                DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
-                                                DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                }
-                                    _dpd_brand = Document_product_dict(_d_brand)
-                                    # _dpd_brand.updateAlias(str(new_brand).lower())
-                                    if not _dpd_brand.exists_row(self.ots_client):
-                                        # _dpd_brand.update_row(self.ots_client)
-                                        _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_brand,
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(str(brand).lower()),
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:BRAND_GRADE,
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
-                                                    }
-                                        dpdi = Document_product_dict_interface(_d_brand)
-                                        dpdi.update_row(self.ots_client)
-                                    else:
-                                        pass
-                                        # #update alias
-                                        # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
-                                        # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
-                                        # if _flag:
-                                        #     if _dpd.updateAlias(brand):
-                                        #         _dpd.update_row(self.ots_client)
-
-                                _find = True
-                                break
-                            else:
-                                # log("check brand similar failed:%s and %s"%(brand,ots_name))
-                                # add new brand?
-                                pass
-                        if _find:
-                            break
-                if not _find:
+
+                #check ots
+                bool_query = BoolQuery(must_queries=[
+                    TermQuery(DOCUMENT_PRODUCT_DICT_NAME,brand),
+                    TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,BRAND_GRADE)
+                ])
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                    SearchQuery(bool_query,get_total_count=True))
+                if total_count>0:
+                    new_brand = brand
+                else:
+                    s_brand = brand
+                    l_brand = [brand]
+                    l_brand.append(clean_product_brand(s_brand))
+                    brand_ch = get_chinese_string(brand)
+                    l_brand.extend(brand_ch)
+
+                    _find = False
                     for brand in l_brand:
-                        if self.check_new_brand(brand):
-                            _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(str(brand).lower()),
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:BRAND_GRADE,
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
-                                        }
-                            dpdi = Document_product_dict_interface(_d_brand)
-                            dpdi.update_row(self.ots_client)
 
+                        brand_vector = request_embedding(brand)
+                        if brand_vector is not None:
+                            Coll,_ = self.get_collection(BRAND_GRADE)
+                            search_list = search_embedding(Coll,embedding_index_name,[brand_vector],self.search_params,output_fields,limit=60)
 
-            if specs is not None and specs!="":
-                debug("getting sepcs %s"%(specs))
-                list_specs = []
-                c_specs = clean_product_specs(specs)
-                list_specs.append(c_specs)
-
-                for s in re.split("[\u4e00-\u9fff]",specs):
-                    if s!="" and len(s)>4:
-                        list_specs.append(s)
-                similar_flag = None
-                _index = 0
-                break_flag = False
-                for c_specs in list_specs:
-                    if break_flag:
-                        break
-                    _index += 1
-                    specs_vector = request_embedding(c_specs)
-
-                    if specs_vector is not None:
-                        Coll,_ = self.get_collection(SPECS_GRADE)
-                        search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=60)
-
-                        for _search in search_list:
-
-                            ots_id = _search.entity.get("standard_name_id")
-                            ots_name = _search.entity.get("standard_name")
-                            ots_parent_id = _search.entity.get("ots_parent_id")
-
-                            log("checking specs %s and %s"%(specs,ots_name))
-                            if is_similar(specs,ots_name):
-                                # log("specs is_similar")
-                                if check_specs(c_specs,ots_name):
-                                    log("check_specs succeed")
-                                    break_flag = True
-                                    new_specs = ots_name
-
-                                    # to update the document_product_dict which is builded for search
-                                    if brand_ots_id is not None:
-                                        # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
-                                        specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
-
-                                        _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
-                                                    DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                                                    DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(specs).lower()),
-                                                    DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                            # log("search brand %s"%(brand))
+                            for _search in search_list:
+
+                                ots_id = _search.entity.get("standard_name_id")
+                                ots_name = _search.entity.get("standard_name")
+                                ots_parent_id = _search.entity.get("ots_parent_id")
+
+                                # log("check brand %s and %s"%(brand,ots_name))
+                                if is_similar(brand,ots_name) or check_brand(brand,ots_name):
+
+                                    # log("check brand similar succeed:%s and %s"%(brand,ots_name))
+                                    new_brand = ots_name
+
+                                    log("checking brand %s succeed %s"%(brand,new_brand))
+                                    # judge if the brand which parent_id is name_ots_id exists,if not insert one else update alias
+
+                                    if name_ots_id is not None:
+                                        brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
+
+                                        _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
+                                                    DOCUMENT_PRODUCT_DICT_NAME:new_brand,
+                                                    DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(brand).lower()),
+                                                    DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
                                                     DOCUMENT_PRODUCT_DICT_STATUS:1,
-                                                    DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                                    DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
+                                                    DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
                                                     DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                     DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                     }
-                                        _dpd_specs = Document_product_dict(_d_specs)
-                                        # _dpd_specs.updateAlias(str(new_specs).lower())
-                                        if not _dpd_specs.exists_row(self.ots_client):
-                                            # _dpd_specs.update_row(self.ots_client)
-                                            # user interface to add
-                                            _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                  DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
-                                                  }
-                                            _dpdi = Document_product_dict_interface(_d)
-                                            _dpdi.update_row(self.ots_client)
+                                        _dpd_brand = Document_product_dict(_d_brand)
+                                        # _dpd_brand.updateAlias(str(new_brand).lower())
+                                        if not _dpd_brand.exists_row(self.ots_client):
+                                            _dpd_brand.update_row(self.ots_client)
 
                                         else:
                                             pass
                                             # #update alias
-                                            # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
+                                            # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
                                             # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
                                             # if _flag:
-                                            #     if _dpd.updateAlias(specs):
+                                            #     if _dpd.updateAlias(brand):
                                             #         _dpd.update_row(self.ots_client)
+
+                                    _find = True
                                     break
                                 else:
-                                    if _index == 1:
-                                        similar_flag = True
-
-                if not break_flag and similar_flag:
-                    log("check_specs failed")
-                    new_specs = clean_product_specs(specs)
-                    # insert into document_product_dict a new record
-                    # to update the document_product_dict which is builded for search
-                    # add new specs
-                    if brand_ots_id is not None and name_ots_id is not None:
-                        # _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
-                        # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
-                        #       DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                        #       DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(new_specs.lower()),
-                        #       DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
-                        #       DOCUMENT_PRODUCT_DICT_STATUS:1,
-                        #       DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
-                        #       DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                        #       DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                        #       }
-                        # _dpd = Document_product_dict(_d)
-                        # # _dpd.updateAlias(new_specs)
-                        # _dpd.update_row(self.ots_client)
-
-                        # user interface to add
-                        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(specs,new_specs.lower()),
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
-                              }
-                        _dpdi = Document_product_dict_interface(_d)
-                        _dpdi.update_row(self.ots_client)
+                                    # log("check brand similar failed:%s and %s"%(brand,ots_name))
+                                    # add new brand?
+                                    pass
+                            if _find:
+                                break
+                    if not _find:
+                        for brand in l_brand:
+                            if self.check_new_brand(brand):
+                                new_brand = clean_product_brand(brand)
+                                if new_brand=="":
+                                    continue
+                                log("adding new brand %s"%(str(new_brand)))
+                                _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_brand,
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(str(brand).lower()),
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:BRAND_GRADE,
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                            DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
+                                            }
+                                dpdi = Document_product_dict_interface(_d_brand)
+                                dpdi.update_row(self.ots_client)
+                                break
+
+
+
+            if specs is not None and specs!="":
+
+                #check ots
+                bool_query = BoolQuery(must_queries=[
+                    TermQuery(DOCUMENT_PRODUCT_DICT_NAME,specs),
+                    TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE)
+                ])
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                    SearchQuery(bool_query,get_total_count=True))
+                if total_count>0:
+                    new_specs = specs
                 else:
-                    # add new specs?
-                    debug("specs not similar")
-                    if is_legal_specs(specs):
-                        debug("is_legal_specs")
+                    debug("getting sepcs %s"%(specs))
+                    list_specs = []
+                    c_specs = clean_product_specs(specs)
+                    list_specs.append(c_specs)
+
+                    for s in re.split("[\u4e00-\u9fff]",specs):
+                        if s!="" and len(s)>4:
+                            list_specs.append(s)
+                    similar_flag = None
+                    _index = 0
+                    break_flag = False
+                    for c_specs in list_specs:
+                        if break_flag:
+                            break
+                        _index += 1
+                        specs_vector = request_embedding(c_specs)
+
+                        if specs_vector is not None:
+                            Coll,_ = self.get_collection(SPECS_GRADE)
+                            search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=60)
+
+                            for _search in search_list:
+
+                                ots_id = _search.entity.get("standard_name_id")
+                                ots_name = _search.entity.get("standard_name")
+                                ots_parent_id = _search.entity.get("ots_parent_id")
+
+                                debug("checking specs %s and %s"%(specs,ots_name))
+                                if is_similar(specs,ots_name):
+                                    # log("specs is_similar")
+                                    if check_specs(c_specs,ots_name):
+                                        break_flag = True
+                                        new_specs = ots_name
+                                        log("check_specs %s succeed %s"%(specs,new_specs))
+
+                                        # to update the document_product_dict which is builded for search
+                                        if brand_ots_id is not None:
+                                            # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
+                                            specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                                            _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
+                                                        DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                                        DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(specs).lower()),
+                                                        DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                                        DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                                        DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                                        DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
+                                                        DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                        DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                        }
+                                            _dpd_specs = Document_product_dict(_d_specs)
+                                            # _dpd_specs.updateAlias(str(new_specs).lower())
+                                            if not _dpd_specs.exists_row(self.ots_client):
+                                                _dpd_specs.update_row(self.ots_client)
+                                                # user interface to add
+                                            else:
+                                                pass
+                                                # #update alias
+                                                # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
+                                                # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
+                                                # if _flag:
+                                                #     if _dpd.updateAlias(specs):
+                                                #         _dpd.update_row(self.ots_client)
+                                        break
+                                    else:
+                                        if _index == 1:
+                                            similar_flag = True
+
+                    if not break_flag and similar_flag:
+                        debug("check_specs failed")
                         new_specs = clean_product_specs(specs)
                         # insert into document_product_dict a new record
                         # to update the document_product_dict which is builded for search
                         # add new specs
-                        if brand_ots_id is not None and name_ots_id is not None:
-                            _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
-
+                        if brand_ots_id is not None and name_ots_id is not None and len(specs)<MAX_NAME_LENGTH:
+                            # _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
                             # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
                             #       DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                            #       DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
+                            #       DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(new_specs.lower()),
                             #       DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
                             #       DOCUMENT_PRODUCT_DICT_STATUS:1,
                             #       DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
@@ -405,12 +377,14 @@ class Product_Manager(Product_Dict_Manager):
                             #       DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                             #       }
                             # _dpd = Document_product_dict(_d)
+                            # # _dpd.updateAlias(new_specs)
                             # _dpd.update_row(self.ots_client)
 
                             # user interface to add
+                            log("adding new specs %s"%(new_specs))
                             _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                                   DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
-                                  DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
+                                  DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(specs),
                                   DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
                                   DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
                                   DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
@@ -420,6 +394,44 @@ class Product_Manager(Product_Dict_Manager):
                                   }
                             _dpdi = Document_product_dict_interface(_d)
                             _dpdi.update_row(self.ots_client)
+                    else:
+                        # add new specs?
+                        debug("specs not similar")
+                        if is_legal_specs(specs) and len(specs)<MAX_NAME_LENGTH:
+                            debug("is_legal_specs")
+                            new_specs = clean_product_specs(specs)
+                            # insert into document_product_dict a new record
+                            # to update the document_product_dict which is builded for search
+                            # add new specs
+                            if brand_ots_id is not None and name_ots_id is not None:
+                                _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                                # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
+                                #       DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                #       DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
+                                #       DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                #       DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                #       DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                #       DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                #       DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                #       }
+                                # _dpd = Document_product_dict(_d)
+                                # _dpd.update_row(self.ots_client)
+
+                                log("adding new specs %s"%(new_specs))
+                                # user interface to add
+                                _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
+                                      }
+                                _dpdi = Document_product_dict_interface(_d)
+                                _dpdi.update_row(self.ots_client)
 
         # judge if the product matches the standard product
         if name_ots_id is not None:
@@ -485,6 +497,10 @@ class Product_Manager(Product_Dict_Manager):
         save_product_tmp.update_row(self.ots_client)
 
     def check_new_brand(self,brand):
+        # None must bypass the regex: re.search raises TypeError on a non-string, so the None check below would never be reached
+        _search = re.search("品牌[::;;](?P<brand>.{2,8}?)([.。、;::]|规格|型号|生产厂家|厂家)",brand) if brand is not None else None
+        if _search is not None:
+            brand = _search.groupdict().get("brand")
         if brand is None or len(brand)<2:
             return False
         # check whether this brand exists in interface and action is delete
@@ -501,14 +517,14 @@ class Product_Manager(Product_Dict_Manager):
 
         # check whether this brand exists in dict and grade=name_grade or grade=specs_grade
         bool_query = BoolQuery(must_queries=[
-            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,name),
+            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,brand),
             BoolQuery(should_queries=[
                 TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,NAME_GRADE),
                 TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE)
             ])
 
         ])
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
                                                                             SearchQuery(bool_query,get_total_count=True))
         if total_count>0:
             return False
@@ -535,9 +551,12 @@ class Product_Manager(Product_Dict_Manager):
 
         if total_count>=5:
             new_brand = re.sub("[^\u4e00-\u9fff]",'',brand)
-            if re.search("详见|无|国产|null|其他|详细|废标|[0-9/]|品牌|文件") is None and len(brand)<=8:
+            if re.search("详见|无|国产|null|其他|详细|废标|[0-9/]|品牌|文件",brand) is None and len(brand)<=8:
                 return True
 
+        # extract the brand
+        # "品牌[::]?(<brand>.{2,5}([.。、::]|型号|生产厂家|厂家))"
+
 
 
     @staticmethod
@@ -816,6 +835,11 @@ def fix_product_data():
     mt.run()
 
 def test_check_brand():
+    import logging
+    root = logging.getLogger()
+    root.setLevel(logging.DEBUG)
+    from queue import Queue
+
     brand_path = "brand.txt"
     list_brand = []
     with open(brand_path,"r",encoding="utf8") as f:
@@ -827,6 +851,8 @@ def test_check_brand():
             if len(line)>0:
                 brand = {"brand":line}
                 list_brand.append(brand)
+            # if len(list_brand)>100:
+            #     break
     task_queue = Queue()
     for _d in list_brand:
         task_queue.put(_d)
@@ -835,18 +861,22 @@ def test_check_brand():
 
     def _handle(item,result_queue):
         brand = item.get("brand")
+        new_brand = clean_product_brand(brand)
         _f = pm.check_new_brand(brand)
         item["f"] = _f
+        item["new_brand"] = new_brand
     mt = MultiThreadHandler(task_queue,_handle,None,30,1)
     mt.run()
     list_legal_brand = []
     list_illegal_brand = []
     for _d in list_brand:
-        brand = _d.get("brand")
-        f = _d["f"]
+        f = _d.get("f")
+        log("brand %s flag %s"%(_d.get("brand"),str(f)))
         if f:
+            brand = _d.get("new_brand")
             list_legal_brand.append(brand)
         else:
+            brand = _d.get("brand")
             list_illegal_brand.append(brand)
     with open("legal_brand.txt","w",encoding="utf8") as f:
         for b in list_legal_brand:
@@ -855,12 +885,22 @@ def test_check_brand():
         for b in list_illegal_brand:
             f.write(b+"\n")
 
+def test_match():
+    """Print the NAME_GRADE embedding-search results for the sample text "Mini-7"."""
+    query_vector = request_embedding("Mini-7")
+    manager = Product_Manager()
+    collection,_ = manager.get_collection(NAME_GRADE)
+    fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
+    hits = search_embedding(collection,embedding_index_name,[query_vector],manager.search_params,fields,limit=60)
+    print(hits)
+
 
 def test():
     # pm = Product_Manager()
     # pm.test()
-    fix_product_data()
-
+    # fix_product_data()    # disabled in this revision
+    # test_check_brand()    # disabled in this revision
+    test_match()  # the only active call: runs the embedding-search smoke check
 
 if __name__ == '__main__':
 

+ 4 - 1
BaseDataMaintenance/maintenance/proposedBuilding/DataSynchronization.py

@@ -200,9 +200,12 @@ class DataSynchronization():
         mt.run()
 
     def scheduler(self):
+        from BaseDataMaintenance.maintenance.major_project.unionDocument import MajorUnion  # function-scope import, resolved only when scheduler() runs
+        mu = MajorUnion()
         _scheduler = BlockingScheduler()
-        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/1")
+        _scheduler.add_job(self.maxcompute2ots,"cron",minute="*/8")  # cron minute field changed from "*/1" to "*/8" (presumably APScheduler step syntax - confirm)
         _scheduler.add_job(self.turn_stage,"cron",hour="*/5")
+        _scheduler.add_job(mu.comsumer,"cron",minute="*/8")  # new job: MajorUnion.comsumer on the same "*/8" minute cron
         _scheduler.start()
 
 def startSychro():

+ 3 - 1
BaseDataMaintenance/model/ots/proposedBuilding_tmp.py

@@ -27,6 +27,8 @@ stage_priority_dict = {
     "竣工阶段": "竣工阶段"
 }
 
+data_flow = Dataflow_dumplicate(start_delete_listener=False)
+
 class proposedBuilding_tmp(BaseModel):
 
     def __init__(self,_dict):
@@ -37,7 +39,7 @@ class proposedBuilding_tmp(BaseModel):
         self.dict_enterprise = {}
         self.dict_enterprise_contact = {}
         self.dict_document = {}
-        self.data_flow = Dataflow_dumplicate(start_delete_listener=False)
+        self.data_flow = data_flow
 
     def getPrimary_keys(self):
         return ["uuid"]