Browse Source

产品字典增加接口表,支持增加,删除和修改参数,并且联动历史数据

luojiehua 2 years ago
parent
commit
79a4ce068d

+ 2 - 1
BaseDataMaintenance/maintenance/product/productUtils.py

@@ -253,6 +253,7 @@ def clean_product_quantity(product_quantity):
     return ""
 
 if __name__ == '__main__':
-    print(is_similar('空气波压力治疗仪','空气波治疗仪'))
+    print(is_similar('128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10','VolusonE10'))
+    print(re.split("[\u4e00-\u9fff]",'128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10'))
     import Levenshtein
     print(Levenshtein.ratio('助听器','助行器'))

+ 440 - 17
BaseDataMaintenance/maintenance/product/product_dict.py

@@ -6,6 +6,9 @@ from BaseDataMaintenance.common.multiThread import MultiThreadHandler
 from apscheduler.schedulers.blocking import BlockingScheduler
 
 from BaseDataMaintenance.model.ots.document_product_dict import *
+from BaseDataMaintenance.model.ots.document_product_dict_interface import *
+from BaseDataMaintenance.model.ots.document_product import *
+from BaseDataMaintenance.model.ots.document_product_tmp import *
 
 from BaseDataMaintenance.dataSource.source import getConnect_ots
 
@@ -18,8 +21,10 @@ import time
 import traceback
 import json
 import requests
+from random import randint
 
 
+IS_SYNCHONIZED = 2
 
 class Product_Dict_Manager():
 
@@ -35,6 +40,8 @@ class Product_Dict_Manager():
         self.ots_client = getConnect_ots()
         self.queue_product_dict = Queue()
 
+        self.queue_product_interface = Queue()
+
         self.session = requests.Session()
 
 
@@ -71,13 +78,27 @@ class Product_Dict_Manager():
 
 
 
+    def get_collection(self,grade):
+        Coll = None
+        Coll_name = None
+        if grade ==SPECS_GRADE:
+            Coll = self.Coll_specs
+            Coll_name = self.collection_name_specs
+        if grade == BRAND_GRADE:
+            Coll = self.Coll_brand
+            Coll_name = self.collection_name_brand
+        if grade == NAME_GRADE:
+            Coll = self.Coll_name
+            Coll_name = self.collection_name_name
+        return Coll,Coll_name
+
 
 
     def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS]):
 
         bool_query = BoolQuery(
             must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3,5,True,True)],
-            must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,2)])
+            must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED)])
 
         rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
                                                                             SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
@@ -100,20 +121,6 @@ class Product_Dict_Manager():
 
 
 
-    def get_collection(self,grade):
-        Coll = None
-        Coll_name = None
-        if grade ==SPECS_GRADE:
-            Coll = self.Coll_specs
-            Coll_name = self.collection_name_specs
-        if grade == BRAND_GRADE:
-            Coll = self.Coll_brand
-            Coll_name = self.collection_name_brand
-        if grade == NAME_GRADE:
-            Coll = self.Coll_name
-            Coll_name = self.collection_name_name
-        return Coll,Coll_name
-
     def embedding_comsumer(self):
         def handle(item,result_queue):
             try:
@@ -145,6 +152,7 @@ class Product_Dict_Manager():
                             if _alias==name:
                                 continue
                             _id = get_document_product_dict_standard_alias_id(_alias)
+                            vector = request_embedding(_alias)
                             data = [[_id],
                                     [_alias],
                                     [name],
@@ -154,14 +162,13 @@ class Product_Dict_Manager():
                                     [grade]]
                             insert_embedding(Coll,data)
 
-                    _pd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:2})
+                    _pd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
                     _pd.update_row(self.ots_client)
 
 
             except Exception as e:
                 traceback.print_exc()
 
-
         self.embedding_producer()
         start_time = time.time()
         q_size = self.queue_product_dict.qsize()
@@ -169,6 +176,422 @@ class Product_Dict_Manager():
         mt.run()
         log("process embedding %d records cost %.2f s"%(q_size,time.time()-start_time))
 
