Browse Source

迁移数据

luojiehua 1 year ago
parent
commit
c8dbdfcf6b

+ 2 - 2
BaseDataMaintenance/dataSource/source.py

@@ -155,12 +155,12 @@ def getConnect_redis_doc():
 
 def getConnect_redis_product():
     db = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT,
-                           db=8,password=REDIS_PASS)
+                           db=9,password=REDIS_PASS)
     return db
 
 def getConnect_redis_product_pool():
     pool = redis.ConnectionPool(host=REDIS_HOST, port=REDIS_PORT,
-                                db=8,password=REDIS_PASS,max_connections=40)
+                                db=9,password=REDIS_PASS,max_connections=40)
     return pool
 
 if __name__=="__main__":

+ 46 - 7
BaseDataMaintenance/maintenance/product/1.py

@@ -24,14 +24,53 @@ x = [[1,1,22,2,2,2,2],
      [3,1,22,2,2,2,2],
      [1.5,1,22,2,2,2,2]]
 km.fit(x)
-print(km.predict(x))
-
-from uuid import uuid4
-
-print(type(uuid4().hex))
-
-print(5==5.00)
 
+a = '''
+bidding_budget double,
+    brand_specs string,
+    province string,
+    city STRING,
+    district string,
+    create_time string,
+    dict_name_id string,
+    docchannel bigint,
+    docid bigint,
+    doctitle string,
+    full_name string,
+    industry string,
+    info_type string,
+    page_time string,
+    page_time_year string,
+    procurement_system STRING,
+    project_code string,
+    project_name string,
+    quantity bigint,
+    quantity_unit string,
+    supplier string,
+    tenderee string,
+    tenderee_contact string,
+    tenderee_phone string,
+    update_time string,
+    win_bid_price double,
+    win_tenderer string,
+    win_tenderer_manager string,
+    win_tenderer_phone string,
+    dict_brand_id string,
+    dict_specs_id string,
+    dump_id string,
+    total_price double,
+    unit_price double,
+    bid_filemd5s string
+'''
+
+list_c = []
+for b in a.split("\n"):
+     c = b.strip()
+     if c=="":
+          continue
+     d = c.split(" ")[0]
+     list_c.append(d)
+print(",".join(list_c))
 
 
 

+ 9 - 11
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -53,7 +53,7 @@ def get_embedding_search(coll,index_name,name,grade,vector,search_params,output_
                 for k in output_fields:
                     _d[k] = _search.entity.get(k)
                 final_list.append(_d)
-            final_list = remove_repeat_item(final_list,k="standard_name")
+            final_list = remove_repeat_item(final_list,k="ots_name")
             try:
                 db.set(_md5,json.dumps(final_list))
                 db.expire(_md5,2*60)
@@ -180,15 +180,12 @@ def is_similar(source,target,_radio=None):
         return False
     #判断相似度
     similar = fuzz.ratio(source,target)
-    print("similar",similar)
     if similar>=min_ratio:
         return True
     similar_jaro = Levenshtein.jaro(source,target)
-    print("similar_jaro",similar_jaro)
     if similar_jaro*100>=min_ratio:
         return True
     similar_jarow = Levenshtein.jaro_winkler(source,target)
-    print("similar_jaro",similar_jaro)
     if similar_jarow*100>=min_ratio:
         return True
 
@@ -270,7 +267,7 @@ def is_legal_brand(ots_client,brand):
         TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,"delete")
     ])
 
-    rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                         SearchQuery(bool_query,get_total_count=True))
     if total_count>0:
         return False
@@ -284,7 +281,7 @@ def is_legal_brand(ots_client,brand):
         ])
 
     ])
-    rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                         SearchQuery(bool_query,get_total_count=True))
     if total_count>0:
         return False
@@ -312,7 +309,7 @@ def is_legal_brand(ots_client,brand):
     bool_query = BoolQuery(must_queries=[
         TermQuery(DOCUMENT_PRODUCT_TMP_BRAND,brand)
     ])
-    rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_temp","document_product_temp_index",
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
                                                                         SearchQuery(bool_query,get_total_count=True))
 
     if total_count>=5:
@@ -369,6 +366,7 @@ import requests
 session = requests.Session()
 def request_embedding(sentence,retry_times=3):
     for _ in range(retry_times):
+        sentence = get_milvus_standard_name(sentence)
         resp = session.post(embedding_url,json={"sentence":sentence})
         if resp.status_code==200:
             content = resp.content.decode("utf-8")
@@ -394,17 +392,17 @@ def clean_product_brand(product_brand):
     _search = re.search("品牌[::;;](?P<brand>.{2,8}?)([.。、;::]|规格|型号|生产厂家|厂家)",product_brand)
     if _search is not None:
         product_brand = _search.groupdict().get("brand")
