Просмотр исходного кода

重建milvus集合,通过名称md5进行去重,解决匹配不了大小写等不完整匹配问题和提升匹配效率

luojiehua 1 год назад
Родитель
Commit
a833766409

+ 2 - 2
BaseDataMaintenance/common/milvusUtil.py

@@ -18,7 +18,7 @@ def create_embedding_schema(collection_name,fields,index_name,index_params):
     if not has:
         print("creating collection")
         coll_schema = CollectionSchema(fields,"this is the embedding schema")
-        coll = Collection(collection_name,coll_schema,consistency_level="Strong")
+        coll = Collection(collection_name,coll_schema)
 
         #create index for milvus_embedding
         coll.create_index(index_name,index_params=index_params)
@@ -47,7 +47,7 @@ def insert_embedding(coll,entities,retry_times =3):
 
 
 def search_embedding(coll,index_name,vector,search_params,output_fields,limit=3,retry_times=3):
-    for _ in retry_times:
+    for _ in range(retry_times):
         try:
             list_result = []
             result = coll.search(vector,index_name,search_params,top_k=limit,output_fields=output_fields,limit=limit)

+ 2 - 2
BaseDataMaintenance/maintenance/dataflow.py

@@ -2842,8 +2842,8 @@ class Dataflow_dumplicate(Dataflow):
 
     def flow_dumpcate_comsumer(self):
         from multiprocessing import Process
-        process_count = 1
-        thread_count = 20
+        process_count = 2
+        thread_count = 30
         list_process = []
         def start_thread():
             mt = MultiThreadHandler(self.queue_dumplicate,self.dumplicate_comsumer_handle,None,thread_count,1,need_stop=False,ots_client=self.ots_client)

+ 10 - 11
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -180,13 +180,16 @@ def is_similar(source,target,_radio=None):
         return False
     #判断相似度
     similar = fuzz.ratio(source,target)
+    print("similar",similar)
     if similar>=min_ratio:
         return True
     similar_jaro = Levenshtein.jaro(source,target)
+    print("similar_jaro",similar_jaro)
     if similar_jaro*100>=min_ratio:
         return True
     similar_jarow = Levenshtein.jaro_winkler(source,target)
-    if similar_jarow*100>=90:
+    print("similar_jaro",similar_jaro)
+    if similar_jarow*100>=min_ratio:
         return True
 
     if min_len>=5:
@@ -200,9 +203,6 @@ def is_similar(source,target,_radio=None):
     if len(source)==max_len and judge_pur_chinese(target):
         if str(source).find(target)>=0:
             return True
-    if len(target)==max_len and judge_pur_chinese(source):
-        if target.find(source)>=0:
-            return True
     return False
 
 
@@ -441,10 +441,9 @@ def clean_product_quantity(product_quantity):
     return ""
 
 if __name__ == '__main__':
-    print(is_similar('128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10','VolusonE10'))
-    print(re.split("[^\u4e00-\u9fff]",'128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10'))
-    import Levenshtein
-    print(Levenshtein.ratio('助听器','助行器'))
-    a = "无锡贝尔森品牌"
-    print(clean_product_brand(a))
-    print(is_legal_brand(getConnect_ots(),"液晶显示"))
+    print(is_similar('超声','超声炮',_radio=99))
+    # print(re.split("[^\u4e00-\u9fff]",'128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10'))
+    # import Levenshtein
+    # print(Levenshtein.ratio('助听器','助行器'))
+    # print(clean_product_brand(a))
+    # print(is_legal_brand(getConnect_ots(),"康复"))

+ 19 - 3
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -375,6 +375,7 @@ class Product_Dict_Manager():
             _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
                   DOCUMENT_PRODUCT_DICT_ALIAS:alias,
                   DOCUMENT_PRODUCT_DICT_NAME:name,