+    def embedding_interface_producer(self):
+
+        bool_query = BoolQuery(must_queries=[RangeQuery("status",1,50,True,True)])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(ColumnReturnType.ALL))
+        list_data = getRow_ots(rows)
+        for _data in list_data:
+            self.queue_product_interface.put(_data)
+
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict_interface","document_product_dict_interface_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(ColumnReturnType.ALL))
+            list_data = getRow_ots(rows)
+            for _data in list_data:
+                self.queue_product_interface.put(_data)
+            if self.queue_product_dict.qsize()>1000:
+                break
+        log("embedding interface total_count %d"%(total_count))
+
+    def process_history_name(self,list_name,action):
+        if action=="insert":
+            # search document_product_temp and update status
+            # query in blur mode
+            for name in list_name:
+                should_q = self.make_query(name,DOCUMENT_PRODUCT_TMP_NAME,MatchPhraseQuery,4,5)
+                if should_q is None:
+                    continue
+                bool_query =BoolQuery(must_queries=[
+                    RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,401,450,True,True),
+                    should_q
+                ])
+
+                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
+                                                                                    columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
+                list_data = getRow_ots(rows)
+                while next_token:
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
+                                                                                        SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                        columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
+                    list_data.extend(getRow_ots(rows))
+                for _data in list_data:
+                    id = _data.get(DOCUMENT_PRODUCT_TMP_ID)
+                    status = randint(1,50)
+                    _d = {DOCUMENT_PRODUCT_TMP_ID:id,
+                          DOCUMENT_PRODUCT_TMP_STATUS:status}
+                    _dt = Document_product_tmp(_d)
+                    _dt.update_row(self.ots_client)
+
+        elif action=="delete":
+            # delete document_product
+            # update temp new_id and status to 401-450
+
+            for name in list_name:
+                bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
+                if bool_query is not None:
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
+                                                                                        columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
+                    list_data = getRow_ots(rows)
+                    while next_token:
+                        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
+                        list_data.extend(getRow_ots(rows))
+                    for _d in list_data:
+                        _id = _d.get(DOCUMENT_PRODUCT_ID)
+
+                        _d = {DOCUMENT_PRODUCT_ID:_id}
+                        dp = Document_product(_d)
+                        dp.delete_row(self.ots_client)
+
+                        original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
+                        _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:randint(401,450)}
+                        dpt = Document_product_tmp(_d)
+                        dpt.update_row(self.ots_client)
+
+
+        elif action=="update":
+            # delete document_product and update document_product_temp to rerun
+            for name in list_name:
+                bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
+                if bool_query is not None:
+                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
+                                                                                        columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
+                    list_data = getRow_ots(rows)
+                    while next_token:
+                        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
+                                                                                            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
+                        list_data.extend(getRow_ots(rows))
+                    for _d in list_data:
+                        _id = _d.get(DOCUMENT_PRODUCT_ID)
+                        original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
+                        self.rerun(_id,original_id)
+
    def process_history_brand(self,list_brand,action):
        """Re-drive historical document_product rows after a BRAND change.

        For "insert" the original (raw) brand column is fuzzy-matched; for
        delete/update the standardized brand column is term-matched.  Every
        matching product is sent back through the pipeline via ``self.rerun``.
        """
        # search document_product and rerun
        for name in list_brand:
            if action=="insert":
                # blur match on the raw brand text with 4-char windows
                bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,4,5)
            else:
                # exact match on the standardized brand
                bool_query = self.make_query(name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(name),5)
            if bool_query is not None:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                    columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                list_data = getRow_ots(rows)
                # page through the remaining results
                while next_token:
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
                                                                                        SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                        columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                    list_data.extend(getRow_ots(rows))
                for _d in list_data:
                    _id = _d.get(DOCUMENT_PRODUCT_ID)
                    original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
                    self.rerun(_id,original_id)