-    brand = re.sub("[/\\,,、.|等]|一批|/无|品牌",'',product_brand)
+    brand = re.sub("[/\\,,、.|等]|一批|/无|品牌|^[/.]+",'',product_brand)
     return brand
 
 
-def clean_product_specs(product_specs):
+def clean_product_specs(product_specs,_PATTERN = re.compile("[^A-Za-z0-9-\\/()().]|^[\\/.-]+")):
     '''
     clean before insert
     :param product_specs:
     :return:
     '''
-    _specs = re.sub(SPECS_PATTERN,'',product_specs)
+    _specs = re.sub(_PATTERN,'',product_specs)
     if len(_specs)>0:
         return _specs
     return product_specs
@@ -445,5 +443,5 @@ if __name__ == '__main__':
     # print(re.split("[^\u4e00-\u9fff]",'128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10'))
     # import Levenshtein
     # print(Levenshtein.ratio('助听器','助行器'))
-    # print(clean_product_brand(a))
+    print(clean_product_specs("//4008SverssionV10"))
     # print(is_legal_brand(getConnect_ots(),"康复"))

+ 136 - 150
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -105,7 +105,7 @@ class Product_Dict_Manager():
             must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3,5,True,True)],
             must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED)])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
 
@@ -114,7 +114,7 @@ class Product_Dict_Manager():
             self.queue_product_dict.put(_d)
 
         while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
             list_dict = getRow_ots(rows)
@@ -129,43 +129,14 @@ class Product_Dict_Manager():
     def embedding_comsumer(self):
         def handle(item,result_queue):
             try:
-                id = item.get(DOCUMENT_PRODUCT_DICT_ID)
                 name = str(item.get(DOCUMENT_PRODUCT_DICT_NAME))[:MAX_NAME_LENGTH]
-                vector = request_embedding(name)
+
                 parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
                 grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
                 Coll,_ = self.get_collection(grade)
                 standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
-                if vector is not None and Coll is not None:
-
-                    data = [[id],
-                            [name],
-                            [name],
-                            [id],
-                            [vector],
-                            [parent_id],
-                            [grade]]
-                    insert_embedding(Coll,data)
-
-
-                    if standard_alias is not None and standard_alias!="":
-                        list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
-                        for _alias in list_alias:
-                            _alias = _alias.strip()
-                            if len(_alias)==0:
-                                continue
-                            if _alias==name:
-                                continue
-                            _id = get_document_product_dict_standard_alias_id(_alias)
-                            vector = request_embedding(_alias)
-                            data = [[_id],
-                                    [_alias],
-                                    [name],
-                                    [id],
-                                    [vector],
-                                    [parent_id],
-                                    [grade]]
-                            insert_embedding(Coll,data)
+
+                if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias):
 
                     _pd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
                     _pd.update_row(self.ots_client)
@@ -194,12 +165,12 @@ class Product_Dict_Manager():
                     should_q
                 ])
 
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
                                                                                     SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                     columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
                 while next_token:
-                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
                                                                                         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                         columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                     list_data.extend(getRow_ots(rows))
@@ -218,12 +189,12 @@ class Product_Dict_Manager():
             for name in list_name:
                 bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
                 if bool_query is not None:
-                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                         SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                         columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                     list_data = getRow_ots(rows)
                     while next_token:
-                        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                             SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                             columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                         list_data.extend(getRow_ots(rows))
@@ -245,12 +216,12 @@ class Product_Dict_Manager():
             for name in list_name:
                 bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
                 if bool_query is not None:
-                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                         SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                         columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                     list_data = getRow_ots(rows)
                     while next_token:
-                        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                             SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                             columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                         list_data.extend(getRow_ots(rows))
@@ -268,12 +239,12 @@ class Product_Dict_Manager():
             else:
                 bool_query = self.make_query(name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(name),5)
             if bool_query is not None:
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                     SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                     columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
                 while next_token:
-                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                         columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                     list_data.extend(getRow_ots(rows))
@@ -292,12 +263,12 @@ class Product_Dict_Manager():
             else:
                 bool_query = self.make_query(name,DOCUMENT_PRODUCT_SPECS,TermQuery,len(name),5)
             if bool_query is not None:
-                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                     SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                     columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                 list_data = getRow_ots(rows)
                 while next_token:
-                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                         SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                         columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                     list_data.extend(getRow_ots(rows))
@@ -354,7 +325,7 @@ class Product_Dict_Manager():
                 TermQuery(term_columns,str(name)),
                 RangeQuery(DOCUMENT_PRODUCT_DICT_CREATE_TIME,None,str(create_time))
             ])
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                 SearchQuery(bool_query,get_total_count=True,limit=1),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
             if total_count>0:
