
Add alias mapping support to the product data processing pipeline

luojiehua 1 year ago
parent
commit
d598e2aacf
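
In short: each product-dict entry may now carry a pipe-separated standard_alias field, and every alias becomes an extra vector row whose standard_name/standard_name_id point back at the canonical entry, so an embedding search that hits an alias resolves to the standard name. A minimal sketch of the mapping (expand_aliases is a hypothetical helper; the values are made up):

    def expand_aliases(name, standard_alias, separator="|"):
        # Each (row_name, standard_name) pair becomes one searchable row.
        rows = [(name, name)]                  # canonical row maps to itself
        for alias in standard_alias.split(separator):
            alias = alias.strip()
            if alias and alias != name:        # skip blanks and self-aliases
                rows.append((alias, name))     # alias resolves to the standard name
        return rows

    print(expand_aliases("ThinkPad", "thinkpad|think pad"))
    # [('ThinkPad', 'ThinkPad'), ('thinkpad', 'ThinkPad'), ('think pad', 'ThinkPad')]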

+ 10 - 0
BaseDataMaintenance/maintenance/product/1.py

@@ -16,6 +16,16 @@ print(Levenshtein.jaro("1abdd","1abbd"))
 print((4/5+4/5+4/4)/3)
 print((5/5+5/5+3/5)/3)
 
+from sklearn.cluster import KMeans
+
+
+km = KMeans(n_clusters=2)
+x = [[1,1,22,2,2,2,2],
+     [3,1,22,2,2,2,2],
+     [1.5,1,22,2,2,2,2]]
+km.fit(x)
+print(km.predict(x))
+
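
A quick note on this scratch test: rows 0 and 2 differ only by 0.5 in the first coordinate while row 1 is 2.0 away, so the two-cluster fit should group rows 0 and 2 together, e.g. km.predict(x) -> [0, 1, 0] (cluster numbering is arbitrary).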
 
 
 

+ 4 - 2
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -87,9 +87,11 @@ def check_specs(source,target):
 
 import json
 
-def request_embedding(self,sentence,retry_times=3):
+import requests
+session = requests.Session()
+def request_embedding(sentence,retry_times=3):
     for _ in range(retry_times):
-        resp = self.session.post(embedding_url,json={"sentence":sentence})
+        resp = session.post(embedding_url,json={"sentence":sentence})
         if resp.status_code==200:
             content = resp.content.decode("utf-8")
             _d = json.loads(content)
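
The refactor above turns a method that depended on self.session into a free function backed by a module-level requests.Session, so callers outside the manager class share one connection pool. The tail of the function is cut off by the diff context; a sketch of the likely retry shape (the "vector" response key is an assumption for illustration):

    import json
    import requests

    session = requests.Session()  # module-level: one shared connection pool

    def request_embedding(sentence, retry_times=3):
        for _ in range(retry_times):
            resp = session.post(embedding_url, json={"sentence": sentence})
            if resp.status_code == 200:
                _d = json.loads(resp.content.decode("utf-8"))
                return _d.get("vector")  # assumed key; real handling is outside the hunk
        return None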

+ 31 - 4
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -25,6 +25,9 @@ class Product_Dict_Manager():
 
     def __init__(self):
 
+        self.collection_name_name = COLLECTION_NAME_NAME
+        self.collection_name_brand = COLLECTION_NAME_BRAND
+        self.collection_name_specs = COLLECTION_NAME_SPECS
         self.search_params = {"metric_type":"IP",
                               "params":{"nprobe":10}}
 
@@ -34,9 +37,7 @@ class Product_Dict_Manager():
 
         self.session = requests.Session()
 
-        self.collection_name_name = COLLECTION_NAME_NAME
-        self.collection_name_brand = COLLECTION_NAME_BRAND
-        self.collection_name_specs = COLLECTION_NAME_SPECS
+
         self.Coll_name = getCollection(self.collection_name_name)
         self.Coll_brand = getCollection(self.collection_name_brand)
         self.Coll_specs = getCollection(self.collection_name_specs)
@@ -48,6 +49,8 @@ class Product_Dict_Manager():
             # FieldSchema(name="pk_id",dtype=DataType.INT64,is_primary=True,auto_id=True), # pk is the same as ots
             FieldSchema(name="ots_id",dtype=DataType.VARCHAR,max_length=32,is_primary=True),
             FieldSchema(name="ots_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
+            FieldSchema(name="standard_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
+            FieldSchema(name="standard_name_id",dtype=DataType.VARCHAR,max_length=32),
             FieldSchema(name="embedding",dtype=DataType.FLOAT_VECTOR,dim=1024),
             FieldSchema(name="ots_parent_id",dtype=DataType.VARCHAR,max_length=32),
             FieldSchema(name="ots_grade",dtype=DataType.INT64)
@@ -70,7 +73,7 @@ class Product_Dict_Manager():
 
 
 
-    def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE]):
+    def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS]):
 
         bool_query = BoolQuery(
             must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3,5,True,True)],
@@ -120,17 +123,41 @@ class Product_Dict_Manager():
                 parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
                 grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
                 Coll,_ = self.get_collection(grade)
+                standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
                 if vector is not None and Coll is not None:
 
                     data = [[id],
                             [name],
+                            [name],
+                            [id],
                             [vector],
                             [parent_id],
                             [grade]]
                     insert_embedding(Coll,data)
+
+
+                    if standard_alias is not None and standard_alias!="":
+                        list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+                        for _alias in list_alias:
+                            _alias = _alias.strip()
+                            if len(_alias)==0:
+                                continue
+                            if _alias==name:
+                                continue
+                            _id = get_document_product_dict_standard_alias_id(_alias)
+                            data = [[_id],
+                                    [_alias],
+                                    [name],
+                                    [id],
+                                    [vector],
+                                    [parent_id],
+                                    [grade]]
+                            insert_embedding(Coll,data)
+
                     _pd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:2})
                     _pd.update_row(self.ots_client)
 
+
             except Exception as e:
                 traceback.print_exc()
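
Note that insert rows must stay column-aligned with the extended FieldSchema list above, i.e. [ots_id, ots_name, standard_name, standard_name_id, embedding, ots_parent_id, ots_grade]. An annotated view of the alias row built in this hunk (values hypothetical):

    _alias = "thinkpad"
    data = [[get_document_product_dict_standard_alias_id(_alias)],  # ots_id
            [_alias],      # ots_name: the alias text itself
            [name],        # standard_name: the canonical entry's name
            [id],          # standard_name_id: the canonical entry's id
            [vector],      # embedding: reuses the canonical name's vector
            [parent_id],   # ots_parent_id
            [grade]]       # ots_grade
    insert_embedding(Coll, data)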
 

+ 21 - 14
BaseDataMaintenance/maintenance/product/products.py

@@ -19,6 +19,11 @@ from random import randint
 from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
 from apscheduler.schedulers.blocking import BlockingScheduler
 
+import logging
+
+root = logging.getLogger()
+root.setLevel(logging.INFO)
+
 class Product_Manager(Product_Dict_Manager):
 
     def __init__(self):
@@ -41,7 +46,7 @@ class Product_Manager(Product_Dict_Manager):
             unit_price = ""
         else:
             unit_price = "%.2f"%float(unit_price)
-        product_id = getMD5(str(docid))+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity)
+        product_id = getMD5(str(docid)+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity))
         return product_id
 
     def producer(self,process_count=3000):
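
The one-line fix above matters for ID stability: previously only str(docid) went through getMD5 and the remaining fields were appended raw, producing variable-length IDs that leak field values; now the whole concatenation is hashed into one fixed-length digest. Assuming getMD5 is an MD5-hexdigest wrapper (its use as a fingerprint suggests so), the fixed behavior is equivalent to:

    import hashlib

    # Before: getMD5(str(docid)) + name + ...  -> 32-char hash plus raw fields
    # After:  one digest over all fields, always 32 characters
    fields = str(docid) + str(name) + str(brand) + str(specs) + str(unit_price) + str(quantity)
    product_id = hashlib.md5(fields.encode("utf-8")).hexdigest()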
@@ -63,7 +68,7 @@ class Product_Manager(Product_Dict_Manager):
             list_id.append(_id)
             self.process_queue.put(_d)
         while next_token:
-            rows,next_token,total_count_is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
                                                                                 SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                 columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
             list_data = getRow_ots(rows)
@@ -99,7 +104,7 @@ class Product_Manager(Product_Dict_Manager):
         self.standardize(item)
 
 
-    def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id"]):
+    def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]):
         '''
         Standardizes the product data
         Standardization matches against the standard parameter table; the match is non-exact. What are the validation rules?
@@ -121,9 +126,9 @@ class Product_Manager(Product_Dict_Manager):
 
         document_product_tmp = Document_product_tmp(tmp_dict)
 
-        name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME)
-        brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND)
-        specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS)
+        name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,"")
+        brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,"")
+        specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,"")
 
         new_name = ""
         new_brand = ""
@@ -139,8 +144,8 @@ class Product_Manager(Product_Dict_Manager):
                 search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
 
                 for _search in search_list:
-                    ots_id = _search.entity.get("ots_id")
-                    ots_name = _search.entity.get("ots_name")
+                    ots_id = _search.entity.get("standard_name_id")
+                    ots_name = _search.entity.get("standard_name")
                     ots_parent_id = _search.entity.get("ots_parent_id")
 
                     if is_similar(name,ots_name):
@@ -161,8 +166,9 @@ class Product_Manager(Product_Dict_Manager):
                     search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
 
                     for _search in search_list:
-                        ots_id = _search.entity.get("ots_id")
-                        ots_name = _search.entity.get("ots_name")
+
+                        ots_id = _search.entity.get("standard_name_id")
+                        ots_name = _search.entity.get("standard_name")
                         ots_parent_id = _search.entity.get("ots_parent_id")
 
                         if is_similar(brand,ots_name):
@@ -175,7 +181,7 @@ class Product_Manager(Product_Dict_Manager):
 
                                 _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
                                             DOCUMENT_PRODUCT_DICT_NAME:new_brand,
-                                            DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(brand,new_brand),
+                                            DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(str(brand).lower(),str(new_brand).lower()),
                                             DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
                                             DOCUMENT_PRODUCT_DICT_STATUS:1,
                                             DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
@@ -204,8 +210,9 @@ class Product_Manager(Product_Dict_Manager):
                     search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
 
                     for _search in search_list:
-                        ots_id = _search.entity.get("ots_id")
-                        ots_name = _search.entity.get("ots_name")
+
+                        ots_id = _search.entity.get("standard_name_id")
+                        ots_name = _search.entity.get("standard_name")
                         ots_parent_id = _search.entity.get("ots_parent_id")
 
                         if is_similar(specs,ots_name):
@@ -219,7 +226,7 @@ class Product_Manager(Product_Dict_Manager):
 
                                     _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
                                                 DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                                                DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&"%(specs,new_specs),
+                                                DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(str(specs).lower(),str(new_specs).lower()),
                                                 DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
                                                 DOCUMENT_PRODUCT_DICT_STATUS:1,
                                                 DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,

+ 22 - 0
BaseDataMaintenance/maxcompute/documentAnalysis.py

@@ -661,6 +661,28 @@ class f_table_cell_process():
                 list_table_cell.append(_line)
         return json.dumps(list_table_cell,ensure_ascii=False)
 
+@annotate('string -> string,string,string')
+class f_extract_products(BaseUDTF):
+    '''
+    Generate the associated phrase information (product, brand, specs) for downstream computation
+    '''
+    def __init__(self):
+        import logging
+        import json
+        import time,re
+        global json,logging,time,re
+        self.time_pattern = r"\d{4}-\d{2}-\d{2}.*"  # raw string avoids invalid-escape warnings
+        logging.basicConfig(level = logging.INFO,format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+
+    def process(self, products):
+        if products is not None and products!='':
+            list_products = json.loads(products)
+            for _p in list_products:
+                product = _p.get("product","")
+                brand = _p.get("brand","")
+                specs = _p.get("specs","")
+                self.forward(product,brand,specs)
+
 
 @annotate('string -> string,string,string,string')
 class f_words_contact(BaseUDTF):
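
For the new f_extract_products UDTF above, a products JSON like the following fans out into one output row per item (hypothetical input, since the upstream producer is not shown):

    # Input column value:
    #   [{"product": "laptop", "brand": "lenovo", "specs": "14inch"},
    #    {"product": "mouse",  "brand": "",       "specs": ""}]
    # Forwarded rows:
    #   ("laptop", "lenovo", "14inch")
    #   ("mouse", "", "")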

+ 2 - 2
BaseDataMaintenance/model/ots/document_product.py

@@ -45,12 +45,12 @@ DOCUMENT_PRODUCT_DUMP_ID = "dump_id"
 
 class Document_product(BaseModel):
 
-    def initialize(self,dict):
+    def __init__(self,dict):
         BaseModel.__init__(self)
         for k,v in dict.items():
             self.setValue(k,v,True)
 
         self.table_name = 'document_product'
 
-    def getPrimaryKey_turple(self):
+    def getPrimary_keys(self):
         return ['id']

+ 8 - 1
BaseDataMaintenance/model/ots/document_product_dict.py

@@ -11,6 +11,10 @@ DOCUMENT_PRODUCT_DICT_CREATE_TIME = "create_time"
 DOCUMENT_PRODUCT_DICT_UPDATE_TIME = "update_time"
 DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED = "is_synchonized"
 
+DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS = "standard_alias"
+
+DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR = "|"
+
 MAX_NAME_LENGTH = 300
 
 class Document_product_dict(BaseModel):
@@ -39,4 +43,7 @@ class Document_product_dict(BaseModel):
 
 from BaseDataMaintenance.common.documentFingerprint import getMD5
 def get_document_product_dict_id(parent_md5,name):
-    return getMD5(parent_md5+"&&%s"%name)
+    return getMD5(parent_md5+"&&%s"%name)
+
+def get_document_product_dict_standard_alias_id(name):
+    return getMD5("alias%s"%name)

+ 2 - 2
BaseDataMaintenance/model/ots/document_product_tmp.py

@@ -41,11 +41,11 @@ DOCUMENT_PRODUCT_TMP_NEW_ID = "new_id"
 
 class Document_product_tmp(BaseModel):
 
-    def initialize(self,dict):
+    def __init__(self,dict):
         BaseModel.__init__(self)
         for k,v in dict.items():
             self.setValue(k,v,True)
         self.table_name = 'document_product_temp'
 
-    def getPrimaryKey_turple(self):
+    def getPrimary_keys(self):
         return ['id']

+ 31 - 0
BaseDataMaintenance/start_product.py

@@ -0,0 +1,31 @@
+
+import sys
+import os
+sys.path.append(os.path.dirname(__file__)+"/..")
+import argparse
+
+def main(args=None):
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--product_dict_synchonize",dest="product_dict_synchonize",action="store_true",help="start product_dict_synchonize process")
+    parser.add_argument("--delete_product_collections",dest="delete_product_collections",action="store_true",help="drop the product dict collections")
+    parser.add_argument("--search_similar",dest="search_similar",action="store_true",help="start search_similar process")
+    parser.add_argument("--start_process_product",dest="start_process_product",action="store_true",help="start process_product process")
+
+    args = parser.parse_args(args)
+    if args.product_dict_synchonize:
+        from BaseDataMaintenance.maintenance.product.product_dict import start_embedding_product_dict
+        start_embedding_product_dict()
+    if args.delete_product_collections:
+        from BaseDataMaintenance.maintenance.product.product_dict import drop_product_dict_collections
+        drop_product_dict_collections()
+    if args.search_similar:
+        from BaseDataMaintenance.maintenance.product.product_dict import search_similar
+        search_similar()
+    if args.start_process_product:
+        from BaseDataMaintenance.maintenance.product.products import start_process_product
+        start_process_product()
+
+
+
+if __name__ == '__main__':
+    main()
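
Usage of the new entry point, based on the flags defined above (run from the repository root):

    # python BaseDataMaintenance/start_product.py --product_dict_synchonize
    # python BaseDataMaintenance/start_product.py --start_process_product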