123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- from BaseDataMaintenance.model.ots.BaseModel import BaseModel
# --- Column names of the product dictionary table ---------------------------
DOCUMENT_PRODUCT_DICT_ID = "id"
DOCUMENT_PRODUCT_DICT_NAME = "name"
DOCUMENT_PRODUCT_DICT_ALIAS = "alias"
DOCUMENT_PRODUCT_DICT_GRADE = "grade"
DOCUMENT_PRODUCT_DICT_STATUS = "status"
DOCUMENT_PRODUCT_DICT_PARENT_ID = "parent_id"
DOCUMENT_PRODUCT_DICT_CREATE_TIME = "create_time"
DOCUMENT_PRODUCT_DICT_UPDATE_TIME = "update_time"
# NOTE(review): "synchonized" is misspelled in both the name and the value;
# presumably the value matches an existing stored column, so it is kept as-is.
DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED = "is_synchonized"
DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS = "standard_alias"
DOCUMENT_PRODUCT_DICT_REMOVE_WORDS = "remove_words"
DOCUMENT_PRODUCT_DICT_LEVEL = "level"

# --- Formatting / limits -----------------------------------------------------
# Separator between entries of the standard_alias column.
DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR = "|"
# Names are truncated to this many characters by get_milvus_standard_name().
MAX_NAME_LENGTH = 300

# --- Target table --------------------------------------------------------------
Document_product_dict_table_name = "document_product_dict2"
class Document_product_dict(BaseModel):
    """Row model for the product dictionary table.

    Every key/value pair of the dict passed to the constructor is stored via
    ``setValue(k, v, True)``, and ``table_name`` is set to
    ``Document_product_dict_table_name``.
    """

    def __init__(self, _dict):
        BaseModel.__init__(self)
        for k, v in _dict.items():
            self.setValue(k, v, True)
        self.table_name = Document_product_dict_table_name

    def getPrimary_keys(self):
        """Return the primary-key column names (just the id column)."""
        return [DOCUMENT_PRODUCT_DICT_ID]

    def updateAlias(self, name):
        """Add ``name`` to the ``&&``-separated alias column if not present.

        :param name: alias to add; converted with ``str()`` and lower-cased.
        :return: True if the alias column was changed, False if ``name`` was
            already one of the aliases.
        """
        name = str(name).lower()
        # A stored None is treated like a missing alias instead of crashing
        # on None.split().
        alias = self.getProperties().get(DOCUMENT_PRODUCT_DICT_ALIAS) or ""
        if name in alias.split("&&"):
            return False
        # Only prepend the separator when there is already an alias; otherwise
        # the result would start with "&&" and contain an empty entry.
        alias = "%s&&%s" % (alias, name) if alias else name
        self.setValue(DOCUMENT_PRODUCT_DICT_ALIAS, alias, True)
        return True
- from BaseDataMaintenance.common.documentFingerprint import getMD5
def get_document_product_dict_id(parent_md5, name):
    """Return the id of a dictionary entry nested under ``parent_md5``.

    The id is ``getMD5`` of ``parent_md5`` concatenated with ``"&&"`` and
    ``name``.
    """
    suffix = "&&%s" % name
    return getMD5(parent_md5 + suffix)
def get_document_product_dict_standard_alias_id(name):
    """Return the id for a standard alias.

    This is the same value as ``get_milvus_product_dict_id(name)``.
    """
    standard_alias_id = get_milvus_product_dict_id(name)
    return standard_alias_id
def get_milvus_standard_name(name):
    """Normalize ``name``: ``str()`` it, truncate, then lower-case.

    Truncation to ``MAX_NAME_LENGTH`` characters happens *before*
    lower-casing. A few characters lower-case to more than one character
    (e.g. "İ"), so the result can slightly exceed ``MAX_NAME_LENGTH``.
    The order is kept because the result feeds persisted ids.
    """
    truncated = str(name)[:MAX_NAME_LENGTH]
    return truncated.lower()
def get_milvus_product_dict_id(name):
    """Return ``getMD5`` of the normalized (truncated, lower-cased) name."""
    standard_name = get_milvus_standard_name(name)
    return getMD5(standard_name)
- from BaseDataMaintenance.model.ots.document_product import *
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- from tablestore import *
- from BaseDataMaintenance.common.Utils import *
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from queue import Queue
def _fetch_all_rows(ots_client, table_name, index_name, bool_query, page_size):
    """Page through ``index_name`` and return every matching row via getRow_ots."""
    rows, next_token, total_count, _ = ots_client.search(
        table_name,
        index_name,
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("grade")]),
                    limit=page_size, get_total_count=True),
        ColumnsToGet(return_type=ColumnReturnType.ALL),
    )
    list_data = getRow_ots(rows)
    # Follow-up pages are driven by next_token alone; no sort is passed here.
    while next_token:
        rows, next_token, total_count, _ = ots_client.search(
            table_name,
            index_name,
            SearchQuery(bool_query, next_token=next_token,
                        limit=page_size, get_total_count=True),
            ColumnsToGet(return_type=ColumnReturnType.ALL),
        )
        list_data.extend(getRow_ots(rows))
        print("%d/%d" % (len(list_data), total_count))
    return list_data


def move_document_product_dict(source_table_name="document_product_dict",
                               page_size=100, thread_count=30):
    """Move product-dictionary rows from ``source_table_name`` to the new table.

    Every row that has ``DOCUMENT_PRODUCT_NAME`` set (as found through the
    ``<source_table_name>_index`` search index) is processed on
    ``thread_count`` worker threads.
    - Rows with ``status == 1`` are written to
      ``Document_product_dict_table_name`` via ``update_row``.
    - Every row, whatever its status, is then deleted from the source table.

    NOTE(review): rows with any other status are deleted without being
    copied. The delete also runs even if ``update_row`` reports a failure,
    because its return value is not checked. Confirm both are intended; this
    is one-way data movement.

    :param source_table_name: table to move rows out of.
    :param page_size: rows fetched per search request.
    :param thread_count: number of worker threads for the copy/delete.
    """
    bool_query = BoolQuery(must_queries=[ExistsQuery(DOCUMENT_PRODUCT_NAME)])
    ots_client = getConnect_ots()
    list_data = _fetch_all_rows(ots_client, source_table_name,
                                source_table_name + "_index",
                                bool_query, page_size)

    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)

    def _handle(item, result_queue):
        status = item.get(DOCUMENT_PRODUCT_DICT_STATUS)
        row = Document_product_dict(item)
        if status == 1:
            # The constructor targets the new table.
            row.update_row(ots_client)
        # Re-point at the source table so the delete hits the old row.
        row.table_name = source_table_name
        row.delete_row(ots_client)

    mt = MultiThreadHandler(task_queue, _handle, None, thread_count)
    mt.run()
if __name__ == "__main__":
    # Run the one-off table migration when executed as a script.
    move_document_product_dict()
|