@@ -391,27 +362,17 @@ class Product_Dict_Manager():
             TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,grade),
             RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS,201,301)
         ])
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                             SearchQuery(bool_query,get_total_count=True))
         if total_count>0:
             return
 
         list_name = []
         #update milvus
-        vector = request_embedding(name)
         Coll,_ = self.get_collection(grade)
-        if vector is not None and Coll is not None:
-            id = original_id
-            data = [[id],
-                    [name],
-                    [name],
-                    [id],
-                    [vector],
-                    [parent_id],
-                    [grade]]
-            insert_embedding(Coll,data)
-            list_name.append(name)
 
+        if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias):
+            list_name.append(name)
             if standard_alias is not None and standard_alias!="":
                 list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
                 for _alias in list_alias:
@@ -420,18 +381,9 @@ class Product_Dict_Manager():
                         continue
                     if _alias==name:
                         continue
-                    _id = get_document_product_dict_standard_alias_id(_alias)
-                    vector = request_embedding(_alias)
-                    data = [[_id],
-                            [_alias],
-                            [name],
-                            [id],
-                            [vector],
-                            [parent_id],
-                            [grade]]
-                    insert_embedding(Coll,data)
                     list_name.append(_alias)
-            time.sleep(3)
+            time.sleep(1)
+
 
         #judge whether there exists records before this record created,if not process the history data
         if not self.exists_records(name,grade,create_time):
@@ -468,70 +420,27 @@ class Product_Dict_Manager():
         else:
             new_id = original_id
 
-        # update the milvus
-        if not (len(new_name_set)==len(old_name_set) and len(new_name_set)==len(new_name_set&old_name_set)):
-            Coll,_ = self.get_collection(grade)
-            o_id = original_id
-            expr = " ots_id in ['%s']"%o_id
-            Coll.delete(expr)
 
-            _alias = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS)
-            if _alias is not None and _alias!="":
-                list_alias = _alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
-                for _alias in list_alias:
-                    _alias = _alias.strip()
-                    if len(_alias)==0:
-                        continue
-                    if _alias==name:
-                        continue
-                    _id = get_document_product_dict_standard_alias_id(_alias)
-                    expr = " ots_id in ['%s']"%o_id
-                    Coll.delete(expr)
-
-            list_name = []
-            vector = request_embedding(name)
-            if vector is not None and Coll is not None:
-                id = new_id
-                data = [[id],
-                        [name],
-                        [name],
-                        [id],
-                        [vector],
-                        [parent_id],
-                        [grade]]
-                insert_embedding(Coll,data)
-                list_name.append(name)
-
-                if standard_alias is not None and standard_alias!="":
-                    list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
-                    for _alias in list_alias:
-                        _alias = _alias.strip()
-                        if len(_alias)==0:
-                            continue
-                        if _alias==name:
-                            continue
-                        _id = get_document_product_dict_standard_alias_id(_alias)
-                        vector = request_embedding(_alias)
-                        data = [[_id],
-                                [_alias],
-                                [name],
-                                [id],
-                                [vector],
-                                [parent_id],
-                                [grade]]
-                        insert_embedding(Coll,data)
-                        list_name.append(_alias)
-            time.sleep(3)
+        Coll,_ = self.get_collection(grade)
 
-        # process history
         delete_names = list(old_name_set-new_name_set)
+        insert_names = list(new_name_set-old_name_set)
+        # update the milvus
+        if len(delete_names)>0:
+            for _name in delete_names:
+                delete_record_from_milvus(Coll,_name,"")
+            time.sleep(1)
+        if len(insert_names)>0:
+            insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias)
+
+        # process history
         if len(delete_names)>0:
             self.process_history([old_name],grade,"update")
-        insert_names = list(new_name_set-old_name_set)
         if len(insert_names)>0:
             self.process_history(insert_names,grade,"insert")
 
 
+
         # update document_product_dict
         _d = {DOCUMENT_PRODUCT_DICT_ID:new_id,
               DOCUMENT_PRODUCT_DICT_NAME:name,
@@ -568,13 +477,13 @@ class Product_Dict_Manager():
         bool_query = BoolQuery(must_queries=[
             TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,parent_id)
         ])
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
 
         list_data = getRow_ots(rows)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
 
@@ -600,25 +509,23 @@ class Product_Dict_Manager():
             TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
         ])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
         if total_count==0:
             return
         list_data = getRow_ots(rows)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
             list_data.extend(getRow_ots(rows))
 
         #delete milvus records
         Coll,_ = self.get_collection(grade)