+                  DOCUMENT_PRODUCT_DICT_STATUS:1,
                   DOCUMENT_PRODUCT_DICT_GRADE:grade,
                   DOCUMENT_PRODUCT_DICT_PARENT_ID:parent_id,
                   DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias,
@@ -430,6 +431,7 @@ class Product_Dict_Manager():
                             [grade]]
                     insert_embedding(Coll,data)
                     list_name.append(_alias)
+            time.sleep(3)
 
         #judge whether there exists records before this record created,if not process the history data
         if not self.exists_records(name,grade,create_time):
@@ -519,6 +521,7 @@ class Product_Dict_Manager():
                                 [grade]]
                         insert_embedding(Coll,data)
                         list_name.append(_alias)
+            time.sleep(3)
 
         # process history
         delete_names = list(old_name_set-new_name_set)
@@ -615,6 +618,7 @@ class Product_Dict_Manager():
             o_id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
             expr = " ots_id in ['%s']"%o_id
             Coll.delete(expr)
+        time.sleep(3)
 
         #process_history data
         self.process_history([name],grade,"delete")
@@ -813,8 +817,7 @@ def search_similar():
     df.to_excel("search_similar1.xlsx",columns=df_columns)
 
 
-def insert_interface_delete(name,grade):
-    ots_client = getConnect_ots()
+def dict_interface_delete(name,grade,ots_client = getConnect_ots()):
     from uuid import uuid4
     _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
           DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
@@ -826,6 +829,19 @@ def insert_interface_delete(name,grade):
     dpdi.update_row(ots_client)
 
 
+def interface_deletes():
+    """Submit a delete request for every name listed in the inline text block.
+
+    The block is split on whitespace and on the separators ``, . , 。 、``;
+    empty pieces are ignored. Each remaining name is printed and passed to
+    ``dict_interface_delete`` with grade 4, reusing one OTS client for all
+    calls.
+    """
+    names_text = '''
+    MFUSONE
+    '''
+    ots_client=getConnect_ots()
+    # Raw string: "\s" in a plain literal is an invalid escape sequence.
+    # "\n" is already matched by "\s", so the split result is unchanged.
+    for s in re.split(r"[\s,.,。、]",names_text):
+        s = s.strip()
+        if s=="":
+            continue
+        print(s)
+        dict_interface_delete(s,4,ots_client)
+
+
 if __name__ == '__main__':
     # start_embedding_product_dict()
-    insert_interface_delete("液晶显示屏",4)
+    interface_deletes()

+ 7 - 3
BaseDataMaintenance/maintenance/product/product_setting.py

@@ -11,9 +11,13 @@ PRODUCT_TMEP_STATUS_TO_SYNC = [201,300]
 PRODUCT_TEMP_STATUS_TO_NO_SYNC = [401,450]
 PRODUCT_TEMP_STATUS_TO_REPEATED = [451,500]
 
-COLLECTION_NAME_NAME = "product_dict_embedding_name"
-COLLECTION_NAME_BRAND = "product_dict_embedding_brand"
-COLLECTION_NAME_SPECS = "product_dict_embedding_specs"
+# COLLECTION_NAME_NAME = "product_dict_embedding_name"
+# COLLECTION_NAME_BRAND = "product_dict_embedding_brand"
+# COLLECTION_NAME_SPECS = "product_dict_embedding_specs"
+
+COLLECTION_NAME_NAME = "product_dict_embedding_name_single"
+COLLECTION_NAME_BRAND = "product_dict_embedding_brand_single"
+COLLECTION_NAME_SPECS = "product_dict_embedding_specs_single"
 
 NAME_GRADE = 3
 BRAND_GRADE = 4

+ 124 - 25
BaseDataMaintenance/maintenance/product/products.py

@@ -164,13 +164,15 @@ class Product_Manager(Product_Dict_Manager):
 
                 for _search in search_list:
                     ots_id = _search.get("standard_name_id")
