|
@@ -24,7 +24,7 @@ import requests
|
|
from random import randint
|
|
from random import randint
|
|
|
|
|
|
|
|
|
|
-IS_SYNCHONIZED = 2
|
|
|
|
|
|
+IS_SYNCHONIZED = 3
|
|
|
|
|
|
class Product_Dict_Manager():
|
|
class Product_Dict_Manager():
|
|
|
|
|
|
@@ -104,10 +104,13 @@ class Product_Dict_Manager():
|
|
def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_LEVEL]):
|
|
def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_LEVEL]):
|
|
|
|
|
|
bool_query = BoolQuery(
|
|
bool_query = BoolQuery(
|
|
- must_queries=[RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3,5,True,True)],
|
|
|
|
|
|
+ must_queries=[
|
|
|
|
+ TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
|
|
|
|
+ RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,3,5,True,True)
|
|
|
|
+ ],
|
|
must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED)])
|
|
must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED)])
|
|
|
|
|
|
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
|
|
|
|
|
|
+ rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
|
|
SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
|
|
SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
|
|
columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
|
|
columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
|
|
|
|
|
|
@@ -116,18 +119,17 @@ class Product_Dict_Manager():
|
|
self.queue_product_dict.put(_d)
|
|
self.queue_product_dict.put(_d)
|
|
|
|
|
|
while next_token:
|
|
while next_token:
|
|
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
|
|
|
|
|
|
+ rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
|
|
SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
|
|
SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
|
|
columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
|
|
columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
|
|
list_dict = getRow_ots(rows)
|
|
list_dict = getRow_ots(rows)
|
|
for _d in list_dict:
|
|
for _d in list_dict:
|
|
self.queue_product_dict.put(_d)
|
|
self.queue_product_dict.put(_d)
|
|
- if self.queue_product_dict.qsize()>=1000:
|
|
|
|
|
|
+ if self.queue_product_dict.qsize()>=10000:
|
|
break
|
|
break
|
|
log("product_dict embedding total_count:%d"%total_count)
|
|
log("product_dict embedding total_count:%d"%total_count)
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
def embedding_comsumer(self):
|
|
def embedding_comsumer(self):
|
|
def handle(item,result_queue):
|
|
def handle(item,result_queue):
|
|
try:
|
|
try:
|
|
@@ -141,10 +143,9 @@ class Product_Dict_Manager():
|
|
remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
|
|
remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
|
|
level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL,1)
|
|
level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL,1)
|
|
|
|
|
|
-
|
|
|
|
if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level):
|
|
if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level):
|
|
|
|
|
|
- _pd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:_id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
|
|
|
|
|
|
+ _pd = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_ID:_id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
|
|
_pd.update_row(self.ots_client)
|
|
_pd.update_row(self.ots_client)
|
|
|
|
|
|
|
|
|
|
@@ -294,7 +295,6 @@ class Product_Dict_Manager():
|
|
|
|
|
|
|
|
|
|
def make_query(self,name,column,query_type,min_len,strides):
|
|
def make_query(self,name,column,query_type,min_len,strides):
|
|
-
|
|
|
|
should_q = []
|
|
should_q = []
|
|
strides_spce = len(name)-min_len+1
|
|
strides_spce = len(name)-min_len+1
|
|
for _i in range(min(strides_spce,strides)):
|
|
for _i in range(min(strides_spce,strides)):
|
|
@@ -308,9 +308,8 @@ class Product_Dict_Manager():
|
|
return None
|
|
return None
|
|
|
|
|
|
|
|
|
|
-
|
|
|
|
-
|
|
|
|
- def process_history(self,list_name,grade,action):
|
|
|
|
|
|
+ def process_history_by_name(self,list_name,grade,action):
|
|
|
|
+ assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_UPDATE,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
|
|
if grade==NAME_GRADE:
|
|
if grade==NAME_GRADE:
|
|
self.process_history_name(list_name,action)
|
|
self.process_history_name(list_name,action)
|
|
elif grade==BRAND_GRADE:
|
|
elif grade==BRAND_GRADE:
|
|
@@ -318,6 +317,315 @@ class Product_Dict_Manager():
|
|
elif grade==SPECS_GRADE:
|
|
elif grade==SPECS_GRADE:
|
|
self.process_history_specs(list_name,action)
|
|
self.process_history_specs(list_name,action)
|
|
|
|
|
|
|
|
def process_history_by_standard_name(self,name,grade,list_name,action):
    """Re-run historical document_product rows after names in ``list_name`` are inserted for / deleted from the entry ``name``.

    The method name and the log text suggest ``list_name`` holds standard
    aliases of ``name`` -- TODO confirm against the caller.

    INSERT: for every ``n_name`` the rows matched by
    ``self.make_query(n_name, <grade column>, TermQuery, len(n_name), 5)``
    are fetched. The dictionary entry each row points to (if any) is deleted
    and the row is re-run. When the search reports zero hits, the work is
    delegated to ``process_history_name/brand/specs([n_name], action)``.

    DELETE: for NAME and BRAND the rows whose grade column equals ``name``
    and whose original column matches the ``make_query`` phrase query for
    ``n_name`` are fetched; for SPECS the rows whose specs column equals
    ``name`` are fetched (``list_name`` is not consulted). A row's dictionary
    entry is deleted only when a search by that dictionary id reports exactly
    one hit; every fetched row is re-run.

    :param name: value compared against the product name/brand/specs column
    :param grade: NAME_GRADE, BRAND_GRADE or SPECS_GRADE; other values are a no-op
    :param list_name: iterable of names to process
    :param action: DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT or _DELETE
    :raises AssertionError: if ``action`` is neither INSERT nor DELETE
    """
    assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]

    index_name = Document_product_table_name+"_index"

    def search_all(query,dict_id_column):
        """Page through every hit of ``query``; return (rows as dicts, last reported total_count)."""
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(
            Document_product_table_name,index_name,
            SearchQuery(query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,dict_id_column],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                Document_product_table_name,index_name,
                SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,dict_id_column],return_type=ColumnReturnType.SPECIFIED))
            list_data.extend(getRow_ots(rows))
        return list_data,total_count

    def has_dict_id(dict_id):
        """True when the row carries a non-empty dictionary id."""
        return dict_id is not None and dict_id!=""

    def referenced_once(dict_id_column,dict_id):
        """True when exactly one document_product row carries ``dict_id`` in ``dict_id_column``."""
        count_query = BoolQuery(must_queries=[
            TermQuery(dict_id_column,dict_id)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(
            Document_product_table_name,index_name,
            SearchQuery(count_query,get_total_count=True))
        return total_count==1

    def delete_dict_entry(dict_id):
        """Delete the dictionary entry ``dict_id`` via recurse_delete_dict, then delete its own row."""
        dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
        self.recurse_delete_dict(dict_id)
        dpd.delete_row(self.ots_client)

    def rerun_row(_d):
        """Hand one fetched document_product row back to self.rerun."""
        _id = _d.get(DOCUMENT_PRODUCT_ID)
        original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
        self.rerun(_id,original_id)

    def handle_insert(product_column,dict_id_column,process_history):
        """INSERT path shared by all three grades."""
        for n_name in list_name:
            bool_query = self.make_query(n_name,product_column,TermQuery,len(n_name),5)
            if bool_query is None:
                continue
            list_data,total_count = search_all(bool_query,dict_id_column)
            for _d in list_data:
                dict_id = _d.get(dict_id_column)
                # on insert the referenced dictionary entry is dropped unconditionally
                if has_dict_id(dict_id):
                    delete_dict_entry(dict_id)
                rerun_row(_d)
            log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
            if total_count==0:
                # nothing matched the exact query: fall back to the per-grade history processing
                process_history([n_name],action)

    def purge_and_rerun(query,dict_id_column):
        """DELETE path core: drop dictionary entries referenced by a single row, re-run every row."""
        list_data,_ = search_all(query,dict_id_column)
        for _d in list_data:
            dict_id = _d.get(dict_id_column)
            # the count is taken on the same id column the id was read from
            if has_dict_id(dict_id) and referenced_once(dict_id_column,dict_id):
                delete_dict_entry(dict_id)
            rerun_row(_d)

    def handle_delete(product_column,original_column,dict_id_column,min_len=None):
        """DELETE path for NAME and BRAND; ``min_len`` defaults to the full length of each name."""
        for n_name in list_name:
            bool_query = self.make_query(n_name,original_column,MatchPhraseQuery,
                                         len(n_name) if min_len is None else min_len,5)
            if bool_query is None:
                continue
            purge_and_rerun(BoolQuery(must_queries=[
                TermQuery(product_column,name),
                bool_query
            ]),dict_id_column)

    if grade==NAME_GRADE:
        if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
            handle_insert(DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_DICT_NAME_ID,self.process_history_name)
        elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
            handle_delete(DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DICT_NAME_ID)
    if grade==BRAND_GRADE:
        if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
            handle_insert(DOCUMENT_PRODUCT_BRAND,DOCUMENT_PRODUCT_DICT_BRAND_ID,self.process_history_brand)
        elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
            # brand phrase queries use a fixed minimum length of 4
            handle_delete(DOCUMENT_PRODUCT_BRAND,DOCUMENT_PRODUCT_ORIGINAL_BRAND,DOCUMENT_PRODUCT_DICT_BRAND_ID,min_len=4)
    if grade==SPECS_GRADE:
        if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
            handle_insert(DOCUMENT_PRODUCT_SPECS,DOCUMENT_PRODUCT_DICT_SPECS_ID,self.process_history_specs)
        elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
            # specs are selected by the standard value alone; list_name is not used here
            purge_and_rerun(BoolQuery(must_queries=[
                TermQuery(DOCUMENT_PRODUCT_SPECS,name),
            ]),DOCUMENT_PRODUCT_DICT_SPECS_ID)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
def process_history_by_remove_words(self,name,grade,list_name,action):
    """Re-run historical document_product rows after names in ``list_name`` are inserted for / deleted from the entry ``name``.

    The method name suggests ``list_name`` holds "remove words" of ``name``
    -- TODO confirm against the caller.

    INSERT: for every ``n_name`` the rows whose grade column equals ``name``
    and whose original column matches
    ``self.make_query(n_name, <original column>, MatchPhraseQuery, len(n_name), 5)``
    are fetched. A row's dictionary entry is deleted only when a search by
    that dictionary id reports exactly one hit; every fetched row is re-run.

    DELETE: delegates to ``process_history_name/brand/specs`` with
    ``list_name`` and the INSERT action.

    :param name: value compared against the product name/brand/specs column
    :param grade: NAME_GRADE, BRAND_GRADE or SPECS_GRADE; other values are a no-op
    :param list_name: iterable of names to process
    :param action: DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT or _DELETE
    :raises AssertionError: if ``action`` is neither INSERT nor DELETE
    """
    assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]

    index_name = Document_product_table_name+"_index"

    def search_all(query,dict_id_column):
        """Page through every hit of ``query`` and return the rows as dicts."""
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(
            Document_product_table_name,index_name,
            SearchQuery(query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,dict_id_column],return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(
                Document_product_table_name,index_name,
                SearchQuery(query,next_token=next_token,limit=100,get_total_count=True),
                columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,dict_id_column],return_type=ColumnReturnType.SPECIFIED))
            list_data.extend(getRow_ots(rows))
        return list_data

    def referenced_once(dict_id_column,dict_id):
        """True when exactly one document_product row carries ``dict_id`` in ``dict_id_column``."""
        count_query = BoolQuery(must_queries=[
            TermQuery(dict_id_column,dict_id)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(
            Document_product_table_name,index_name,
            SearchQuery(count_query,get_total_count=True))
        return total_count==1

    def handle_insert(product_column,original_column,dict_id_column):
        """INSERT path shared by all three grades."""
        for n_name in list_name:
            bool_query = self.make_query(n_name,original_column,MatchPhraseQuery,len(n_name),5)
            if bool_query is None:
                continue
            _query = BoolQuery(must_queries=[
                TermQuery(product_column,name),
                bool_query
            ])
            for _d in search_all(_query,dict_id_column):
                dict_id = _d.get(dict_id_column)
                # the count is taken on the same id column the id was read from
                if dict_id is not None and dict_id!="" and referenced_once(dict_id_column,dict_id):
                    dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
                    self.recurse_delete_dict(dict_id)
                    dpd.delete_row(self.ots_client)

                _id = _d.get(DOCUMENT_PRODUCT_ID)
                original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
                self.rerun(_id,original_id)

    if grade==NAME_GRADE:
        if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
            handle_insert(DOCUMENT_PRODUCT_NAME,DOCUMENT_PRODUCT_ORIGINAL_NAME,DOCUMENT_PRODUCT_DICT_NAME_ID)
        elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
            # deleting from list_name is handled by re-processing the names as an insert
            self.process_history_name(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
    if grade==BRAND_GRADE:
        if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
            handle_insert(DOCUMENT_PRODUCT_BRAND,DOCUMENT_PRODUCT_ORIGINAL_BRAND,DOCUMENT_PRODUCT_DICT_BRAND_ID)
        elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
            self.process_history_brand(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
    if grade==SPECS_GRADE:
        if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
            handle_insert(DOCUMENT_PRODUCT_SPECS,DOCUMENT_PRODUCT_ORIGINAL_SPECS,DOCUMENT_PRODUCT_DICT_SPECS_ID)
        elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
            self.process_history_specs(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
|
|
|
|
+
|
|
|
|
+
|
|
def exists_records(self,name,grade,create_time):
|
|
def exists_records(self,name,grade,create_time):
|
|
term_columns = None
|
|
term_columns = None
|
|
if grade==NAME_GRADE:
|
|
if grade==NAME_GRADE:
|
|
@@ -339,8 +647,7 @@ class Product_Dict_Manager():
|
|
return False
|
|
return False
|
|
|
|
|
|
|
|
|
|
- def act_insert(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
|
|
|
|
-
|
|
|
|
|
|
+ def act_insert(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
|
|
|
|
|
|
#update document_product_dict
|
|
#update document_product_dict
|
|
if original_id is None or original_id=="":
|
|
if original_id is None or original_id=="":
|
|
@@ -363,14 +670,24 @@ class Product_Dict_Manager():
|
|
_dpd.update_row(self.ots_client)
|
|
_dpd.update_row(self.ots_client)
|
|
|
|
|
|
# search interface if name and grade exists then update document_product_dict and return
|
|
# search interface if name and grade exists then update document_product_dict and return
|
|
- bool_query = BoolQuery(must_queries=[
|
|
|
|
- TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,name),
|
|
|
|
- TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,grade),
|
|
|
|
- RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS,201,301)
|
|
|
|
- ])
|
|
|
|
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
|
|
|
|
- SearchQuery(bool_query,get_total_count=True))
|
|
|
|
- if total_count>0:
|
|
|
|
|
|
+
|
|
|
|
+ interface_id = get_milvus_product_dict_id(name)
|
|
|
|
+ _interface_d = {
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:alias,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300),
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:parent_id,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:standard_alias,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:remove_words,
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL:level
|
|
|
|
+ }
|
|
|
|
+ _dpdi = Document_product_dict_interface(_interface_d)
|
|
|
|
+ if _dpdi.exists_row(self.ots_client):
|
|
return
|
|
return
|
|
|
|
|
|
list_name = []
|
|
list_name = []
|
|
@@ -393,90 +710,258 @@ class Product_Dict_Manager():
|
|
|
|
|
|
#judge whether there exists records before this record created,if not process the history data
|
|
#judge whether there exists records before this record created,if not process the history data
|
|
if not self.exists_records(name,grade,create_time):
|
|
if not self.exists_records(name,grade,create_time):
|
|
- self.process_history(list_name,grade,"insert")
|
|
|
|
|
|
+ self.process_history_by_name(list_name,grade,"insert")
|
|
|
|
|
|
|
|
+ _dpdi.update_row(self.ots_client)
|
|
|
|
|
|
- def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
|
|
|
|
- # check whether there are change variable
|
|
|
|
- if original_id is None or original_id=="":
|
|
|
|
- return
|
|
|
|
- _d = {DOCUMENT_PRODUCT_DICT_ID:original_id}
|
|
|
|
- dpd = Document_product_dict(_d)
|
|
|
|
- if not dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_ALIAS,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_CREATE_TIME,DOCUMENT_PRODUCT_DICT_UPDATE_TIME,DOCUMENT_PRODUCT_DICT_STATUS],True):
|
|
|
|
- return
|
|
|
|
|
|
def get_updated_record(self,alias,standard_alias,remove_words,level,original_alias,original_standard_alias,original_remove_words,original_level):
    """Work out how an update request changes an entry's word lists.

    ``alias``, ``standard_alias`` and ``remove_words`` are strings joined by
    ``DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR``.  The first
    character of each selects how it is combined with the stored value:

    * ``"+"`` -- add the listed terms to the stored terms;
    * ``"-"`` -- drop the listed terms from the stored terms;
    * anything else -- replace the stored terms with the listed terms.

    ``None`` or ``""`` leaves the stored terms unchanged.  Terms are
    stripped of surrounding whitespace and empty terms are ignored.

    :param alias: requested alias change
    :param standard_alias: requested standard-alias change
    :param remove_words: requested remove-words change
    :param level: requested level, compared with ``original_level`` via ``str``
    :param original_alias: stored alias string (may be ``None``)
    :param original_standard_alias: stored standard-alias string (may be ``None``)
    :param original_remove_words: stored remove-words string (may be ``None``)
    :param original_level: stored level
    :return: ``(update_flag, milvus_update_flag, original_alias_set,
        original_standard_alias_set, original_remove_words_set,
        new_alias_set, new_standard_alias_set, new_remove_words_set)``.
        ``update_flag`` is true when anything differs; ``milvus_update_flag``
        is true when standard aliases, remove words or the level differ
        (an alias-only change does not set it).
    """
    separator = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR

    def _to_terms(text):
        # Split a separator-joined string into a set of stripped, non-empty terms.
        if text is None or text=="":
            return set()
        stripped = (part.strip() for part in text.split(separator))
        return {part for part in stripped if part!=""}

    def _apply_request(requested,stored_terms):
        # Combine the stored terms with a "+"/"-"/replace request.
        if requested is None or requested=="":
            return set(stored_terms)
        if requested[0]=="+":
            return stored_terms|_to_terms(requested[1:])
        if requested[0]=="-":
            return stored_terms-_to_terms(requested[1:])
        return _to_terms(requested)

    original_alias_set = _to_terms(original_alias)
    original_standard_alias_set = _to_terms(original_standard_alias)
    original_remove_words_set = _to_terms(original_remove_words)

    new_alias_set = _apply_request(alias,original_alias_set)
    new_standard_alias_set = _apply_request(standard_alias,original_standard_alias_set)
    new_remove_words_set = _apply_request(remove_words,original_remove_words_set)

    # Compare each list with its own stored counterpart using set equality,
    # so that removals are detected as well as additions.
    alias_changed = new_alias_set!=original_alias_set
    standard_alias_changed = new_standard_alias_set!=original_standard_alias_set
    remove_words_changed = new_remove_words_set!=original_remove_words_set
    level_changed = str(level)!=str(original_level)

    milvus_update_flag = standard_alias_changed or remove_words_changed or level_changed
    update_flag = alias_changed or milvus_update_flag

    return update_flag,milvus_update_flag,original_alias_set,original_standard_alias_set,original_remove_words_set,new_alias_set,new_standard_alias_set,new_remove_words_set