-        for _data in list_data:
-            o_id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
-            expr = " ots_id in ['%s']"%o_id
-            Coll.delete(expr)
-        time.sleep(3)
+
+        delete_record_from_milvus(Coll,name,standard_alias)
+        time.sleep(1)
 
         #process_history data
         self.process_history([name],grade,"delete")
@@ -640,13 +547,13 @@ class Product_Dict_Manager():
         bool_query = BoolQuery(must_queries=[
             TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,id)
         ])
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
         list_data = getRow_ots(rows)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
 
@@ -665,7 +572,7 @@ class Product_Dict_Manager():
             RangeQuery("status",1,50,True,True)
         ])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)]),limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
         list_data = getRow_ots(rows)
@@ -673,7 +580,7 @@ class Product_Dict_Manager():
             self.queue_product_interface.put(_data)
 
         while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
             list_data = getRow_ots(rows)
@@ -749,7 +656,7 @@ def search_similar():
         must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,5,5,True,True)]
     )
 
-    rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                         SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                         columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
 
@@ -758,7 +665,7 @@ def search_similar():
         list_data.append(_d)
 
     while next_token:
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
+        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                             SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
         list_dict = getRow_ots(rows)
@@ -767,7 +674,14 @@ def search_similar():
         if len(list_data)>=100000:
             break
     log("product_dict embedding total_count:%d"%total_count)
+    set_key = set()
     for _d in list_data:
+        name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
+        grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
+        _key = "%s-%d"%(name,grade)
+        if _key in set_key:
+            continue
+        set_key.add(_key)
         task_queue.put(_d)
 
     result_queue = Queue()
@@ -775,21 +689,27 @@ def search_similar():
     def handle(item,result_queue):
         id = item.get(DOCUMENT_PRODUCT_DICT_ID)
         name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
-        vector = pdm.request_embedding(name)
+        vector = get_embedding_request(name)
         parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
         grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
         Coll,Coll_name = pdm.get_collection(grade)
-        output_fields = ['ots_id','ots_name',"ots_parent_id"]
+        output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name"]
         if vector is not None and Coll is not None:
-            search_list = search_embedding(Coll,embedding_index_name,[vector],pdm.search_params,output_fields,limit=10)
+            search_list = get_embedding_search(Coll,embedding_index_name,name,grade,[vector],pdm.search_params,output_fields,limit=10)
             for _item in search_list:
-                ots_id = _item.id
-                ots_name = _item.entity.get("ots_name")
-                ots_parent_id = _item.entity.get("ots_parent_id")
+                ots_id = _item.get("id")
+                ots_name = _item.get("ots_name")
+                ots_parent_id = _item.get("ots_parent_id")
+                standard_name = _item.get("standard_name")
                 if name!=ots_name:
-                    if is_similar(name,ots_name):
-                        _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id}
-                        result_queue.put(_d)
+                    if grade==4:
+                        if is_similar(name,ots_name) or check_brand(name,ots_name):
+                            _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id,"target_standard_name":standard_name}
+                            result_queue.put(_d)
+                    elif grade==5:
+                        if is_similar(name,ots_name) and check_specs(name,ots_name):
+                            _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id,"target_standard_name":standard_name}
+                            result_queue.put(_d)
 
 
     mt = MultiThreadHandler(task_queue,handle,result_queue,5,1)
@@ -817,6 +737,71 @@ def search_similar():
     df.to_excel("search_similar1.xlsx",columns=df_columns)
 
 
+def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias):
+
+    n_name = get_milvus_standard_name(name)
+    name_id = get_milvus_product_dict_id(n_name)
+
+    vector = request_embedding(n_name)
+
+    log("insert name %s grade %d"%(name,grade))
+    if vector is not None and Coll is not None:
+
+        data = [[name_id],
+                [name],
+                [name],
+                [name_id],
+                [vector],
+                [parent_id],
+                [grade]]
+        insert_embedding(Coll,data)
+
+        if standard_alias is not None and standard_alias!="":
+            list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+            for _alias in list_alias:
+                _alias = _alias.strip()
+                if len(_alias)==0:
+                    continue
+                if _alias==name:
+                    continue
+                _id = get_document_product_dict_standard_alias_id(_alias)
+                n_alias = get_milvus_standard_name(_alias)
+                vector = request_embedding(n_alias)
+                data = [[_id],
+                        [_alias],
+                        [name],
+                        [name_id],
+                        [vector],
+                        [parent_id],
+                        [grade]]
+                insert_embedding(Coll,data)
+        return True
+
+def delete_record_from_milvus(Coll,name,standard_alias):
+
+    n_name = get_milvus_standard_name(name)
+    name_id = get_milvus_product_dict_id(n_name)
+
+    log("delete name %s standard_alias %s"%(str(name),str(standard_alias)))
+
+    expr = " ots_id in ['%s']"%name_id
+    Coll.delete(expr)
+
+    if standard_alias is not None and standard_alias!="":
+        list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+        for _alias in list_alias:
+            _alias = _alias.strip()
+            if len(_alias)==0:
+                continue
+            if _alias==name:
+                continue
+            _id = get_document_product_dict_standard_alias_id(_alias)
+
+            expr = " ots_id in ['%s']"%_id
+            Coll.delete(expr)
+
+
+
 def dict_interface_delete(name,grade,ots_client = getConnect_ots()):
     from uuid import uuid4
     _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