-                    ots_name = _search.get("standard_name")
+                    ots_name = _search.get("ots_name")
+                    standard_name = _search.get("standard_name")
                     ots_parent_id = _search.get("ots_parent_id")
 
                     if is_similar(name,ots_name) or check_product(name,ots_name):
                         name_ots_id = ots_id
-                        new_name = ots_name
+                        new_name = standard_name
 
+                        log("checking name %s succeed %s"%(name,ots_name))
                         # #update alias of name
                         # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
                         # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
@@ -186,12 +188,15 @@ class Product_Manager(Product_Dict_Manager):
 
                     for _search in search_list:
                         ots_id = _search.get("standard_name_id")
-                        ots_name = _search.get("standard_name")
+                        ots_name = _search.get("ots_name")
+                        standard_name = _search.get("standard_name")
                         ots_parent_id = _search.get("ots_parent_id")
 
-                        if is_similar(name,ots_name) or check_product(name,ots_name):
+                        if is_similar(name,ots_name,_radio=95):
+
+                            log("checking name %s succeed %s"%(name,ots_name))
                             name_ots_id = ots_id
-                            new_name = ots_name
+                            new_name = standard_name
 
                             # #update alias of name
                             # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
@@ -221,7 +226,8 @@ class Product_Manager(Product_Dict_Manager):
                         for _search in search_list:
 
                             ots_id = _search.get("standard_name_id")
-                            ots_name = _search.get("standard_name")
+                            ots_name = _search.get("ots_name")
+                            standard_name = _search.get("standard_name")
                             ots_parent_id = _search.get("ots_parent_id")
 
                             # log("check brand %s and %s"%(brand,ots_name))
@@ -231,7 +237,7 @@ class Product_Manager(Product_Dict_Manager):
 
                                 if ots_name==new_name:
                                     continue
-                                new_brand = ots_name
+                                new_brand = standard_name
 
                                 log("checking brand %s succeed %s"%(brand,new_brand))
                                 # judge if the brand which parent_id is name_ots_id exists,if not insert one else update alias
@@ -320,15 +326,16 @@ class Product_Manager(Product_Dict_Manager):
 
 
                                 ots_id = _search.get("standard_name_id")
-                                ots_name = _search.get("standard_name")
+                                ots_name = _search.get("ots_name")
+                                standard_name = _search.get("standard_name")
                                 ots_parent_id = _search.get("ots_parent_id")
 
                                 # log("check brand %s and %s"%(brand,ots_name))
-                                if is_similar(brand,ots_name,_radio=95) or check_brand(brand,ots_name):
+                                if is_similar(brand,ots_name,_radio=95):
                                     # log("check brand similar succeed:%s and %s"%(brand,ots_name))
                                     if ots_name==new_name:
                                         continue
-                                    new_brand = ots_name
+                                    new_brand = standard_name
 
                                     log("checking brand %s succeed %s"%(brand,new_brand))
                                     # judge if the brand which parent_id is name_ots_id exists,if not insert one else update alias
@@ -390,7 +397,8 @@ class Product_Manager(Product_Dict_Manager):
                         for _search in search_list:
 
                             ots_id = _search.get("standard_name_id")
-                            ots_name = _search.get("standard_name")
+                            ots_name = _search.get("ots_name")
+                            standard_name = _search.get("standard_name")
                             ots_parent_id = _search.get("ots_parent_id")
 
                             debug("checking specs %s and %s"%(specs,ots_name))
@@ -398,7 +406,7 @@ class Product_Manager(Product_Dict_Manager):
                                 # log("specs is_similar")
                                 if check_specs(c_specs,ots_name):
                                     break_flag = True
-                                    new_specs = ots_name
+                                    new_specs = standard_name
                                     log("check_specs %s succeed %s"%(specs,new_specs))
 
                                     # to update the document_product_dict which is builded for search