|
|
|
|
+
|
|
|
|
def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
    """Apply an "update" interface request to the dictionary entry ``name``.

    Steps, in order:

    1. Load the stored alias, standard alias, remove words and level from
       the interface row whose id is ``get_milvus_product_dict_id(name)``;
       return if that row cannot be loaded.
    2. Let ``get_updated_record`` merge the request with the stored values;
       return if nothing changed.
    3. Re-insert the milvus record when ``milvus_update_flag`` is set and
       delete the milvus records of removed standard names.
    4. When the alias set changed, write the new alias onto every
       document_product_dict row with this name and grade.
    5. For every newly added standard name (other than ``name`` itself),
       delete the document_product_dict rows of that name and grade,
       calling ``recurse_delete_dict`` on each id first.
    6. Run the history processors for added/removed standard names and
       remove words.
    7. Write the merged values back to the interface row with action
       ``DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE``.

    :param name: dictionary entry name
    :param alias: requested alias change ("+"/"-" prefixed or full replacement)
    :param grade: entry grade, used to pick the milvus collection and filter rows
    :param original_id: not used by this implementation
    :param parent_id: parent id; ``None`` is stored as ``""``
    :param standard_alias: requested standard-alias change
    :param create_time: not used by this implementation
    :param remove_words: requested remove-words change
    :param level: requested level; ``None``/``""`` is stored as ``1``
    :return: ``None``
    """
    # The same id addresses the interface row for both the read and the final write.
    interface_id = get_milvus_product_dict_id(name)
    _dpdi = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id})
    if not _dpdi.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL],True):
        return

    original_alias = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
    original_standard_alias = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
    original_remove_words = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS)
    original_level = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL)

    update_flag,milvus_update_flag,original_alias_set,original_standard_alias_set,original_remove_words_set,new_alias_set,new_standard_alias_set,new_remove_words_set = self.get_updated_record(alias,standard_alias,remove_words,level,original_alias,original_standard_alias,original_remove_words,original_level)

    if not update_flag:
        return

    final_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_alias_set))
    final_standard_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_standard_alias_set))
    final_remove_words = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_remove_words_set))
    if parent_id is None:
        parent_id = ""
    if level is None or level=="":
        level = 1

    delete_standard_names = list(original_standard_alias_set-new_standard_alias_set)
    insert_standard_names = list(new_standard_alias_set-original_standard_alias_set)

    delete_remove_words = list(original_remove_words_set-new_remove_words_set)
    insert_remove_words = list(new_remove_words_set-original_remove_words_set)

    log("update_interface delete_standard_names:%s insert_standard_names:%s delete_remove_words:%s insert_remove_words:%s"%(str(delete_standard_names),str(insert_standard_names),str(delete_remove_words),str(insert_remove_words)))

    def _search_dict_rows(dict_name,log_template):
        # Collect every document_product_dict row with this name and grade,
        # following next_token until the index is exhausted.
        bool_query = BoolQuery(must_queries=[
            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,dict_name),
            TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_data = getRow_ots(rows)
        log(log_template%(total_count))
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data.extend(getRow_ots(rows))
        return list_data

    # update the milvus
    Coll,_ = self.get_collection(grade)
    if milvus_update_flag:
        insert_new_record_to_milvus(Coll,name,grade,parent_id,final_standard_alias,final_remove_words,level)
    # Removed standard names are purged regardless of the flag so that a
    # dropped name can never stay searchable in milvus.
    for _name in delete_standard_names:
        delete_record_from_milvus(Coll,_name,"")

    # update document_product_dict
    # update alias: set inequality catches removed aliases as well as added ones
    if new_alias_set!=original_alias_set:
        for _data in _search_dict_rows(name,"update dict table alias %d counts"):
            dpd = Document_product_dict(_data)
            dpd.setValue(DOCUMENT_PRODUCT_DICT_ALIAS,final_alias,True)
            dpd.update_row(self.ots_client)

    # a name merged in as a standard alias must no longer exist as its own dict entry
    for _name in insert_standard_names:
        if _name==name:
            continue
        for _data in _search_dict_rows(_name,"delete dict table %d counts"):
            dpd = Document_product_dict(_data)
            _id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
            log("delete id:%s"%(_id))
            self.recurse_delete_dict(_id)
            dpd.delete_row(self.ots_client)

    # process history
    if len(delete_standard_names)>0:
        self.process_history_by_standard_name(name,grade,delete_standard_names,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE)
    if len(insert_standard_names)>0:
        self.process_history_by_standard_name(name,grade,insert_standard_names,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)

    if len(delete_remove_words)>0:
        self.process_history_by_remove_words(name,grade,delete_remove_words,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE)
    if len(insert_remove_words)>0:
        self.process_history_by_remove_words(name,grade,insert_remove_words,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)

    # persist the merged state as the new base row of the interface table
    _interface_d = {
        DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
        DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:final_alias,
        DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300),
        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE,
        DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
        DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:parent_id,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:final_standard_alias,
        DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
        DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
        DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:final_remove_words,
        DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL:level
    }
    _dpdi = Document_product_dict_interface(_interface_d)
    _dpdi.update_row(self.ots_client)