+
+
    def process_history_specs(self,list_specs,action):
        """Re-drive historical document_product rows after a SPECS change.

        For "insert" the original (raw) specs column is fuzzy-matched; for
        delete/update the standardized specs column is term-matched.  Every
        matching product is sent back through the pipeline via ``self.rerun``.
        """
        # search document_product and rerun

        for name in list_specs:
            if action=="insert":
                # blur match on the raw specs text (full-length windows)
                bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_SPECS,MatchPhraseQuery,len(name),5)
            else:
                # exact match on the standardized specs
                bool_query = self.make_query(name,DOCUMENT_PRODUCT_SPECS,TermQuery,len(name),5)
            if bool_query is not None:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
                                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                    columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                list_data = getRow_ots(rows)
                # page through the remaining results
                while next_token:
                    rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
                                                                                        SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                        columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
                    list_data.extend(getRow_ots(rows))
                for _d in list_data:
                    _id = _d.get(DOCUMENT_PRODUCT_ID)
                    original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
                    self.rerun(_id,original_id)
+
+    def rerun(self,id,original_id):
+        _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
+        dpt = Document_product_tmp(_d)
+        dpt.update_row(self.ots_client)
+
+        _d = {DOCUMENT_PRODUCT_ID:id}
+        dp = Document_product(_d)
+        dp.delete_row(self.ots_client)
+
+
+    def make_query(self,name,column,query_type,min_len,strides):
+
+        should_q = []
+        strides_spce = len(name)-min_len+1
+        for _i in range(min(strides_spce,strides)):
+            _s = str(name)[_i:min_len]
+            if query_type==WildcardQuery:
+                should_q.append(query_type(column,"*%s*"%_s))
+            elif query_type==TermQuery or query_type==MatchPhraseQuery:
+                should_q.append(query_type(column,"%s"%(_s)))
+        if len(should_q)>0:
+            return BoolQuery(should_queries=should_q)
+        return None
+
+
+
+
+    def process_history(self,list_name,grade,action):
+        if grade==NAME_GRADE:
+            self.process_history_name(list_name,action)
+        elif grade==BRAND_GRADE:
+            self.process_history_brand(list_name,action)
+        elif grade==SPECS_GRADE:
+            self.process_history_specs(list_name,action)
+
+    def exists_records(self,name,grade,create_time):
+        term_columns = None
+        if grade==NAME_GRADE:
+            term_columns = DOCUMENT_PRODUCT_NAME
+        elif grade==BRAND_GRADE:
+            term_columns = DOCUMENT_PRODUCT_BRAND
+        elif grade==SPECS_GRADE:
+            term_columns = DOCUMENT_PRODUCT_SPECS
+        if term_columns is not None:
+            bool_query = BoolQuery(must_queries=[
+                TermQuery(term_columns,str(name)),
+                RangeQuery(DOCUMENT_PRODUCT_DICT_CREATE_TIME,None,str(create_time))
+            ])
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product",
+                                                                                SearchQuery(bool_query,get_total_count=True,limit=1),
+                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
+            if total_count>0:
+                return True
+        return False
+
+
+    def act_insert(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
+        #update document_product_dict
+        if original_id is None or original_id=="":
+            original_id = get_document_product_dict_id(parent_id,name)
+        _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
+              DOCUMENT_PRODUCT_DICT_ALIAS:alias,
+              DOCUMENT_PRODUCT_DICT_GRADE:grade,
+              DOCUMENT_PRODUCT_DICT_PARENT_ID:parent_id,
+              DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias,
+              DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+              DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
+        _dpd = Document_product_dict(_d)
+        _dpd.update_row(self.ots_client)
+
+        list_name = []
+        #update milvus
+        vector = request_embedding(name)
+        Coll,_ = self.get_collection(grade)
+        if vector is not None and Coll is not None:
+            id = original_id
+            data = [[id],
+                    [name],
+                    [name],
+                    [id],
+                    [vector],
+                    [parent_id],
+                    [grade]]
+            insert_embedding(Coll,data)
+            list_name.append(name)
+
+            if standard_alias is not None and standard_alias!="":
+                list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+                for _alias in list_alias:
+                    _alias = _alias.strip()
+                    if len(_alias)==0:
+                        continue
+                    if _alias==name:
+                        continue
+                    _id = get_document_product_dict_standard_alias_id(_alias)
+                    vector = request_embedding(_alias)
+                    data = [[_id],
+                            [_alias],
+                            [name],
+                            [id],
+                            [vector],
+                            [parent_id],
+                            [grade]]
+                    insert_embedding(Coll,data)
+                    list_name.append(_alias)
+
+        #judge whether there exists records before this record created,if not process the history data
+        if not self.exists_records(name,grade,create_time):
+            self.process_history(list_name,grade,"insert")
+
+
+    def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
+        # check whether there are change variable
+        _d = {DOCUMENT_PRODUCT_DICT_ID:original_id}
+        dpd = Document_product_dict(_d)
+        if not dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],True):
+            return
+
+        old_name = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_NAME)
+        old_name_set = set([old_name])
+        _alias = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS)
+        if _alias is not None:
+            for s in _alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR):
+                if s!="":
+                    old_name_set.add(s)
+
+        new_name_set = set([name])
+        if standard_alias is not None:
+            for s in standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR):
+                if s!="":
+                    new_name_set.add(s)
+        if len(new_name_set)==len(old_name_set) and len(new_name_set)==len(new_name_set&old_name_set):
+            return
+
+        # update the milvus
+        Coll,_ = self.get_collection(grade)
+        o_id = original_id
+        expr = " ots_id in ['%s']"%o_id
+        Coll.delete(expr)
+
+        _alias = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS)
+        if _alias is not None and _alias!="":
+            list_alias = _alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+            for _alias in list_alias:
+                _alias = _alias.strip()
+                if len(_alias)==0:
+                    continue
+                if _alias==name:
+                    continue
+                _id = get_document_product_dict_standard_alias_id(_alias)
+                expr = " ots_id in ['%s']"%o_id
+                Coll.delete(expr)
+
+        list_name = []
+        vector = request_embedding(name)
+        if vector is not None and Coll is not None:
+            id = original_id
+            data = [[id],
+                    [name],
+                    [name],
+                    [id],
+                    [vector],
+                    [parent_id],
+                    [grade]]
+            insert_embedding(Coll,data)
+            list_name.append(name)
+
+            if standard_alias is not None and standard_alias!="":
+                list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
+                for _alias in list_alias:
+                    _alias = _alias.strip()
+                    if len(_alias)==0:
+                        continue
+                    if _alias==name:
+                        continue
+                    _id = get_document_product_dict_standard_alias_id(_alias)
+                    vector = request_embedding(_alias)
+                    data = [[_id],
+                            [_alias],
+                            [name],
+                            [id],
+                            [vector],
+                            [parent_id],
+                            [grade]]
+                    insert_embedding(Coll,data)
+                    list_name.append(_alias)
+
+        # process history
+        insert_names = list(new_name_set-old_name_set)
+        if len(insert_names)>0:
+            self.process_history(insert_names,grade,"insert")
+        delete_names = list(old_name_set-new_name_set)
+        if len(delete_names)>0:
+            self.process_history([old_name],grade,"update")
+
+
+        # update document_product_dict
+        _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
+              DOCUMENT_PRODUCT_DICT_NAME:name,
+              DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias}
+        dpd = Document_product_dict(_d)
+        dpd.update_row(self.ots_client)
+
+
+    def act_delete(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
+        #search records which name=name and grade=grade
+        bool_query = BoolQuery(must_queries=[
+            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,str(name)),
+            TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
+        ])
+
+        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
+                                                                            columns_to_get=ColumnsToGet(ColumnReturnType.ALL))
+        if total_count==0:
+            return
+        list_data = getRow_ots(rows)
+        while next_token:
+            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_dict","document_product_dict_index",
+                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
+                                                                                columns_to_get=ColumnsToGet(ColumnReturnType.ALL))
+            list_data.extend(getRow_ots(rows))
+
+        #delete milvus records
+        Coll,_ = self.get_collection(grade)
+        for _data in list_data:
+            o_id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
+            expr = " ots_id in ['%s']"%o_id
+            Coll.delete(expr)
+
+        #process_history data
+        self.process_history([name],grade,"delete")
+
+        #delete document_product_dict
+        for _data in list_data:
+            id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
+            _d = {DOCUMENT_PRODUCT_DICT_ID:id}
+            dpdi = Document_product_dict_interface(_d)
+            dpdi.delete_row(self.ots_client)
+
+
    def embedding_interface_comsumer(self):
        """Drain ``queue_product_interface``: apply each interface record's
        action (insert/update/delete) and mark the record as processed
        (status 201-300)."""
        def _handle(item,result_queue):
            # unpack the interface record's fields
            id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ID)
            action = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION)
            name = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_NAME)
            alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
            grade = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE)
            # NOTE(review): uses DOCUMENT_PRODUCT_DICT_ORIGINAL_ID while every
            # other field here uses a *_INTERFACE_* constant — confirm this is
            # the intended column name
            original_id = item.get(DOCUMENT_PRODUCT_DICT_ORIGINAL_ID)
            parent_id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID)
            standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
            create_time = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)

            # dispatch on the requested action; unknown actions fall through
            # and the record is still marked as processed below
            if action=="insert":
                self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time)
            elif action=="update":
                self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time)
            elif action=="delete":
                self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)

            # random status in 201-300 marks the record done while spreading
            # values for index-friendly scans
            _pdi = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_INTERFACE_ID:id,
                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300)})
            _pdi.update_row(self.ots_client)

        self.embedding_interface_producer()

        # 5 worker threads, 1 result handler
        mt = MultiThreadHandler(self.queue_product_interface,_handle,None,5,1)
        mt.run()