@@ -502,7 +510,9 @@ class Product_Manager(Product_Dict_Manager):
                                 break
 
                             ots_id = _search.get("standard_name_id")
-                            ots_name = _search.get("standard_name")
+
+                            ots_name = _search.get("ots_name")
+                            standard_name = _search.get("standard_name")
                             ots_parent_id = _search.get("ots_parent_id")
 
                             debug("checking specs %s and %s"%(specs,ots_name))
@@ -510,7 +520,7 @@ class Product_Manager(Product_Dict_Manager):
                                 # log("specs is_similar")
                                 if check_specs(c_specs,ots_name):
                                     break_flag = True
-                                    new_specs = ots_name
+                                    new_specs = standard_name
                                     if brand_ots_id is not None:
                                         # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
                                         specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
@@ -922,7 +932,6 @@ def fix_product_data():
             dpt.setValue(DOCUMENT_PRODUCT_TMP_STATUS,1,True)
             dpt.update_row(ots_client)
 
-
     mt = MultiThreadHandler(task_queue,handle,None,30,1)
     mt.run()
 
@@ -978,29 +987,119 @@ def test_check_brand():
             f.write(b+"\n")
 
 def test_match():
-    a = "Mini-7"
+    a = "MFUSONE"
     vector = request_embedding(a)
     pm = Product_Manager()
-    Coll,_ = pm.get_collection(NAME_GRADE)
+    Coll,_ = pm.get_collection(SPECS_GRADE)
     output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
-    search_list = search_embedding(Coll,embedding_index_name,[vector],pm.search_params,output_fields,limit=60)
+    search_list = search_embedding(Coll,embedding_index_name,[vector],pm.search_params,output_fields,limit=20)
     print(search_list)
 
 
+def rebuild_milvus():
+    """Re-insert product-dict names and their standard aliases into Milvus.
+
+    Pages through the ``document_product_dict`` search index, keeps the first
+    row seen for every (name, grade) pair, and feeds those rows to 5 worker
+    processes, each running a ``MultiThreadHandler`` created with a count of 5.
+    Every row yields one entity for the name itself and one for each standard
+    alias that differs from the name. Entity ids are derived from the
+    normalised name via ``get_milvus_product_dict_id`` /
+    ``get_document_product_dict_standard_alias_id``.
+    """
+    # Process is imported here as well, so the function does not rely on a
+    # file-level import that may not exist.
+    from multiprocessing import Process, Queue as PQueue
+
+    pdm = Product_Dict_Manager()
+    # NOTE(review): only a lower bound of 3 is given; whether grade 3 itself
+    # is included depends on RangeQuery's include_lower default - confirm.
+    bool_query = BoolQuery(must_queries=[
+        RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3)
+    ])
+    ots_client = getConnect_ots()
+    table_name = "document_product_dict"
+    index_name = "document_product_dict_index"
+    # NOTE(review): DOCUMENT_PRODUCT_DICT_PARENT_ID is not requested, so the
+    # parent_id read in insert_into_milvus presumably always falls back to "".
+    columns_to_get = ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED)
+
+    # The first page carries the sort; later pages continue from next_token.
+    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,index_name,
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("grade")]),limit=100,get_total_count=True),
+                                                                   columns_to_get)
+
+    list_data = getRow_ots(rows)
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,index_name,
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       columns_to_get)
+
+        list_data.extend(getRow_ots(rows))
+        print("%d/%d"%(len(list_data),total_count))
+
+    # Enqueue only the first row seen for each (name, grade) pair.
+    set_name_grade = set()
+    task_queue = PQueue()
+    for _data in list_data:
+        name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
+        grade = _data.get(DOCUMENT_PRODUCT_DICT_GRADE)
+        _key = "%s--%d"%(name,grade)
+        if _key not in set_name_grade:
+            task_queue.put(_data)
+        set_name_grade.add(_key)
+
+    log("rebuild milvus %d counts"%(task_queue.qsize()))
+
+    def insert_into_milvus(item,result_queue):
+        # Insert the entity for the name, then one per distinct alias.
+        name = item.get(DOCUMENT_PRODUCT_DICT_NAME,"")
+        n_name = get_milvus_standard_name(name)
+
+        name_id = get_milvus_product_dict_id(n_name)
+
+        vector = request_embedding(n_name)
+        parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID,"")
+        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
+        Coll,_ = pdm.get_collection(grade)
+        standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
+        if vector is None or Coll is None:
+            return
+
+        data = [[name_id],
+                [name],
+                [name],
+                [name_id],
+                [vector],
+                [parent_id],
+                [grade]]
+        insert_embedding(Coll,data)
+
+        if standard_alias is not None and standard_alias!="":
+            list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+            for _alias in list_alias:
+                _alias = _alias.strip()
+                if len(_alias)==0:
+                    continue
+                if _alias==name:
+                    continue
+                _id = get_document_product_dict_standard_alias_id(_alias)
+                n_alias = get_milvus_standard_name(_alias)
+                alias_vector = request_embedding(n_alias)
+                # Same guard as for the name: never insert a missing vector.
+                if alias_vector is None:
+                    continue
+                data = [[_id],
+                        [_alias],
+                        [name],
+                        [name_id],
+                        [alias_vector],
+                        [parent_id],
+                        [grade]]
+                insert_embedding(Coll,data)
+
+    def start_thread():
+        mt = MultiThreadHandler(task_queue,insert_into_milvus,None,5)
+        mt.run()
+
+    p_count = 5
+    list_p = [Process(target=start_thread) for _ in range(p_count)]
+    for p in list_p:
+        p.start()
+    for p in list_p:
+        p.join()
+
+
 def test():
     # pm = Product_Manager()
     # pm.test()