|
|
|
|
|
|
|
|
|
|
def recurse_update_dict(self,parent_id,new_parent_id):
|
|
def recurse_update_dict(self,parent_id,new_parent_id):
|
|
@@ -527,6 +1012,10 @@ class Product_Dict_Manager():
|
|
columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
|
|
columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
|
|
list_data.extend(getRow_ots(rows))
|
|
list_data.extend(getRow_ots(rows))
|
|
|
|
|
|
|
|
+ interface_id = get_milvus_product_dict_id(name)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
#delete milvus records
|
|
#delete milvus records
|
|
Coll,_ = self.get_collection(grade)
|
|
Coll,_ = self.get_collection(grade)
|
|
|
|
|
|
@@ -534,7 +1023,7 @@ class Product_Dict_Manager():
|
|
time.sleep(1)
|
|
time.sleep(1)
|
|
|
|
|
|
#process_history data
|
|
#process_history data
|
|
- self.process_history([name],grade,"delete")
|
|
|
|
|
|
+ self.process_history_by_name([name],grade,"delete")
|
|
|
|
|
|
#delete document_product_dict
|
|
#delete document_product_dict
|
|
log("delete document_product_dict name:%s grade:%s count:%s"%(str(name),str(grade),str(len(list_data))))
|
|
log("delete document_product_dict name:%s grade:%s count:%s"%(str(name),str(grade),str(len(list_data))))
|
|
@@ -547,6 +1036,12 @@ class Product_Dict_Manager():
|
|
dpd = Document_product_dict(_d)
|
|
dpd = Document_product_dict(_d)
|
|
dpd.delete_row(self.ots_client)
|
|
dpd.delete_row(self.ots_client)
|
|
|
|
|
|
|
|
+ _interface_d = {
|
|
|
|
+ DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
|
|
|
|
+ }
|
|
|
|
+ _dpdi = Document_product_dict_interface(_interface_d)
|
|
|
|
+ _dpdi.delete_row(self.ots_client)
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
def recurse_delete_dict(self,id):
|
|
def recurse_delete_dict(self,id):
|
|
@@ -604,15 +1099,17 @@ class Product_Dict_Manager():
|
|
alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
|
|
alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
|
|
grade = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE)
|
|
grade = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE)
|
|
original_id = item.get(DOCUMENT_PRODUCT_DICT_ORIGINAL_ID)
|
|
original_id = item.get(DOCUMENT_PRODUCT_DICT_ORIGINAL_ID)
|
|
- parent_id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID)
|
|
|
|
|
|
+ parent_id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID,"")
|
|
standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
|
|
standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
|
|
create_time = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)
|
|
create_time = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)
|
|
|
|
+ remove_words = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,'')
|
|
|
|
+ level = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL,1)
|
|
|
|
|
|
if name is not None and len(name)>1 and len(name)<MAX_NAME_LENGTH:
|
|
if name is not None and len(name)>1 and len(name)<MAX_NAME_LENGTH:
|
|
if action=="insert":
|
|
if action=="insert":
|
|
- self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time)
|
|
|
|
|
|
+ self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level)
|
|
elif action=="update":
|
|
elif action=="update":
|
|
- self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time)
|
|
|
|
|
|
+ self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level)
|
|
elif action=="delete":
|
|
elif action=="delete":
|
|
self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)
|
|
self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)
|
|
|
|
|
|
@@ -882,7 +1379,18 @@ def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_
|
|
[level]
|
|
[level]
|
|
]
|
|
]
|
|
insert_embedding(Coll,data)
|
|
insert_embedding(Coll,data)
|
|
- return True
|
|
|
|
|
|
+ while 1:
|
|
|
|
+ try:
|
|
|
|
+ log("milvus insert wait for done")
|
|
|
|
+ list_result = Coll.query(expr=expr,output_fields=["standard_name"])
|
|
|
|
+ log("list_result"+str(list_result)+str(type(list_result[0])))
|
|
|
|
+ if len(list_result)==1:
|
|
|
|
+ if list_result[0].get("standard_name","")==name:
|
|
|
|
+ log("milvus insert done")
|
|
|
|
+ return True
|
|
|
|
+ time.sleep(1)
|
|
|
|
+ except Exception as e:
|
|
|
|
+ traceback.print_exc()
|
|
|
|
|
|
def delete_record_from_milvus(Coll,name,standard_alias):
|
|
def delete_record_from_milvus(Coll,name,standard_alias):
|
|
|
|
|
|
@@ -906,6 +1414,12 @@ def delete_record_from_milvus(Coll,name,standard_alias):
|
|
|
|
|
|
expr = " ots_id in ['%s']"%_id
|
|
expr = " ots_id in ['%s']"%_id
|
|
Coll.delete(expr)
|
|
Coll.delete(expr)
|
|
|
|
+ while 1:
|
|
|
|
+ if len(Coll.query(expr=expr))==0:
|
|
|
|
+ return
|
|
|
|
+ else:
|
|
|
|
+ log("milvus delete wait for done")
|
|
|
|
+ time.sleep(1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -923,7 +1437,7 @@ def dict_interface_delete(name,grade,ots_client = getConnect_ots()):
|
|
|
|
|
|
def interface_deletes():
|
|
def interface_deletes():
|
|
a = '''
|
|
a = '''
|
|
- 明细
|
|
|
|
|
|
+ 按采购需求执行
|
|
'''
|
|
'''
|
|
grade = 4
|
|
grade = 4
|
|
ots_client=getConnect_ots()
|
|
ots_client=getConnect_ots()
|
|
@@ -934,6 +1448,26 @@ def interface_deletes():
|
|
print(s)
|
|
print(s)
|
|
dict_interface_delete(s,grade,ots_client)
|
|
dict_interface_delete(s,grade,ots_client)
|
|
|
|
|
|
|
|
def interface_update():
    """Write one hand-filled "update" request row into the interface table.

    The row targets the entry named "保健" at grade 4, leaves the standard
    alias empty and asks for "设备" to be added to the remove words.  It is
    stored with status 1 under a freshly generated uuid4 hex id.
    """
    target_name = "保健"
    standard_alias_change = ""
    remove_words_change = "+设备"
    target_grade = 4
    client = getConnect_ots()

    from uuid import uuid4

    # Fill the request field by field, keeping the original key order.
    request = {}
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME] = target_name
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS] = 1
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE] = target_grade
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_ID] = uuid4().hex
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION] = "update"
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS] = standard_alias_change
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS] = remove_words_change
    request[DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME] = getCurrent_date(format="%Y-%m-%d %H:%M:%S")

    Document_product_dict_interface(request).update_row(client)