+
+
 
     def start_embedding_product_dict(self):
         from apscheduler.schedulers.blocking import BlockingScheduler

+ 139 - 113
BaseDataMaintenance/maintenance/product/products.py

@@ -237,97 +237,114 @@ class Product_Manager(Product_Dict_Manager):
 
 
             if specs is not None and specs!="":
-                specs_vector = request_embedding(specs)
                 debug("getting sepcs %s"%(specs))
-                if specs_vector is not None:
-                    Coll,_ = self.get_collection(SPECS_GRADE)
-                    search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=60)
-
-                    for _search in search_list:
-
-                        ots_id = _search.entity.get("standard_name_id")
-                        ots_name = _search.entity.get("standard_name")
-                        ots_parent_id = _search.entity.get("ots_parent_id")
-
-                        # log("checking specs %s and %s"%(specs,ots_name))
-                        if is_similar(specs,ots_name):
-                            # log("specs is_similar")
-                            if check_specs(specs,ots_name):
-                                # log("check_specs succeed")
-                                new_specs = ots_name
-
-                                # to update the document_product_dict which is builded for search
-                                if brand_ots_id is not None:
-                                    # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
-                                    specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
-
-                                    _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
-                                                DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                                                DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(specs).lower()),
-                                                DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
-                                                DOCUMENT_PRODUCT_DICT_STATUS:1,
-                                                DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
-                                                DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                                }
-                                    _dpd_specs = Document_product_dict(_d_specs)
-                                    # _dpd_specs.updateAlias(str(new_specs).lower())
-                                    if not _dpd_specs.exists_row(self.ots_client):
-                                        _dpd_specs.update_row(self.ots_client)
-                                    else:
-                                        pass
-                                        # #update alias
-                                        # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
-                                        # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
-                                        # if _flag:
-                                        #     if _dpd.updateAlias(specs):
-                                        #         _dpd.update_row(self.ots_client)
-                            else:
-                                # log("check_specs failed")
-                                new_specs = clean_product_specs(specs)
-                                # insert into document_product_dict a new record
-                                # to update the document_product_dict which is builded for search
-                                # add new specs
-                                if brand_ots_id is not None and name_ots_id is not None:
-                                    _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
-                                    _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
-                                          DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                                          DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(new_specs.lower()),
-                                          DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
-                                          DOCUMENT_PRODUCT_DICT_STATUS:1,
-                                          DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
-                                          DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                          DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                          }
-                                    _dpd = Document_product_dict(_d)
-                                    # _dpd.updateAlias(new_specs)
-                                    _dpd.update_row(self.ots_client)
-                            break
+                list_specs = []
+                c_specs = clean_product_specs(specs)
+                list_specs.append(c_specs)
+
+                for s in re.split("[\u4e00-\u9fff]",specs):
+                    if s!="" and len(s)>4:
+                        list_specs.append(s)
+                similar_flag = None
+                _index = 0
+                break_flag = False
+                for c_specs in list_specs:
+                    if break_flag:
+                        break
+                    _index += 1
+                    specs_vector = request_embedding(c_specs)
 