-    fix_product_data()
+    # fix_product_data()
     # test_check_brand()
     # test_match()
+    rebuild_milvus()
+
 
 if __name__ == '__main__':
 
     # start_process_product()
     # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
-    test()
-    print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
-    name = "一"
-    ots_name = "一氧化碳分析仪"
-    print(is_similar(name,ots_name),check_product(name,ots_name))
-    print(is_legal_specs('SCM-A/SB(0.18D)'))
+    # print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
+    # name = "一"
+    # ots_name = "一氧化碳分析仪"
+    # print(is_similar(name,ots_name),check_product(name,ots_name))
+    # print(is_legal_specs('SCM-A/SB(0.18D)'))
+    test()

+ 8 - 2
BaseDataMaintenance/model/ots/document_product_dict.py

@@ -15,8 +15,8 @@ DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS = "standard_alias"
 
 DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR = "|"
 
-MAX_NAME_LENGTH = 300
 
+MAX_NAME_LENGTH = 300
 class Document_product_dict(BaseModel):
 
     def __init__(self,_dict):
@@ -46,4 +46,10 @@ def get_document_product_dict_id(parent_md5,name):
     return getMD5(parent_md5+"&&%s"%name)
 
 def get_document_product_dict_standard_alias_id(name):
-    return getMD5("alias&&%s"%name)
+    return get_milvus_product_dict_id(name)
+
+def get_milvus_standard_name(name):
+    """Return the normalised form of ``name`` used for Milvus entries.
+
+    ``name`` is converted to ``str``, cut to at most ``MAX_NAME_LENGTH``
+    characters and then lower-cased, so names that differ only in letter case
+    normalise to the same value.
+    """
+    truncated = str(name)[:MAX_NAME_LENGTH]
+    return truncated.lower()
+
+def get_milvus_product_dict_id(name):
+    """Return the id for ``name``: ``getMD5`` of its normalised form.
+
+    Normalisation is done by ``get_milvus_standard_name``.
+    """
+    normalized_name = get_milvus_standard_name(name)
+    return getMD5(normalized_name)