@@ -831,15 +816,16 @@ def dict_interface_delete(name,grade,ots_client = getConnect_ots()):
 
 def interface_deletes():
     a = '''
-    MFUSONE
+    眼科
     '''
+    grade = 4
     ots_client=getConnect_ots()
     for s in re.split("[\n\s,.,。、]",a):
         s = s.strip()
         if s=="":
             continue
         print(s)
-        dict_interface_delete(s,4,ots_client)
+        dict_interface_delete(s,grade,ots_client)
 
 
 if __name__ == '__main__':

+ 137 - 69
BaseDataMaintenance/maintenance/product/products.py

@@ -63,7 +63,7 @@ class Product_Manager(Product_Dict_Manager):
             return
         bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
 
-        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                             columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
         list_data = getRow_ots(rows)
@@ -76,7 +76,7 @@ class Product_Manager(Product_Dict_Manager):
             list_id.append(_id)
             self.process_queue.put(_d)
         while next_token:
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
             list_data = getRow_ots(rows)
@@ -160,7 +160,7 @@ class Product_Manager(Product_Dict_Manager):
             if name_vector is not None:
                 Coll,_ = self.get_collection(NAME_GRADE)
 
-                search_list = get_embedding_search(Coll,embedding_index_name,name,NAME_GRADE,[name_vector],self.search_params,output_fields,limit=60)
+                search_list = get_embedding_search(Coll,embedding_index_name,name,NAME_GRADE,[name_vector],self.search_params,output_fields,limit=20)
 
                 for _search in search_list:
                     ots_id = _search.get("standard_name_id")
@@ -184,7 +184,7 @@ class Product_Manager(Product_Dict_Manager):
                 name_vector = get_embedding_request(name)
                 if name_vector is not None:
                     Coll,_ = self.get_collection(NAME_GRADE)
-                    search_list = get_embedding_search(Coll,embedding_index_name,name,NAME_GRADE,[name_vector],self.search_params,output_fields,limit=20)
+                    search_list = get_embedding_search(Coll,embedding_index_name,name,NAME_GRADE,[name_vector],self.search_params,output_fields,limit=10)
 
                     for _search in search_list:
                         ots_id = _search.get("standard_name_id")
@@ -220,7 +220,7 @@ class Product_Manager(Product_Dict_Manager):
                     brand_vector = get_embedding_request(brand)
                     if brand_vector is not None:
                         Coll,_ = self.get_collection(BRAND_GRADE)
-                        search_list = get_embedding_search(Coll,embedding_index_name,brand,BRAND_GRADE,[brand_vector],self.search_params,output_fields,limit=60)
+                        search_list = get_embedding_search(Coll,embedding_index_name,brand,BRAND_GRADE,[brand_vector],self.search_params,output_fields,limit=20)
 
                         # log("search brand %s"%(brand))
                         for _search in search_list:
@@ -381,9 +381,9 @@ class Product_Manager(Product_Dict_Manager):
                 for s in re.split("[\u4e00-\u9fff]",specs):
                     if s!="" and len(s)>4:
                         list_specs.append(s)
-                similar_flag = None
                 _index = 0
                 break_flag = False
+                list_similar_specs = []
                 for c_specs in list_specs:
                     if break_flag:
                         break
@@ -392,7 +392,7 @@ class Product_Manager(Product_Dict_Manager):
 
                     if specs_vector is not None:
                         Coll,_ = self.get_collection(SPECS_GRADE)
-                        search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=60)
+                        search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=20)
 
                         for _search in search_list:
 
@@ -437,48 +437,52 @@ class Product_Manager(Product_Dict_Manager):
                                             # if _flag:
                                             #     if _dpd.updateAlias(specs):
                                             #         _dpd.update_row(self.ots_client)
+                                    break_flag = True
                                     break
-                                else:
-                                    if _index == 1:
-                                        similar_flag = True
-
+                            else:
+                                list_similar_specs.append(specs)
                 # add new specs?