-                        else:
-                            # add new specs?
-                            # log("specs not similar")
-                            if is_legal_specs(specs):
-                                # log("is_legal_specs")
-                                new_specs = clean_product_specs(specs)
-                                # insert into document_product_dict a new record
-                                # to update the document_product_dict which is builded for search
-                                # add new specs
-                                if brand_ots_id is not None and name_ots_id is not None:
-                                    _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
-                                    _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
-                                          DOCUMENT_PRODUCT_DICT_NAME:new_specs,
-                                          DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
-                                          DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
-                                          DOCUMENT_PRODUCT_DICT_STATUS:1,
-                                          DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
-                                          DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                          DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
-                                          }
-                                    _dpd = Document_product_dict(_d)
-                                    _dpd.update_row(self.ots_client)
-                                break
+                    if specs_vector is not None:
+                        Coll,_ = self.get_collection(SPECS_GRADE)
+                        search_list = search_embedding(Coll,embedding_index_name,[specs_vector],self.search_params,output_fields,limit=60)
+
+                        for _search in search_list:
+
+                            ots_id = _search.entity.get("standard_name_id")
+                            ots_name = _search.entity.get("standard_name")
+                            ots_parent_id = _search.entity.get("ots_parent_id")
 
+                            log("checking specs %s and %s"%(specs,ots_name))
+                            if is_similar(specs,ots_name):
+                                # log("specs is_similar")
+                                if check_specs(c_specs,ots_name):
+                                    log("check_specs succeed")
+                                    break_flag = True
+                                    new_specs = ots_name
+
+                                    # to update the document_product_dict which is builded for search
+                                    if brand_ots_id is not None:
+                                        # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
+                                        specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
+
+                                        _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
+                                                    DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                                    DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(specs).lower()),
+                                                    DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                                    DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                                    DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                                    DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                    DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                                    }
+                                        _dpd_specs = Document_product_dict(_d_specs)
+                                        # _dpd_specs.updateAlias(str(new_specs).lower())
+                                        if not _dpd_specs.exists_row(self.ots_client):
+                                            _dpd_specs.update_row(self.ots_client)
+                                        else:
+                                            pass
+                                            # #update alias
+                                            # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
+                                            # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
+                                            # if _flag:
+                                            #     if _dpd.updateAlias(specs):
+                                            #         _dpd.update_row(self.ots_client)
+                                    break
+                                else:
+                                    if _index == 1:
+                                        similar_flag = True
+
+                if not break_flag and similar_flag:
+                    log("check_specs failed")
+                    new_specs = clean_product_specs(specs)
+                    # insert into document_product_dict a new record
+                    # to update the document_product_dict which is builded for search
+                    # add new specs
+                    if brand_ots_id is not None and name_ots_id is not None:
+                        _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
+                        _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
+                              DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                              DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(new_specs.lower()),
+                              DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                              DOCUMENT_PRODUCT_DICT_STATUS:1,
+                              DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                              DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                              DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                              }
+                        _dpd = Document_product_dict(_d)
+                        # _dpd.updateAlias(new_specs)
+                        _dpd.update_row(self.ots_client)
+                else:
+                    # add new specs?
+                    debug("specs not similar")
+                    if is_legal_specs(specs):
+                        debug("is_legal_specs")
+                        new_specs = clean_product_specs(specs)
+                        # insert into document_product_dict a new record
+                        # to update the document_product_dict which is builded for search
+                        # add new specs
+                        if brand_ots_id is not None and name_ots_id is not None:
+                            _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
+                            _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
+                                  DOCUMENT_PRODUCT_DICT_NAME:new_specs,
+                                  DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
+                                  DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
+                                  DOCUMENT_PRODUCT_DICT_STATUS:1,
+                                  DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
+                                  DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                  DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
+                                  }
+                            _dpd = Document_product_dict(_d)
+                            _dpd.update_row(self.ots_client)
 
         # judge if the product matches the standard product
         if name_ots_id is not None:
