from BaseDataMaintenance.model.ots.BaseModel import BaseModel

# Column names of the product-dictionary table.
DOCUMENT_PRODUCT_DICT_ID = "id"
DOCUMENT_PRODUCT_DICT_NAME = "name"
DOCUMENT_PRODUCT_DICT_ALIAS = "alias"
DOCUMENT_PRODUCT_DICT_GRADE = "grade"
DOCUMENT_PRODUCT_DICT_STATUS = "status"
DOCUMENT_PRODUCT_DICT_PARENT_ID = "parent_id"
DOCUMENT_PRODUCT_DICT_CREATE_TIME = "create_time"
DOCUMENT_PRODUCT_DICT_UPDATE_TIME = "update_time"
DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED = "is_synchonized"
DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS = "standard_alias"
DOCUMENT_PRODUCT_DICT_REMOVE_WORDS = "remove_words"
DOCUMENT_PRODUCT_DICT_LEVEL = "level"

DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR = "|"

# Names are truncated to this many characters before being normalized/hashed
# (see get_milvus_standard_name).
MAX_NAME_LENGTH = 300

Document_product_dict_table_name = "document_product_dict2"


class Document_product_dict(BaseModel):
    """Row model for the ``document_product_dict2`` table."""

    def __init__(self, _dict):
        """Build a row from ``_dict``, copying every key/value into the model."""
        BaseModel.__init__(self)
        for k, v in _dict.items():
            self.setValue(k, v, True)
        self.table_name = Document_product_dict_table_name

    def getPrimary_keys(self):
        """Return the primary-key column names of the table."""
        return [DOCUMENT_PRODUCT_DICT_ID]

    def updateAlias(self, name):
        """Add the lower-cased ``name`` to the "&&"-separated alias list.

        :param name: alias to add; converted with ``str()`` and lower-cased.
        :return: True if the alias column was changed, False if ``name`` was
            already one of the aliases.
        """
        name = str(name).lower()
        # `or ""` also covers a stored None, which would otherwise crash .split().
        alias = self.getProperties().get(DOCUMENT_PRODUCT_DICT_ALIAS) or ""
        if name in set(alias.split("&&")):
            return False
        # Avoid a leading "&&" (and thus an empty alias token) when the list is empty.
        alias = "%s&&%s" % (alias, name) if alias else name
        self.setValue(DOCUMENT_PRODUCT_DICT_ALIAS, alias, True)
        return True


# NOTE: imports below are deliberately left in their original positions. The
# star imports further down can rebind module-level names, so moving them
# could change which definition wins.
from BaseDataMaintenance.common.documentFingerprint import getMD5


def get_document_product_dict_id(parent_md5, name):
    """Return the MD5 of ``parent_md5 + "&&" + name``, used as a dictionary row id."""
    return getMD5(parent_md5 + "&&%s" % name)


def get_document_product_dict_standard_alias_id(name):
    """Return the id of a standard alias (same scheme as the milvus product id)."""
    return get_milvus_product_dict_id(name)


def get_milvus_standard_name(name):
    """Normalize ``name``: stringify, truncate to MAX_NAME_LENGTH, lower-case.

    Truncation happens before lower-casing, as in the original implementation.
    """
    return str(name)[:MAX_NAME_LENGTH].lower()


def get_milvus_product_dict_id(name):
    """Return the MD5 of the normalized ``name`` (see get_milvus_standard_name)."""
    return getMD5(get_milvus_standard_name(name))


from BaseDataMaintenance.model.ots.document_product import *
from BaseDataMaintenance.dataSource.source import getConnect_ots
from tablestore import *
from BaseDataMaintenance.common.Utils import *
from BaseDataMaintenance.common.multiThread import MultiThreadHandler
from queue import Queue


def _fetch_all_product_dict_rows(ots_client, table_name, bool_query, page_size=100):
    """Page through ``table_name``'s search index and return all matching rows.

    The first page is sorted by "grade"; later pages continue via next_token.
    Progress is printed after each subsequent page.
    """
    index_name = table_name + "_index"
    rows, next_token, total_count, _ = ots_client.search(
        table_name, index_name,
        SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("grade")]),
                    limit=page_size, get_total_count=True),
        ColumnsToGet(return_type=ColumnReturnType.ALL))
    # getRow_ots presumably comes from the Utils star import.
    list_data = getRow_ots(rows)
    while next_token:
        rows, next_token, total_count, _ = ots_client.search(
            table_name, index_name,
            SearchQuery(bool_query, next_token=next_token,
                        limit=page_size, get_total_count=True),
            ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_data.extend(getRow_ots(rows))
        print("%d/%d" % (len(list_data), total_count))
    return list_data


def move_document_product_dict():
    """Migrate rows from ``document_product_dict`` into ``document_product_dict2``.

    Every row of the old table that has DOCUMENT_PRODUCT_NAME set is processed
    on 30 worker threads:
      * status == 1: written to the new table, then deleted from the old one;
      * any other status: deleted from the old table without being copied.
    """
    bool_query = BoolQuery(must_queries=[ExistsQuery(DOCUMENT_PRODUCT_NAME)])
    ots_client = getConnect_ots()
    Document_product_table_name = "document_product_dict"

    list_data = _fetch_all_product_dict_rows(ots_client, Document_product_table_name, bool_query)

    task_queue = Queue()
    for _data in list_data:
        task_queue.put(_data)

    def _handle(item, result_queue):
        D1 = Document_product_dict(item)
        if item.get(DOCUMENT_PRODUCT_DICT_STATUS) == 1:
            # Don't delete the source row if copying it failed; that would lose data.
            # NOTE(review): assumes update_row signals failure by returning False;
            # if it returns None or raises, behavior is unchanged from before.
            if D1.update_row(ots_client) is False:
                return
        # Point the same model at the old table to remove the source row.
        D1.table_name = Document_product_table_name
        D1.delete_row(ots_client)

    mt = MultiThreadHandler(task_queue, _handle, None, 30)
    mt.run()


if __name__ == '__main__':
    move_document_product_dict()