-                debug("specs not similar")
-                if is_legal_specs(specs) and len(specs)<MAX_NAME_LENGTH and len(specs)>=5:
-                    debug("is_legal_specs")
-                    new_specs = clean_product_specs(specs)
-                    # insert into document_product_dict a new record
-                    # to update the document_product_dict which is builded for search
-                    # add new specs
-                    if brand_ots_id is not None and name_ots_id is not None:
-                        _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
-
-                        # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
-                        #       DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                        #       DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
-                        #       DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
-                        #       DOCUMENT_PRODUCT_DICT_STATUS:1,
-                        #       DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
-                        #       DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                        #       DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                        #       }
-                        # _dpd = Document_product_dict(_d)
-                        # _dpd.update_row(self.ots_client)
-
-                        log("adding new specs %s"%(new_specs))
-                        # user interface to add
-                        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                              DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
-                              }
-                        _dpdi = Document_product_dict_interface(_d)
-                        _dpdi.update_row(self.ots_client)
+                if new_specs is not None and new_specs!="":
+                    pass
+                else:
+                    debug("specs not similar")
+                    for specs in list_similar_specs:
+                        if is_legal_specs(specs) and len(specs)<MAX_NAME_LENGTH and len(specs)>=5:
+                            debug("is_legal_specs")
+                            new_specs = clean_product_specs(specs)
+                            # insert into document_product_dict a new record
+                            # to update the document_product_dict which is builded for search
+                            # add new specs
+                            if brand_ots_id is not None and name_ots_id is not None:
+                                _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                                # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
+                                #       DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                #       DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
+                                #       DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                #       DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                #       DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                #       DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                #       DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                #       }
+                                # _dpd = Document_product_dict(_d)
+                                # _dpd.update_row(self.ots_client)
+
+                                log("adding new specs %s"%(new_specs))
+                                # user interface to add
+                                _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                      DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
+                                      }
+                                _dpdi = Document_product_dict_interface(_d)
+                                _dpdi.update_row(self.ots_client)
+                            break
         if specs_ots_id is None:
             _find = False
             for specs in list_candidates:
@@ -503,14 +507,13 @@ class Product_Manager(Product_Dict_Manager):
 
                     if specs_vector is not None:
                         Coll,_ = self.get_collection(SPECS_GRADE)
-                        search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=20)
+                        search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=10)
 
                         for _search in search_list:
                             if _find:
                                 break
 
                             ots_id = _search.get("standard_name_id")
-
                             ots_name = _search.get("ots_name")
                             standard_name = _search.get("standard_name")
                             ots_parent_id = _search.get("ots_parent_id")
@@ -567,11 +570,12 @@ class Product_Manager(Product_Dict_Manager):
                 if unit_price>0:
                     new_quantity = total_price/unit_price
                     if new_quantity!=quantity:
-                        if new_quantity==total_price//unit_price:
-                            quantity = int(new_quantity)
-                            _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
-                        else:
-                            is_legal_data = False
+                        # if new_quantity==total_price//unit_price:
+                        #     quantity = int(new_quantity)
+                        #     _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
+                        # else:
+                        #     is_legal_data = False
+                        is_legal_data = False
                 elif quantity>0:
                     unit_price = total_price/quantity
                     _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
@@ -744,7 +748,7 @@ class Product_Manager(Product_Dict_Manager):
                                                  TermQuery("quantity",quantity)
                                                  ])
 
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                 SearchQuery(bool_query,limit=1),
                                                                                 columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
@@ -760,7 +764,7 @@ class Product_Manager(Product_Dict_Manager):
                                                  TermQuery(DOCUMENT_PRODUCT_SUPPLIER,supplier),
                                                  ])
 