@@ -425,7 +442,7 @@ class Product_Manager(Product_Dict_Manager):
                 attachments = json.loads(page_attachments)
                 for _a in attachments:
                     _filemd5 = _a.get(document_attachment_path_filemd5)
-                    if _filemd5 in set_md5s:
+                    if _filemd5 in set_md5s or _filemd5 is None:
                         continue
                     set_md5s.add(_filemd5)
                     _da = {attachment_filemd5:_filemd5}
@@ -532,7 +549,6 @@ class Product_Manager(Product_Dict_Manager):
         return None,1
 
 
-
     def dumplicate(self,document_product):
         '''
         Duplicates the product data
@@ -560,7 +576,8 @@ class Product_Manager(Product_Dict_Manager):
         scheduler = BlockingScheduler()
         scheduler.add_job(self.producer,"cron",second="*/20")
         scheduler.add_job(self.comsumer,"cron",minute="*/1")
-        scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
+        # scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
+        scheduler.add_job(self.embedding_interface_comsumer,"cron",second="*/20")
         scheduler.start()
 
     def test(self):
@@ -598,36 +615,30 @@ def fix_product_data():
     :return:
     '''
     ots_client = getConnect_ots()
-    bool_query = BoolQuery(must_queries=[TermQuery("docid",309258275)
+    bool_query = BoolQuery(must_queries=[RangeQuery("status",1)
                                          ])
 
     rows,next_token,total_count,is_all_succeed = ots_client.search("document_product","document_product_index",
                                                                    SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
-                                                                   columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DOCID,DOCUMENT_PRODUCT_PROJECT_NAME],return_type=ColumnReturnType.SPECIFIED))
+                                                                   columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
 
     list_rows = getRow_ots(rows)
     while next_token:
         rows,next_token,total_count,is_all_succeed = ots_client.search('document_product','document_product_index',
                                                                        SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
-                                                                       columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DOCID,DOCUMENT_PRODUCT_PROJECT_NAME],return_type=ColumnReturnType.SPECIFIED))
+                                                                       columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DOCID],return_type=ColumnReturnType.SPECIFIED))
         list_rows.extend(getRow_ots(rows))