|
|
|
|
+
|
|
def clean_brands():
|
|
def clean_brands():
|
|
from queue import Queue as TQueue
|
|
from queue import Queue as TQueue
|
|
task_queue = TQueue()
|
|
task_queue = TQueue()
|
|
@@ -1011,8 +1545,70 @@ def clean_brands():
|
|
for _name in list_illegal:
|
|
for _name in list_illegal:
|
|
f.write("%s\n"%(_name))
|
|
f.write("%s\n"%(_name))
|
|
|
|
|
|
|
|
def clean_product_dict():
    """Delete every document_product_dict row whose status is at least 0.

    All matching rows are first gathered page by page (100 per page) into a
    queue, printing "queued/total" after each page, and are then deleted by
    a MultiThreadHandler created with a thread count of 30.
    """
    client = getConnect_ots()
    status_filter = BoolQuery(must_queries=[
        RangeQuery("status",0)
    ])
    pending = Queue()
    index_name = Document_product_dict_table_name+"_index"

    # Only the first request carries the sort; later pages continue from next_token.
    rows,next_token,total_count,is_all_succeed = client.search(Document_product_dict_table_name,index_name,
                                                               SearchQuery(status_filter,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
                                                               columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    while True:
        for row in getRow_ots(rows):
            pending.put(row)
        print("%d/%d"%(pending.qsize(),total_count))
        if not next_token:
            break
        rows,next_token,total_count,is_all_succeed = client.search(Document_product_dict_table_name,index_name,
                                                                   SearchQuery(status_filter,next_token=next_token,get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

    def _handle(item,result_queue):
        # Remove a single queued row from the table.
        Document_product_dict(item).delete_row(client)

    worker_pool = MultiThreadHandler(pending,_handle,None,30)
    worker_pool.run()
|
|
|
|
+
|
|
|
|
def clean_product_dict_interface():
    """Delete every interface-table row whose action is "insert" or "base".

    All matching rows are first gathered page by page (100 per page) into a
    queue, printing "queued/total" after each page, and are then deleted by
    a MultiThreadHandler created with a thread count of 30.
    """
    client = getConnect_ots()
    action_filter = BoolQuery(must_queries=[
        BoolQuery(should_queries=[
            TermQuery("action","insert"),
            TermQuery("action","base")
        ])
    ])
    pending = Queue()
    index_name = Document_product_dict_interface_table_name+"_index"

    # Only the first request carries the sort; later pages continue from next_token.
    rows,next_token,total_count,is_all_succeed = client.search(Document_product_dict_interface_table_name,index_name,
                                                               SearchQuery(action_filter,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
                                                               columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    while True:
        for row in getRow_ots(rows):
            pending.put(row)
        print("%d/%d"%(pending.qsize(),total_count))
        if not next_token:
            break
        rows,next_token,total_count,is_all_succeed = client.search(Document_product_dict_interface_table_name,index_name,
                                                                   SearchQuery(action_filter,next_token=next_token,get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))

    def _handle(item,result_queue):
        # Remove a single queued row from the interface table.
        Document_product_dict_interface(item).delete_row(client)

    worker_pool = MultiThreadHandler(pending,_handle,None,30)
    worker_pool.run()
|
|
|
|
+
|
|
if __name__ == '__main__':
    # Manual maintenance entry point: exactly one task is enabled at a time,
    # the others stay commented out.
    # start_embedding_product_dict()
    # interface_deletes()
    # interface_update()
    # clean_similar()
    # clean_brands()
    # clean_product_dict()
    # NOTE(review): the enabled task deletes the "insert"/"base" rows of the
    # interface table -- confirm this is intended before running the script.
    clean_product_dict_interface()
|