-            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
                                                                                 SearchQuery(bool_query,limit=50),
                                                                                 columns_to_get=ColumnsToGet(['name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
             list_data = getRow_ots(rows)
@@ -858,11 +862,19 @@ def fix_product_data():
     '''
     table_name = "document_product_temp"
     table_index = "document_product_temp_index"
-    columns = [DOCUMENT_PRODUCT_TMP_WIN_BID_PRICE]
+
+    # table_name = "document_product"
+    # table_index = "document_product_index"
+
+    columns = [DOCUMENT_PRODUCT_TMP_NEW_ID]
     ots_client = getConnect_ots()
     bool_query = BoolQuery(must_queries=[
-        RangeQuery("status",501),
+        # RangeQuery("status",501),
         # TermQuery("docid",246032980)
+
+        RangeQuery("status",201,301)
+        # WildcardQuery(DOCUMENT_PRODUCT_ORIGINAL_SPECS,"MFUSOne")
+        # TermQuery(DOCUMENT_PRODUCT_SPECS,"MFUSOne")
     ])
 
     rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
@@ -886,6 +898,8 @@ def fix_product_data():
     def fix_missing_data(item,result_queue):
 
         original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
+
+        print("original_id",original_id)
         _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
         dpt = Document_product_tmp(_d)
         dpt.fix_columns(ots_client,["name","brand","specs"],True)
@@ -915,13 +929,15 @@ def fix_product_data():
 
     def deleteAndReprocess(item,result_queue):
 
-        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
+        original_id = item.get(DOCUMENT_PRODUCT_TMP_ID)
+        print("original_id",original_id,"id",item.get(DOCUMENT_PRODUCT_ID))
         # delete data and rerun
         _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
         dpt = Document_product_tmp(_d)
         dpt.update_row(ots_client)
 
-        _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
+        new_id = item.get(DOCUMENT_PRODUCT_TMP_NEW_ID)
+        _d = {DOCUMENT_PRODUCT_ID:new_id}
         dp = Document_product(_d)
         dp.delete_row(ots_client)
 
@@ -932,7 +948,7 @@ def fix_product_data():
             dpt.setValue(DOCUMENT_PRODUCT_TMP_STATUS,1,True)
             dpt.update_row(ots_client)
 
-    mt = MultiThreadHandler(task_queue,handle,None,30,1)
+    mt = MultiThreadHandler(task_queue,deleteAndReprocess,None,30,1)
     mt.run()
 
 def test_check_brand():
@@ -987,8 +1003,8 @@ def test_check_brand():
             f.write(b+"\n")
 
 def test_match():
-    a = "MFUSONE"
-    vector = request_embedding(a)
+    a = "-SL-10XL"
+    vector = request_embedding(get_milvus_standard_name(a))
     pm = Product_Manager()
     Coll,_ = pm.get_collection(SPECS_GRADE)
     output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
@@ -1004,18 +1020,22 @@ def rebuild_milvus():
         RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3)
     ])
     ots_client = getConnect_ots()
-    rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
-                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("grade")]),limit=100,get_total_count=True),
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
                                                                    ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
 
     list_data = getRow_ots(rows)
     while next_token:
-        rows,next_token,total_count,is_all_succeed = ots_client.search("document_product_dict","document_product_dict_index",
+        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                        SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                        ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
 
         list_data.extend(getRow_ots(rows))
         print("%d/%d"%(len(list_data),total_count))
+
+        # if len(list_data)>1000:
+        #     break
+
     set_name_grade = set()
     task_queue = PQueue()
     for _data in list_data:
@@ -1030,15 +1050,25 @@ def rebuild_milvus():
     def insert_into_milvus(item,result_queue):
 
         name = item.get(DOCUMENT_PRODUCT_DICT_NAME,"")
-        n_name = get_milvus_standard_name(name)
+        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
+
+        if grade==SPECS_GRADE:
+            name = clean_product_specs(name)
+            if len(name)<2:
+                return
 
+        n_name = get_milvus_standard_name(name)
         name_id = get_milvus_product_dict_id(n_name)
 
         vector = request_embedding(n_name)
         parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID,"")
-        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
+
         Coll,_ = pdm.get_collection(grade)
         standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
+
+
+
+        log("insert name %s grade %d"%(name,grade))
         if vector is not None and Coll is not None:
 
             data = [[name_id],
@@ -1083,6 +1113,43 @@ def rebuild_milvus():
     for p in list_p:
         p.join()
 
+def move_document_product():
+    bool_query = BoolQuery(must_queries=[
+        ExistsQuery(DOCUMENT_PRODUCT_NAME)
+    ])
+    ots_client = getConnect_ots()
+    Document_product_table_name = "document_product"
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
+                                                                   ColumnsToGet(return_type=ColumnReturnType.ALL))
+    list_data = getRow_ots(rows)
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data.extend(getRow_ots(rows))
+        print("%d/%d"%(len(list_data),total_count))
+        # if len(list_data)>=1000:
+        #     break
+
+    task_queue = Queue()
+
+    for _data in list_data:
+        task_queue.put(_data)
+
+    def _handle(item,result_queue):
+
+        D1 = Document_product(item)
+        D1.update_row(ots_client)
+
+        D1.table_name = Document_product_table_name
+        D1.delete_row(ots_client)
+
+    mt = MultiThreadHandler(task_queue,_handle,None,30)
+    mt.run()
+
+
+
 
 def test():
     # pm = Product_Manager()
@@ -1090,8 +1157,9 @@ def test():
     # fix_product_data()
     # test_check_brand()
     # test_match()
-    rebuild_milvus()
+    # rebuild_milvus()
 
+    move_document_product()
 
 if __name__ == '__main__':
 

+ 2 - 4
BaseDataMaintenance/model/ots/document_product.py

@@ -49,7 +49,7 @@ DOCUMENT_PRODUCT_ORIGINAL_SPECS = "original_specs"
 
 DOCUMENT_PRODUCT_BID_FILEMD5S = "bid_filemd5s"
 
-
+Document_product_table_name = "document_product2"
 
 class Document_product(BaseModel):
 
@@ -66,9 +66,7 @@ class Document_product(BaseModel):
                     v = _v
             self.setValue(k,v,True)
 
-
-
-        self.table_name = 'document_product'
+        self.table_name = Document_product_table_name
 
     def getPrimary_keys(self):
         return ['id']

+ 61 - 2
BaseDataMaintenance/model/ots/document_product_dict.py

@@ -17,13 +17,16 @@ DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR = "|"
 
 
 MAX_NAME_LENGTH = 300
+
+Document_product_dict_table_name = "document_product_dict2"
+
 class Document_product_dict(BaseModel):
 
     def __init__(self,_dict):
         BaseModel.__init__(self)
         for k,v in _dict.items():
             self.setValue(k,v,True)
-        self.table_name = "document_product_dict"
+        self.table_name = Document_product_dict_table_name
 
     def getPrimary_keys(self):
         return ["id"]
@@ -52,4 +55,60 @@ def get_milvus_standard_name(name):
     return "%s"%(str(name)[:MAX_NAME_LENGTH].lower())
 
 def get_milvus_product_dict_id(name):
-    return getMD5(get_milvus_standard_name(name))
+    return getMD5(get_milvus_standard_name(name))
+
+
+
+from BaseDataMaintenance.model.ots.document_product import *
+from BaseDataMaintenance.dataSource.source import getConnect_ots
+from tablestore import *
+from BaseDataMaintenance.common.Utils import *
+from BaseDataMaintenance.common.multiThread import MultiThreadHandler
+from queue import Queue
+def move_document_product_dict():
+
+    bool_query = BoolQuery(must_queries=[
+        ExistsQuery(DOCUMENT_PRODUCT_NAME)
+    ])
+    ots_client = getConnect_ots()
+    Document_product_table_name = "document_product_dict"
+    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
+                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("grade")]),limit=100,get_total_count=True),
+                                                                   ColumnsToGet(return_type=ColumnReturnType.ALL))
+    list_data = getRow_ots(rows)
+    while next_token:
+        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
+                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                       ColumnsToGet(return_type=ColumnReturnType.ALL))
+        list_data.extend(getRow_ots(rows))
+        print("%d/%d"%(len(list_data),total_count))
+        # if len(list_data)>=2000:
+        #     break
+
+    task_queue = Queue()
+
+    dict_id_dict = {}
+
+    for _data in list_data:
+        task_queue.put(_data)
+        id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
+        dict_id_dict[id] = _data
+
+    def _handle(item,result_queue):
+
+        status = item.get(DOCUMENT_PRODUCT_DICT_STATUS)
+
+        D1 = Document_product_dict(item)
+        if status==1:
+
+            D1.update_row(ots_client)
+
+        D1.table_name = Document_product_table_name
+        D1.delete_row(ots_client)
+
+    mt = MultiThreadHandler(task_queue,_handle,None,30)
+    mt.run()
+
+if __name__ == '__main__':
+    # print(get_milvus_product_dict_id("-sl-10xls"))
+    move_document_product_dict()

+ 3 - 1
BaseDataMaintenance/model/ots/document_product_dict_interface.py

@@ -17,13 +17,15 @@ DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION = "action" #insert delete update
 
 MAX_NAME_LENGTH = 300
 
+Document_product_dict_interface_table_name = "document_product_dict_interface"
+
 class Document_product_dict_interface(BaseModel):
 
     def __init__(self,_dict):
         BaseModel.__init__(self)
         for k,v in _dict.items():
             self.setValue(k,v,True)
-        self.table_name = "document_product_dict_interface"
+        self.table_name = Document_product_dict_interface_table_name
 
     def getPrimary_keys(self):
         return ["id"]

+ 3 - 1
BaseDataMaintenance/model/ots/document_product_tmp.py

@@ -40,13 +40,15 @@ DOCUMENT_PRODUCT_TMP_UPDATE_TIME = 'update_time'
 DOCUMENT_PRODUCT_TMP_NEW_ID = "new_id"
 
 
+Document_product_tmp_table_name = "document_product_temp"
+
 class Document_product_tmp(BaseModel):
 
     def __init__(self,dict):
         BaseModel.__init__(self)
         for k,v in dict.items():
             self.setValue(k,v,True)
-        self.table_name = 'document_product_temp'
+        self.table_name = Document_product_tmp_table_name
 
     def getPrimary_keys(self):
         return ['id']