+        print("%d/%d"%(len(list_rows),total_count))
+        # if len(list_rows)>10000:
+        #     break
 
     task_queue = Queue()
     for d in list_rows:
         task_queue.put(d)
-    def handle(item,result_queue):
-        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
-
-        # # delete data and rerun
-        # _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
-        # dpt = Document_product_tmp(_d)
-        # dpt.update_row(ots_client)
-        #
-        # _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
-        # dp = Document_product(_d)
-        # dp.delete_row(ots_client)
 
+    def fix_missing_data(item,result_queue):
 
+        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
         _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
         dpt = Document_product_tmp(_d)
         dpt.fix_columns(ots_client,["name","brand","specs"],True)
@@ -638,13 +649,14 @@ def fix_product_data():
         #fix the project_code and original_name and bidi_filemd5s
         docid = int(item.get(DOCUMENT_PRODUCT_DOCID))
         partitionkey = docid%500+1
-        project_name = item.get(DOCUMENT_PRODUCT_PROJECT_NAME,"")
-        if project_name=="":
-            #fix project_name
-            _doc = Document({"partitionkey":partitionkey,
-                             "docid":docid})
-            _doc.fix_columns(ots_client,["doctitle"],True)
-            dp.setValue(DOCUMENT_PRODUCT_DOCTITLE,_doc.getProperties().get("doctitle"),True)
+
+        # project_name = item.get(DOCUMENT_PRODUCT_PROJECT_NAME,"")
+        # if project_name=="":
+        #     #fix project_name
+        #     _doc = Document({"partitionkey":partitionkey,
+        #                      "docid":docid})
+        #     _doc.fix_columns(ots_client,["doctitle"],True)
+        #     dp.setValue(DOCUMENT_PRODUCT_DOCTITLE,_doc.getProperties().get("doctitle"),True)
         bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
         if bid_filemd5s is not None:
             dp.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
@@ -654,6 +666,20 @@ def fix_product_data():
         dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,""),True)
         dp.update_row(ots_client)
 
    def deleteAndReprocess(item,result_queue):
        """Delete a derived document_product row and flag its source row for re-run.

        Resets the originating document_product_tmp row's status to 1 (todo) so
        the pipeline processes it again, then removes the derived
        document_product record.  Uses ``ots_client`` from the enclosing scope.
        """
        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
        # delete data and rerun
        _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
        dpt = Document_product_tmp(_d)
        dpt.update_row(ots_client)

        _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
        dp = Document_product(_d)
        dp.delete_row(ots_client)
+
    def handle(item,result_queue):
        # NOTE(review): placeholder worker — the previous per-item logic was split
        # into fix_missing_data/deleteAndReprocess, but this `handle` (the callable
        # actually passed to MultiThreadHandler below) now only prints, so the
        # whole scan is a no-op. Presumably one of the two helpers should be
        # invoked here — confirm which behavior is intended.
        print("handle")
 
     mt = MultiThreadHandler(task_queue,handle,None,30,1)
     mt.run()
@@ -669,7 +695,7 @@ if __name__ == '__main__':
     # start_process_product()
     # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
     test()
-    print(Product_Manager.get_bid_filemd5s(174802483,getConnect_ots()))
+    print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
     name = "一"
     ots_name = "一氧化碳分析仪"
     print(is_similar(name,ots_name),check_product(name,ots_name))

+ 39 - 0
BaseDataMaintenance/model/ots/document_product_dict_interface.py

@@ -0,0 +1,39 @@
+
+from BaseDataMaintenance.model.ots.BaseModel import BaseModel
+
# Column names of the document_product_dict_interface OTS table.
DOCUMENT_PRODUCT_DICT_INTERFACE_ID = "id" #uuid
DOCUMENT_PRODUCT_DICT_ORIGINAL_ID = "original_id"
DOCUMENT_PRODUCT_DICT_INTERFACE_NAME = "name"
DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS = "alias"
DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE = "grade"
DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS = "status" # 1-50 for todo 201-300 for done
DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID = "parent_id"
DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME = "create_time"
DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME = "update_time"

# Multiple standard aliases are stored in one column, joined by the separator.
DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS = "standard_alias"
DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR = "|"
DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION = "action" #insert delete update

MAX_NAME_LENGTH = 300
+
class Document_product_dict_interface(BaseModel):
    """OTS model for the ``document_product_dict_interface`` table.

    Each row is a pending dictionary-maintenance request (an insert/update/
    delete of a product-dict entry), keyed by a uuid ``id`` column.
    """

    def __init__(self,_dict):
        BaseModel.__init__(self)
        # Copy every supplied column onto the model as an updatable value.
        for _key,_value in _dict.items():
            self.setValue(_key,_value,True)
        self.table_name = "document_product_dict_interface"

    def getPrimary_keys(self):
        """Return the table's primary-key column list (the uuid ``id``)."""
        return ["id"]
+
+
+
+
from BaseDataMaintenance.common.documentFingerprint import getMD5


def get_document_product_dict_id(parent_md5,name):
    """Return the deterministic dict-row id derived from parent md5 + entry name."""
    joined = "%s&&%s"%(parent_md5,name)
    return getMD5(joined)
+
def get_document_product_dict_standard_alias_id(name):
    """Return the deterministic row id for a standard-alias entry.

    Alias rows hash under the fixed "alias" namespace instead of a parent md5.
    """
    return getMD5("&&".join(("alias",name)))