12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974 |
- from BaseDataMaintenance.common.milvusUtil import *
- from multiprocessing import Process,Queue
- from BaseDataMaintenance.common.Utils import *
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.model.ots.document_product_dict import *
- from BaseDataMaintenance.model.ots.document_product_dict_interface import *
- from BaseDataMaintenance.model.ots.document_product import *
- from BaseDataMaintenance.model.ots.document_product_tmp import *
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- from tablestore import *
- from BaseDataMaintenance.common.Utils import getRow_ots
- from BaseDataMaintenance.maintenance.product.productUtils import *
- import time
- import traceback
- import json
- import requests
- from random import randint
- IS_SYNCHONIZED = 3
class Product_Dict_Manager():
    """Maintains the product dictionary: syncs OTS dict rows into Milvus
    embedding collections and reprocesses affected document_product rows
    when dictionary entries change.
    """

    def __init__(self):
        # Milvus collection names for the three embedding spaces
        # (product name / brand / specs); constants come from productUtils.
        self.collection_name_name = COLLECTION_NAME_NAME
        self.collection_name_brand = COLLECTION_NAME_BRAND
        self.collection_name_specs = COLLECTION_NAME_SPECS
        # Inner-product search probing 10 clusters — matches the IVF_SQ8/IP
        # index built in init_milvus().
        self.search_params = {"metric_type": "IP",
                              "params": {"nprobe": 10}}
        # Ensure collections/schemas exist before opening handles to them.
        self.init_milvus()
        self.ots_client = getConnect_ots()
        # Work queues: dict rows awaiting embedding, and interface tasks.
        self.queue_product_dict = Queue()
        self.queue_product_interface = Queue()
        self.session = requests.Session()
        # Open one collection handle per embedding space.
        self.Coll_name = getCollection(self.collection_name_name)
        self.Coll_brand = getCollection(self.collection_name_brand)
        self.Coll_specs = getCollection(self.collection_name_specs)
        # self.pool_name = ConnectorPool(init_num=10,max_num=30,method_init=getCollection,collection_name=self.collection_name_name)
        #
        # self.pool_brand = ConnectorPool(init_num=10,max_num=30,method_init=getCollection,collection_name=self.collection_name_brand)
        # self.pool_specs = ConnectorPool(init_num=10,max_num=30,method_init=getCollection,collection_name=self.collection_name_specs)
- def init_milvus(self):
- from pymilvus import connections,FieldSchema,DataType
- fields = [
- # FieldSchema(name="pk_id",dtype=DataType.INT64,is_primary=True,auto_id=True), # pk is the same as ots
- FieldSchema(name="ots_id",dtype=DataType.VARCHAR,max_length=32,is_primary=True),
- FieldSchema(name="ots_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
- FieldSchema(name="standard_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
- FieldSchema(name="standard_name_id",dtype=DataType.VARCHAR,max_length=32),
- FieldSchema(name="embedding",dtype=DataType.FLOAT_VECTOR,dim=1024),
- FieldSchema(name="ots_parent_id",dtype=DataType.VARCHAR,max_length=32),
- FieldSchema(name="ots_grade",dtype=DataType.INT64),
- FieldSchema(name="remove_words",dtype=DataType.VARCHAR,max_length=3000),
- FieldSchema(name="level",dtype=DataType.INT64),
- ]
- index_name = "embedding"
- index_params = {"params":{"nlist":2048},
- "index_type":"IVF_SQ8",
- "metric_type":"IP"}
- init_milvus(milvus_host)
- #build the product name brand specs embedding respectively
- create_embedding_schema(self.collection_name_name,fields,index_name,index_params)
- create_embedding_schema(self.collection_name_brand,fields,index_name,index_params)
- create_embedding_schema(self.collection_name_specs,fields,index_name,index_params)
- def get_collection(self,grade):
- Coll = None
- Coll_name = None
- if grade ==SPECS_GRADE:
- Coll = self.Coll_specs
- Coll_name = self.collection_name_specs
- if grade == BRAND_GRADE:
- Coll = self.Coll_brand
- Coll_name = self.collection_name_brand
- if grade == NAME_GRADE:
- Coll = self.Coll_name
- Coll_name = self.collection_name_name
- return Coll,Coll_name
    def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_LEVEL]):
        """Queue dict-interface rows that still need Milvus synchronization.

        Pages through the document_product_dict_interface index for rows with
        action == "base" (DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE), grade
        in [3, 5], and not yet flagged IS_SYNCHONIZED, pushing each row dict
        onto self.queue_product_dict.  Paging stops once the queue holds
        roughly 10000 items.

        NOTE(review): *columns* is a mutable (list) default argument; it is
        only iterated here, never mutated, so behavior is fine — but a tuple
        default would be the safer idiom.
        """
        bool_query = BoolQuery(
            must_queries=[
                TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
                RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,3,5,True,True)
            ],
            must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED)])
        # First page, sorted by the synchronization flag.
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _d in list_dict:
            self.queue_product_dict.put(_d)
        # Follow-up pages via next_token until exhausted or the queue is full.
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _d in list_dict:
                self.queue_product_dict.put(_d)
            # Cap the backlog so one producer pass stays bounded.
            if self.queue_product_dict.qsize()>=10000:
                break
        log("product_dict embedding total_count:%d"%total_count)
    def embedding_comsumer(self):
        """Fill the queue via embedding_producer, then drain it with 5 worker
        threads, inserting one Milvus record per dict row and marking the OTS
        interface row as synchronized on success.

        NOTE(review): "comsumer" is a typo for "consumer" but is kept — the
        name is the public interface and callers may rely on it.
        """
        def handle(item,result_queue):
            # One queue item == one dict-interface row (as returned by getRow_ots).
            try:
                _id = item.get(DOCUMENT_PRODUCT_DICT_ID)
                # Truncate the name to the schema's VARCHAR limit.
                name = str(item.get(DOCUMENT_PRODUCT_DICT_NAME))[:MAX_NAME_LENGTH]
                parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
                grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
                # Pick the collection (name/brand/specs) that matches the grade.
                Coll,_ = self.get_collection(grade)
                standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
                remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
                level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL,1)
                # Only flag the OTS row as synchronized if the Milvus insert succeeded.
                if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level,wait_sync=False):
                    _pd = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_ID:_id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
                    _pd.update_row(self.ots_client)
            except Exception as e:
                # Best-effort per item: log the failure and keep consuming.
                traceback.print_exc()
        self.embedding_producer()
        start_time = time.time()
        q_size = self.queue_product_dict.qsize()
        # 5 threads, 1 result-queue consumer (per MultiThreadHandler signature).
        mt = MultiThreadHandler(self.queue_product_dict,handle,None,5,1)
        mt.run()
        log("process embedding %d records cost %.2f s"%(q_size,time.time()-start_time))
- def process_history_name(self,list_name,action):
- if action=="insert":
- # search document_product_temp and update status
- # query in blur mode
- for name in list_name:
- should_q = self.make_query(name,DOCUMENT_PRODUCT_TMP_NAME,MatchPhraseQuery,4,5)
- if should_q is None:
- continue
- bool_query =BoolQuery(must_queries=[
- RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,401,450,True,True),
- should_q
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _data in list_data:
- id = _data.get(DOCUMENT_PRODUCT_TMP_ID)
- status = randint(1,50)
- _d = {DOCUMENT_PRODUCT_TMP_ID:id,
- DOCUMENT_PRODUCT_TMP_STATUS:status}
- _dt = Document_product_tmp(_d)
- _dt.update_row(self.ots_client)
- elif action=="delete":
- # delete document_product
- # update temp new_id and status to 401-450
- for name in list_name:
- bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
- if bool_query is not None:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- _d = {DOCUMENT_PRODUCT_ID:_id}
- dp = Document_product(_d)
- dp.delete_row(self.ots_client)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:randint(401,450)}
- dpt = Document_product_tmp(_d)
- dpt.update_row(self.ots_client)
- elif action=="update":
- # delete document_product and update document_product_temp to rerun
- for name in list_name:
- bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
- if bool_query is not None:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- def process_history_brand(self,list_brand,action):
- # search document_product and rerun
- for name in list_brand:
- if action=="insert":
- name = re.sub("有限|责任|公司",'',name)
- bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,4,5)
- else:
- bool_query = self.make_query(name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(name),5)
- if bool_query is not None:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- log("insert brand %s %d counts"%(name,total_count))
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- def process_history_specs(self,list_specs,action):
- # search document_product and rerun
- for name in list_specs:
- if action=="insert":
- bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_SPECS,MatchPhraseQuery,len(name),5)
- else:
- bool_query = self.make_query(name,DOCUMENT_PRODUCT_SPECS,TermQuery,len(name),5)
- if bool_query is not None:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- def rerun(self,id,original_id):
- _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
- dpt = Document_product_tmp(_d)
- dpt.update_row(self.ots_client)
- _d = {DOCUMENT_PRODUCT_ID:id}
- dp = Document_product(_d)
- dp.delete_row(self.ots_client)
- def make_query(self,name,column,query_type,min_len,strides):
- should_q = []
- strides_spce = len(name)-min_len+1
- for _i in range(min(strides_spce,strides)):
- _s = str(name)[_i:min_len]
- if query_type==WildcardQuery:
- should_q.append(query_type(column,"*%s*"%_s))
- elif query_type==TermQuery or query_type==MatchPhraseQuery:
- should_q.append(query_type(column,"%s"%(_s)))
- if len(should_q)>0:
- return BoolQuery(should_queries=should_q)
- return None
- def process_history_by_name(self,list_name,grade,action):
- assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_UPDATE,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
- if grade==NAME_GRADE:
- self.process_history_name(list_name,action)
- elif grade==BRAND_GRADE:
- self.process_history_brand(list_name,action)
- elif grade==SPECS_GRADE:
- self.process_history_specs(list_name,action)
- def process_history_by_standard_name(self,name,grade,list_name,action):
- assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
- if grade==NAME_GRADE:
- if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_NAME,TermQuery,len(n_name),5)
- if bool_query is not None:
- _query = bool_query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- total_count = total_count
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_name_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
- for dict_id in [dict_name_id]:
- if dict_id is not None and dict_id!="":
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
- self.recurse_delete_dict(dict_name_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
- if total_count==0:
- self.process_history_name([n_name],action)
- elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_NAME,MatchPhraseQuery,len(n_name),5)
- if bool_query is not None:
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_NAME,name),
- bool_query
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
- if dict_brand_id is not None and dict_brand_id!="":
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,get_total_count=True))
- if total_count==1:
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
- self.recurse_delete_dict(dict_brand_id)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- if grade==BRAND_GRADE:
- if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(n_name),5)
- if bool_query is not None:
- _query = bool_query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
- for dict_id in [dict_brand_id]:
- if dict_id is not None and dict_id!="":
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
- self.recurse_delete_dict(dict_brand_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
- if total_count==0:
- self.process_history_brand([n_name],action)
- elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,4,5)
- if bool_query is not None:
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_BRAND,name),
- bool_query
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
- if dict_brand_id is not None and dict_brand_id!="":
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,get_total_count=True))
- if total_count==1:
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
- self.recurse_delete_dict(dict_brand_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- if grade==SPECS_GRADE:
- if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_SPECS,TermQuery,len(n_name),5)
- if bool_query is not None:
- _query = bool_query
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_specs_id = _d.get(DOCUMENT_PRODUCT_DICT_SPECS_ID)
- for dict_id in [dict_specs_id]:
- if dict_id is not None and dict_id!="":
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
- self.recurse_delete_dict(dict_specs_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
- if total_count==0:
- self.process_history_specs([n_name],action)
- elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_SPECS,name),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_specs_id = _d.get(DOCUMENT_PRODUCT_DICT_SPECS_ID)
- if dict_specs_id is not None and dict_specs_id!="":
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_SPECS_ID,dict_brand_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,get_total_count=True))
- if total_count==1:
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_specs_id})
- self.recurse_delete_dict(dict_specs_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- def process_history_by_remove_words(self,name,grade,list_name,action):
- assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
- if grade==NAME_GRADE:
- if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_NAME,MatchPhraseQuery,len(n_name),5)
- if bool_query is not None:
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_NAME,name),
- bool_query
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
- if dict_brand_id is not None and dict_brand_id!="":
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,get_total_count=True))
- if total_count==1:
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
- self.recurse_delete_dict(dict_brand_id)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
- self.process_history_name(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
- if grade==BRAND_GRADE:
- if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,len(n_name),5)
- if bool_query is not None:
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_BRAND,name),
- bool_query
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
- if dict_brand_id is not None and dict_brand_id!="":
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,get_total_count=True))
- if total_count==1:
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
- self.recurse_delete_dict(dict_brand_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
- self.process_history_brand(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
- if grade==SPECS_GRADE:
- if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
- for n_name in list_name:
- bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_SPECS,MatchPhraseQuery,len(n_name),5)
- if bool_query is not None:
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_SPECS,name),
- bool_query
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dict_specs_id = _d.get(DOCUMENT_PRODUCT_DICT_SPECS_ID)
- if dict_specs_id is not None and dict_specs_id!="":
- _query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_SPECS_ID,dict_brand_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(_query,get_total_count=True))
- if total_count==1:
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_specs_id})
- self.recurse_delete_dict(dict_specs_id)
- dpd.delete_row(self.ots_client)
- _id = _d.get(DOCUMENT_PRODUCT_ID)
- original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- self.rerun(_id,original_id)
- elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
- self.process_history_specs(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
- def exists_records(self,name,grade,create_time):
- term_columns = None
- if grade==NAME_GRADE:
- term_columns = DOCUMENT_PRODUCT_NAME
- elif grade==BRAND_GRADE:
- term_columns = DOCUMENT_PRODUCT_BRAND
- elif grade==SPECS_GRADE:
- term_columns = DOCUMENT_PRODUCT_SPECS
- if term_columns is not None:
- bool_query = BoolQuery(must_queries=[
- TermQuery(term_columns,str(name)),
- RangeQuery(DOCUMENT_PRODUCT_DICT_CREATE_TIME,None,str(create_time))
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,get_total_count=True,limit=1),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count>0:
- return True
- return False
    def act_insert(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
        """Apply an 'insert' interface action for (name, grade).

        Writes/updates the document_product_dict row (when a parent is given),
        registers the base interface record, inserts the embedding into milvus,
        and reprocesses history products for the new name and its standard
        aliases when no earlier record existed.
        """
        #update document_product_dict
        # derive a deterministic dict id when the caller did not supply one
        if original_id is None or original_id=="":
            if parent_id is None:
                original_id = get_document_product_dict_id("",name)
            else:
                original_id = get_document_product_dict_id(parent_id,name)
        # only entries with a parent are written to the dict table
        if parent_id is not None and parent_id!="":
            _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
                  DOCUMENT_PRODUCT_DICT_ALIAS:alias,
                  DOCUMENT_PRODUCT_DICT_NAME:name,
                  DOCUMENT_PRODUCT_DICT_STATUS:1,
                  DOCUMENT_PRODUCT_DICT_GRADE:grade,
                  DOCUMENT_PRODUCT_DICT_PARENT_ID:parent_id,
                  DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias,
                  DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
                  DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                  DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
            _dpd = Document_product_dict(_d)
            _dpd.update_row(self.ots_client)
        # search interface if name and grade exists then update document_product_dict and return
        interface_id = get_document_product_dict_interface_base_id(name,grade)
        _interface_d = {
            DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
            DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:alias,
            DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
            DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300),
            DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE,
            DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
            DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:parent_id,
            DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:standard_alias,
            DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
            DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
            DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:remove_words,
            DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL:level
        }
        _dpdi = Document_product_dict_interface(_interface_d)
        # a base record already exists for this (name, grade): nothing more to do
        if _dpdi.exists_row(self.ots_client):
            return
        list_name = []
        #update milvus
        Coll,_ = self.get_collection(grade)
        # NOTE(review): alias collection / history processing is assumed to run
        # only when the milvus insert succeeded -- confirm against VCS history.
        if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias):
            list_name.append(name)
            # also reprocess each standard alias (skipping blanks and the name itself)
            if standard_alias is not None and standard_alias!="":
                list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
                for _alias in list_alias:
                    _alias = _alias.strip()
                    if len(_alias)==0:
                        continue
                    if _alias==name:
                        continue
                    list_name.append(_alias)
            # let downstream caches expire before reprocessing history
            time.sleep(PRODUCT_REDIS_CACHE_TIME)
            #judge whether there exists records before this record created,if not process the history data
            if not self.exists_records(name,grade,create_time):
                self.process_history_by_name(list_name,grade,"insert")
        # mark the interface record as processed
        _dpdi.update_row(self.ots_client)
- def get_updated_record(self,alias,standard_alias,remove_words,level,original_alias,original_standard_alias,original_remove_words,original_level):
- original_alias_set = set()
- original_standard_alias_set = set()
- original_remove_words_set = set()
- if original_alias is not None and original_alias!="":
- _split = original_alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- original_alias_set.add(_s)
- if original_standard_alias is not None and original_standard_alias!="":
- _split = original_standard_alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- original_standard_alias_set.add(_s)
- if original_remove_words is not None and original_remove_words!="":
- _split = original_remove_words.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- original_remove_words_set.add(_s)
- new_alias_set = set()
- new_standard_alias_set = set()
- new_remove_words_set = set()
- if alias is not None and alias!="":
- if alias[0]=="+":
- new_alias_set |= original_alias_set
- _split = alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- new_alias_set.add(_s)
- elif alias[0]=="-":
- new_alias_set |= original_alias_set
- _split = alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- if _s in new_alias_set:
- new_alias_set.remove(_s)
- else:
- _split = alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- new_alias_set.add(_s)
- else:
- new_alias_set = original_alias_set
- if standard_alias is not None and standard_alias!="":
- if standard_alias[0]=="+":
- new_standard_alias_set |= original_standard_alias_set
- _split = standard_alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- new_standard_alias_set.add(_s)
- elif standard_alias[0]=="-":
- new_standard_alias_set |= original_standard_alias_set
- _split = standard_alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- if _s in new_standard_alias_set:
- new_standard_alias_set.remove(_s)
- else:
- _split = standard_alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- new_standard_alias_set.add(_s)
- else:
- new_standard_alias_set = original_standard_alias_set
- if remove_words is not None and remove_words!="":
- if remove_words[0]=="+":
- new_remove_words_set |= original_remove_words_set
- _split = remove_words[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- new_remove_words_set.add(_s)
- elif remove_words[0]=="-":
- new_remove_words_set |= original_remove_words_set
- _split = remove_words[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- if _s in new_remove_words_set:
- new_remove_words_set.remove(_s)
- else:
- _split = remove_words.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
- for _s in _split:
- _s = _s.strip()
- if _s=="":
- continue
- new_remove_words_set.add(_s)
- else:
- new_remove_words_set = original_remove_words_set
- update_flag = False
- milvus_update_flag = False
- if len(new_alias_set&original_alias_set)!=len(new_alias_set):
- update_flag = True
- if len(new_standard_alias_set&original_remove_words_set)!=len(new_standard_alias_set):
- update_flag = True
- milvus_update_flag = True
- if len(new_remove_words_set&original_remove_words_set)!=len(new_remove_words_set):
- update_flag = True
- milvus_update_flag = True
- if str(level)!=str(original_level):
- update_flag = True
- milvus_update_flag = True
- return update_flag,milvus_update_flag,original_alias_set,original_standard_alias_set,original_remove_words_set,new_alias_set,new_standard_alias_set,new_remove_words_set
    def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
        """Apply an 'update' interface action for (name, grade).

        Loads the stored interface record, diffs it against the incoming
        values, and when something changed: updates milvus, rewrites the alias
        on every matching dict row, deletes dict rows merged away by new
        standard aliases, replays history, and rewrites the interface record.
        """
        # check whether there are change variable
        _interface_id = get_document_product_dict_interface_base_id(name,grade)
        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:_interface_id}
        _dpdi = Document_product_dict_interface(_d)
        # load the stored alias/standard_alias/remove_words/level; bail out if missing
        if not _dpdi.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL],True):
            return
        original_alias = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
        original_standard_alias = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
        original_remove_words = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS)
        original_level = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL)
        update_flag,milvus_update_flag,original_alias_set,original_standard_alias_set,original_remove_words_set,new_alias_set,new_standard_alias_set,new_remove_words_set = self.get_updated_record(alias,standard_alias,remove_words,level,original_alias,original_standard_alias,original_remove_words,original_level)
        # nothing changed: skip all side effects
        if not update_flag:
            return
        interface_id = get_document_product_dict_interface_base_id(name,grade)
        final_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_alias_set))
        final_standard_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_standard_alias_set))
        final_remove_words = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_remove_words_set))
        if parent_id is None:
            parent_id = ""
        if level is None or level=="":
            level = 1
        # what was added/removed relative to the stored record
        delete_standard_names = list(original_standard_alias_set-new_standard_alias_set)
        insert_standard_names = list(new_standard_alias_set-original_standard_alias_set)
        delete_remove_words = list(original_remove_words_set-new_remove_words_set)
        insert_remove_words = list(new_remove_words_set-original_remove_words_set)
        log("update_interface delete_standard_names:%s insert_standard_names:%s delete_remove_words:%s insert_remove_words:%s"%(str(delete_standard_names),str(insert_standard_names),str(delete_remove_words),str(insert_remove_words)))
        # update the milvus
        Coll,_ = self.get_collection(grade)
        if milvus_update_flag:
            insert_new_record_to_milvus(Coll,name,grade,parent_id,final_standard_alias,final_remove_words,level)
        if len(delete_standard_names)>0:
            # drop embeddings of standard aliases that were removed
            for _name in delete_standard_names:
                delete_record_from_milvus(Coll,_name,"")
        # let downstream caches expire before touching the dict table
        time.sleep(PRODUCT_REDIS_CACHE_TIME)
        # update document_product_dict
        # update alias
        if len(new_alias_set&original_alias_set)!=len(new_alias_set):
            bool_query = BoolQuery(must_queries=[
                TermQuery(DOCUMENT_PRODUCT_DICT_NAME,name),
                TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            log("update dict table alias %d counts"%(total_count))
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data.extend(getRow_ots(rows))
            # rewrite the alias column on every matching dict row
            for _data in list_data:
                dpd = Document_product_dict(_data)
                dpd.setValue(DOCUMENT_PRODUCT_DICT_ALIAS,final_alias,True)
                dpd.update_row(self.ots_client)
        #if merge current names then update dict
        for _name in insert_standard_names:
            if _name==name:
                continue
            bool_query = BoolQuery(must_queries=[
                TermQuery(DOCUMENT_PRODUCT_DICT_NAME,_name),
                TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
            ])
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data = getRow_ots(rows)
            log("delete dict table %d counts"%(total_count))
            while next_token:
                rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                                    SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                    ColumnsToGet(return_type=ColumnReturnType.NONE))
                list_data.extend(getRow_ots(rows))
            # the merged-away name's own dict subtree is removed
            for _data in list_data:
                dpd = Document_product_dict(_data)
                _id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
                log("delete id:%s"%(_id))
                self.recurse_delete_dict(_id)
                dpd.delete_row(self.ots_client)
            # flag the merged name's interface record as merged (status 401-451)
            face_id = get_document_product_dict_interface_base_id(_name,grade)
            _interface_d = {
                DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id,
                DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(401,451)
            }
            _dpdi = Document_product_dict_interface(_interface_d)
            if _dpdi.exists_row(self.ots_client):
                _dpdi.update_row(self.ots_client)
        # process history
        if len(delete_standard_names)>0:
            self.process_history_by_standard_name(name,grade,delete_standard_names,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE)
        if len(insert_standard_names)>0:
            self.process_history_by_standard_name(name,grade,insert_standard_names,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
        if len(delete_remove_words)>0:
            self.process_history_by_remove_words(name,grade,delete_remove_words,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE)
        if len(insert_remove_words)>0:
            self.process_history_by_remove_words(name,grade,insert_remove_words,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
        # rewrite the interface base record with the merged final values
        _interface_d = {
            DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
            DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:final_alias,
            DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
            DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300),
            DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE,
            DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
            DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:parent_id,
            DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:final_standard_alias,
            DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
            DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
            DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:final_remove_words,
            DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL:level
        }
        _dpdi = Document_product_dict_interface(_interface_d)
        _dpdi.update_row(self.ots_client)
- def recurse_update_dict(self,parent_id,new_parent_id):
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,parent_id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data.extend(getRow_ots(rows))
- for _data in list_data:
- dpd = Document_product_dict(_data)
- old_id = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_ID)
- new_id = get_document_product_dict_id(new_parent_id,dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_NAME))
- self.recurse_update_dict(old_id,new_id)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_PARENT_ID,new_parent_id,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_ID,new_id,True)
- dpd.update_row(self.ots_client)
- dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:old_id})
- dpd.delete_row(self.ots_client)
- def act_delete(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
- #search records which name=name and grade=grade
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,str(name)),
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- if total_count==0:
- return
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data.extend(getRow_ots(rows))
- interface_id = get_document_product_dict_interface_base_id(name,grade)
- #delete milvus records
- Coll,_ = self.get_collection(grade)
- delete_record_from_milvus(Coll,name,standard_alias)
- time.sleep(PRODUCT_REDIS_CACHE_TIME)
- #process_history data
- self.process_history_by_name([name],grade,"delete")
- #delete document_product_dict
- log("delete document_product_dict name:%s grade:%s count:%s"%(str(name),str(grade),str(len(list_data))))
- for _data in list_data:
- id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
- self.recurse_delete_dict(id)
- _d = {DOCUMENT_PRODUCT_DICT_ID:id}
- dpd = Document_product_dict(_d)
- dpd.delete_row(self.ots_client)
- _interface_d = {
- DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
- }
- _dpdi = Document_product_dict_interface(_interface_d)
- _dpdi.delete_row(self.ots_client)
- def recurse_delete_dict(self,id):
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,id)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data.extend(getRow_ots(rows))
- for _data in list_data:
- _id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
- self.recurse_delete_dict(_id)
- dpd = Document_product_dict(_data)
- dpd.delete_row(self.ots_client)
- def embedding_interface_producer(self):
- bool_query = BoolQuery(must_queries=[
- # TermQuery("name",'济南鑫驰'),
- RangeQuery("status",1,50,True,True)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- for _data in list_data:
- self.queue_product_interface.put(_data)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- for _data in list_data:
- self.queue_product_interface.put(_data)
- if self.queue_product_dict.qsize()>1000:
- break
- log("embedding interface total_count %d"%(total_count))
    def embedding_interface_comsumer(self):
        """Consume queued interface rows and dispatch insert/update/delete
        actions, then mark each row as processed (status 201-300)."""
        def _handle(item,result_queue):
            # unpack the interface record fields
            id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ID)
            action = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION)
            name = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_NAME)
            alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
            grade = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE)
            original_id = item.get(DOCUMENT_PRODUCT_DICT_ORIGINAL_ID)
            parent_id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID,"")
            standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
            create_time = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)
            remove_words = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,'')
            level = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL,1)
            # only act on names of sane length (1 < len(name) < MAX_NAME_LENGTH)
            if name is not None and len(name)>1 and len(name)<MAX_NAME_LENGTH:
                if action=="insert":
                    self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level)
                elif action=="update":
                    self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level)
                elif action=="delete":
                    self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)
            # mark the record processed regardless of the action outcome
            _pdi = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_INTERFACE_ID:id,
                                                    DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300)})
            _pdi.update_row(self.ots_client)
        self.embedding_interface_producer()
        mt = MultiThreadHandler(self.queue_product_interface,_handle,None,20,1)
        mt.run()
    def start_embedding_product_dict(self):
        """Run the embedding consumer on a blocking cron scheduler (every 5s).
        Blocks the calling thread."""
        from apscheduler.schedulers.blocking import BlockingScheduler
        scheduler = BlockingScheduler()
        # scheduler.add_job(func=self.embedding_producer,trigger="cron",minute="*/1")
        # NOTE(review): this schedules embedding_comsumer, not
        # embedding_interface_comsumer -- presumably defined elsewhere in the
        # class; confirm it is the intended job.
        scheduler.add_job(func=self.embedding_comsumer,trigger="cron",second="*/5")
        scheduler.start()
- def delete_collections(self):
- drop_embedding_collection(self.collection_name_name)
- drop_embedding_collection(self.collection_name_brand)
- drop_embedding_collection(self.collection_name_specs)
def start_embedding_product_dict():
    """Module-level entry point: run the product-dict embedding scheduler."""
    Product_Dict_Manager().start_embedding_product_dict()
def drop_product_dict_collections():
    """Module-level entry point: drop all product-dict milvus collections."""
    Product_Dict_Manager().delete_collections()
def search_similar():
    """Scan brand/specs dict entries (grade 4-5), look up each one's nearest
    milvus neighbours, and export the confirmed similar pairs to
    search_similar2.xlsx."""
    task_queue = Queue()
    ots_client = getConnect_ots()
    pdm = Product_Dict_Manager()
    list_data = []
    columns = [DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE]
    bool_query = BoolQuery(must_queries=[
        RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,4,5,True,True),
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                   columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
    list_data.extend(getRow_ots(rows))
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_data.extend(getRow_ots(rows))
    log("product_dict embedding total_count:%d"%total_count)
    # queue each (name, grade) only once
    set_key = set()
    for _d in list_data:
        name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
        grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
        _key = "%s-%d"%(name,grade)
        if _key in set_key:
            continue
        set_key.add(_key)
        task_queue.put(_d)
    result_queue = Queue()
    def handle(item,result_queue):
        # embed the name and collect milvus neighbours that pass the
        # grade-specific similarity check
        id = item.get(DOCUMENT_PRODUCT_DICT_ID)
        name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
        vector = get_embedding_request(name)
        parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
        Coll,Coll_name = pdm.get_collection(grade)
        output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name"]
        if vector is not None and Coll is not None:
            search_list = get_embedding_search(Coll,embedding_index_name,name,grade,[vector],pdm.search_params,output_fields,limit=10)
            for _item in search_list:
                ots_id = _item.get("ots_id")
                ots_name = _item.get("ots_name")
                ots_parent_id = _item.get("ots_parent_id")
                standard_name = _item.get("standard_name")
                if grade==4:
                    if check_brand(name,ots_name):
                        _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id,"target_standard_name":standard_name}
                        result_queue.put(_d)
                elif grade==5:
                    if is_similar(name,ots_name) and check_specs(name,ots_name):
                        _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id,"target_standard_name":standard_name}
                        result_queue.put(_d)
    mt = MultiThreadHandler(task_queue,handle,result_queue,5,1)
    mt.run()
    df_data = {}
    # fix: items carry "target_parent_id"; the old "parent_parent_id" column
    # never existed in any item, so it exported as an empty column
    df_columns = ["source_id","source_name","grade","parent_id","target_id","target_name","target_parent_id"]
    while 1:
        try:
            item = result_queue.get(timeout=1)
            for c in df_columns:
                if c not in df_data:
                    df_data[c] = []
                df_data[c].append(getLegal_str(item.get(c)))
        except Exception as e:
            # queue drained: stop collecting
            break
    import pandas as pd
    df = pd.DataFrame(df_data)
    df.to_excel("search_similar2.xlsx",columns=df_columns)
def clean_similar():
    """Post-process the pairs exported by search_similar(): de-duplicate
    symmetric pairs, re-check similarity, decide which name of each confirmed
    pair to keep and which to merge away, and dump the decisions to text
    files next to the input workbook."""
    import pandas as pd
    filename = "../../test/search_similar2_1.xlsx"
    df = pd.read_excel(filename)
    seen_pairs = set()
    list_source_name = []
    list_grade = []
    list_target_name = []
    list_check = []
    brand_set_move = set()
    brand_set_keep = set()
    specs_set_move = set()
    specs_set_keep = set()
    for source_name,grade,target_name in zip(df["source_name"],df["grade"],df["target_name"]):
        source_name = str(source_name)
        target_name = str(target_name)
        if source_name==target_name:
            continue
        _key1 = "%s-%s"%(source_name,target_name)
        # fix: use the same separator as _key1 -- the original built
        # "%s--%s" here, so a reversed pair was never recognized as seen
        _key2 = "%s-%s"%(target_name,source_name)
        if _key1 in seen_pairs or _key2 in seen_pairs:
            continue
        seen_pairs.add(_key1)
        seen_pairs.add(_key2)
        list_source_name.append(source_name)
        list_grade.append(grade)
        list_target_name.append(target_name)
        # fix: default to False so rows with an unexpected grade neither raise
        # NameError nor reuse the previous row's verdict
        _check = False
        if grade==4:
            _check = check_brand(source_name,target_name)
        elif grade==5:
            _check = is_similar(source_name,target_name) and check_specs(source_name,target_name)
        list_check.append(_check)
        if _check:
            if grade==4:
                # strip administrative/company suffixes before measuring length
                n_source_name = re.sub("省|市|县|集团|股份|有限|责任|公司",'',str(source_name))
                n_target_name = re.sub("省|市|县|集团|股份|有限|责任|公司",'',str(target_name))
            else:
                n_source_name = source_name
                n_target_name = target_name
            # keep the name whose (stripped) length is closest to 4.6 characters
            source_dis = abs(len(n_source_name)-4.6)
            target_dis = abs(len(n_target_name)-4.6)
            if source_dis>target_dis:
                if grade==4:
                    brand_set_keep.add(target_name)
                    brand_set_move.add(source_name)
                elif grade==5:
                    specs_set_keep.add(target_name)
                    specs_set_move.add(source_name)
            else:
                if grade==4:
                    brand_set_keep.add(source_name)
                    brand_set_move.add(target_name)
                elif grade==5:
                    specs_set_keep.add(source_name)
                    specs_set_move.add(target_name)
    df = pd.DataFrame({"source_name":list_source_name,
                       "grade":list_grade,
                       "target_name":list_target_name,
                       "check":list_check})
    df.to_excel("%s_clean.xlsx"%(filename),columns=["source_name","grade","target_name","check"])
    list_brand_move = list(brand_set_move)
    list_brand_keep = list(brand_set_keep)
    # NOTE: despite the file name these are intersections -- names that ended
    # up both kept and moved, i.e. conflicting decisions
    list_brand_union = list(brand_set_move&brand_set_keep)
    list_specs_move = list(specs_set_move)
    list_specs_keep = list(specs_set_keep)
    list_specs_union = list(specs_set_move&specs_set_keep)
    def _dump(path,names):
        # one name per line, utf-8
        with open(path,"w",encoding="utf8") as f:
            for _name in names:
                f.write("%s\n"%(_name))
    _dump("%s_brand_move.txt"%(filename),list_brand_move)
    _dump("%s_brand_keep.txt"%(filename),list_brand_keep)
    _dump("%s_brand_union.txt"%(filename),list_brand_union)
    _dump("%s_specs_move.txt"%(filename),list_specs_move)
    _dump("%s_specs_keep.txt"%(filename),list_specs_keep)
    _dump("%s_specs_union.txt"%(filename),list_specs_union)
def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words="",level=1,wait_sync=True):
    """Insert a product-dict record (and its aliases) into a milvus collection.

    Embeds the normalized name, replaces any existing row with the same id
    (delete-then-insert acts as an upsert on ots_id), then does the same for
    every alias in `standard_alias`.  When wait_sync is True, blocks until the
    inserted row becomes queryable.

    :param Coll: milvus collection; skipped (returns True) when None
    :param name: standard name to insert
    :param grade: dictionary grade stored with the row
    :param parent_id: parent dictionary id stored with the row
    :param standard_alias: DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR-joined
        alias names; may be None or ""
    :param remove_words: remove-words payload stored with the row
    :param level: dictionary level stored with the row
    :param wait_sync: when True, poll milvus until the row is visible
    :return: True when the insert is skipped or confirmed; may loop
        indefinitely while waiting for sync
    """
    n_name = get_milvus_standard_name(name)
    name_id = get_milvus_product_dict_id(n_name)
    vector = request_embedding(n_name)
    log("insert name %s grade %d"%(name,grade))
    if vector is not None and Coll is not None:
        expr = " ots_id in ['%s']"%name_id
        Coll.delete(expr)
        # Column order mirrors the collection schema:
        # id, name, standard_name, standard_name_id, embedding, parent, grade,
        # remove_words, level.
        data = [[name_id],
                [name],
                [name],
                [name_id],
                [vector],
                [parent_id],
                [grade],
                [remove_words],
                [level]
                ]
        insert_embedding(Coll,data)
        if standard_alias is not None and standard_alias!="":
            list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
            for _alias in list_alias:
                _alias = _alias.strip()
                if len(_alias)==0:
                    continue
                if _alias==name:
                    continue
                _id = get_document_product_dict_standard_alias_id(_alias)
                expr = " ots_id in ['%s']"%_id
                Coll.delete(expr)
                n_alias = get_milvus_standard_name(_alias)
                vector = request_embedding(n_alias)
                # Alias rows point back at the main record via name/name_id.
                data = [[_id],
                        [_alias],
                        [name],
                        [name_id],
                        [vector],
                        [parent_id],
                        [grade],
                        [remove_words],
                        [level]
                        ]
                insert_embedding(Coll,data)
        if wait_sync:
            while 1:
                try:
                    log("milvus insert wait for done")
                    # NOTE(review): `expr` here is whatever was assigned last —
                    # the last alias's id when aliases were inserted, not the
                    # main record's.  That still compares standard_name against
                    # `name` (aliases share it), but confirm this is intended.
                    list_result = Coll.query(expr=expr,output_fields=["standard_name"])
                    log("list_result"+str(list_result)+str(type(list_result[0])))
                    if len(list_result)==1:
                        if list_result[0].get("standard_name","")==name:
                            log("milvus insert done")
                            return True
                    time.sleep(1)
                except Exception as e:
                    traceback.print_exc()
    else:
        # No embedding or no collection: nothing to do, report success.
        return True
def delete_record_from_milvus(Coll,name,standard_alias):
    """Remove a product-dict record and all of its aliases from a milvus collection.

    Issues a delete by ots_id for the main name and for each alias, then polls
    until the last-issued delete expression returns no rows.

    :param Coll: milvus collection
    :param name: standard name whose record is removed
    :param standard_alias: DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR-joined
        alias names; may be None or ""
    """
    n_name = get_milvus_standard_name(name)
    name_id = get_milvus_product_dict_id(n_name)
    log("delete name %s standard_alias %s"%(str(name),str(standard_alias)))
    expr = " ots_id in ['%s']"%name_id
    Coll.delete(expr)
    if standard_alias is not None and standard_alias!="":
        list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
        for _alias in list_alias:
            _alias = _alias.strip()
            if len(_alias)==0:
                continue
            if _alias==name:
                continue
            _id = get_document_product_dict_standard_alias_id(_alias)
            expr = " ots_id in ['%s']"%_id
            Coll.delete(expr)
    while 1:
        # NOTE(review): only the last `expr` assigned above is polled (the last
        # alias when aliases exist); earlier deletes are assumed to have
        # completed by then — TODO confirm that assumption holds.
        if len(Coll.query(expr=expr))==0:
            return
        else:
            log("milvus delete wait for done")
            time.sleep(1)
def dict_interface_delete(name,grade,ots_client=None):
    """Queue a 'delete' action on the product-dict interface table.

    Writes a Document_product_dict_interface row with action="delete" and
    status=1 so the downstream synchronizer removes `name` at `grade`.

    :param name: dictionary entry name to delete
    :param grade: dictionary grade of the entry
    :param ots_client: optional OTS client; a fresh connection is created when
        omitted.  (The original default `ots_client=getConnect_ots()` opened a
        connection once at import time and silently shared it across every
        call — a classic mutable/eager default-argument bug.)
    """
    from uuid import uuid4
    if ots_client is None:
        ots_client = getConnect_ots()
    _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
          DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
          DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
          DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
          DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"delete",
          DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
    dpdi = Document_product_dict_interface(_d)
    dpdi.update_row(ots_client)
def interface_insert():
    """Queue 'insert' actions for each brand listed in the inline text block.

    Splits the (manually edited) triple-quoted block `a` on whitespace and
    common punctuation; each non-empty token becomes one
    Document_product_dict_interface row with action="insert" at grade 4, to be
    picked up by the downstream synchronizer.
    """
    from uuid import uuid4
    a = '''
    '''
    grade = 4
    new_standard_alias = ""
    new_remove_words = ""
    list_brand = []
    ots_client = getConnect_ots()
    for s in re.split("[\n\s,.,。、]",a):
        s = s.strip()
        if s=="":
            continue
        list_brand.append(s)
    # (The original re-assigned grade = 4 a second time here; removed as
    # redundant.)
    for brand in list_brand:
        print(brand)
        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
              DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
              DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
              DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
              DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
              DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
              DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:new_remove_words,
              DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
        dpdi = Document_product_dict_interface(_d)
        dpdi.update_row(ots_client)
def interface_deletes():
    """Queue a 'delete' interface action for every name in the inline text block."""
    a = '''
    株式会社
    '''
    grade = 4
    ots_client = getConnect_ots()
    # One delete action per non-empty whitespace-separated token.
    for token in re.split("[\n\s]",a):
        token = token.strip()
        if token=="":
            continue
        print(token)
        dict_interface_delete(token,grade,ots_client)
def interface_update():
    """Queue an 'update' action for one brand, merging aliases via standard_alias.

    The "+" prefix on new_standard_alias marks the aliases as additions.
    """
    name = "万东"
    new_standard_alias = "+万东康源|北京万东"
    new_remove_words = ""
    grade = 4
    ots_client = getConnect_ots()
    from uuid import uuid4
    row_fields = {
        DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
        DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
        DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
        DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
        DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:new_remove_words,
        DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
    }
    Document_product_dict_interface(row_fields).update_row(ots_client)
def interface_brand_update_by_file():
    """Queue brand-merge 'update' actions from a two-sheet excel file.

    Sheet 0 ("brands") lists all known brand names; sheet 1 maps a target
    brand to comma-separated standard_alias names.  Every name on sheet 1 is
    validated against sheet 0 (mismatches are printed), then one interface
    row with action="update" is written per target brand.
    """
    import pandas as pd
    import re
    filename = "../../test/品牌合并.xlsx"
    df0 = pd.read_excel(filename,0)
    df1 = pd.read_excel(filename,1)
    # Collect every brand name from sheet 0 for validation.
    set_source_brand = set()
    for b in df0["brands"]:
        if b is None or b=="":
            continue
        list_brand = b.split(",")
        for brand in list_brand:
            brand = brand.strip()
            if brand=="":
                continue
            set_source_brand.add(brand)
    target_brand = df1["brand"]
    target_standard_alias = df1["standard_alias"]
    _check_flag = True
    list_target = []
    for tbrand,standard_alias in zip(target_brand,target_standard_alias):
        brand = tbrand.strip()
        if brand not in set_source_brand:
            print("not in source:%s"%(brand))
            _check_flag = False
        # str(...)=="nan" guards against pandas NaN in empty alias cells.
        if standard_alias is None or standard_alias=="" or str(standard_alias)=="nan":
            continue
        # Split on ASCII or full-width comma.
        list_brand = re.split("[,,]",standard_alias)
        set_alias = set()
        for brand in list_brand:
            brand = brand.strip()
            if brand=="":
                continue
            if brand not in set_source_brand:
                print("not in source:%s"%(brand))
                _check_flag = False
            set_alias.add(brand)
        # "+" prefix marks the aliases as additions for the synchronizer.
        _d = {"brand":tbrand.strip(),
              "standard_alias":"+"+"|".join(list(set_alias))}
        list_target.append(_d)
    # NOTE(review): "or 1" forces this branch even when validation failed —
    # presumably a deliberate temporary override; confirm before relying on
    # the _check_flag validation.
    if _check_flag or 1:
        grade = 4
        ots_client = getConnect_ots()
        from uuid import uuid4
        for target in list_target:
            name = target["brand"]
            new_standard_alias = target["standard_alias"]
            _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
                  DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:"",
                  DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
            dpdi = Document_product_dict_interface(_d)
            dpdi.update_row(ots_client)
    print(list_target)
def clean_brands():
    """Classify every grade-4 base brand as legal/illegal and export two name lists.

    Loads all action="base", grade-4 rows from the product-dict interface
    table (paging via next_token), deduplicates on (name, grade), then checks
    each brand concurrently: is_legal_brand() first; when it is undecided
    (neither True nor False), fall back to usage counts in the
    document_product and document tables.  Results go to
    ../../test/legal_brand.txt and ../../test/illegal_brand.txt.
    """
    from queue import Queue as TQueue
    task_queue = TQueue()
    ots_client = getConnect_ots()
    list_data = []
    table_name = Document_product_dict_interface_table_name
    table_index = table_name+"_index"
    columns=[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE]
    bool_query = BoolQuery(must_queries=[
        TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
        RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,4,4,True,True),
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                   columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
    list_dict = getRow_ots(rows)
    for _d in list_dict:
        list_data.append(_d)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _d in list_dict:
            list_data.append(_d)
    log("product_dict embedding total_count:%d"%total_count)
    # Deduplicate on (name, grade) before queueing.
    set_key = set()
    list_process_data = []
    set_brand = set()
    for _d in list_data:
        name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
        grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
        _key = "%s-%d"%(name,grade)
        if _key in set_key:
            continue
        set_key.add(_key)
        task_queue.put(_d)
        list_process_data.append(_d)
        if grade==BRAND_GRADE:
            set_brand.add(name)
    def _handle(item,result_queue):
        # Annotate item["legal"] = 1/0 for one brand.
        name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
        # The original called is_legal_brand() twice per item (once for the
        # stored value, once in the if) — call it once and branch on the result.
        _legal = is_legal_brand(ots_client,name)
        if _legal:
            item["legal"] = 1
        elif _legal==False:
            item["legal"] = 0
        else:
            # Undecided (presumably None): fall back to how often the brand is
            # actually used.
            bool_query = BoolQuery(must_queries=[
                TermQuery("brand",name)
            ])
            rows,next_token,total_count,is_all_succeed = ots_client.search("document_product","document_product_index",
                                                                           SearchQuery(bool_query,get_total_count=True))
            if total_count>=2:
                item["legal"] = 1
            else:
                bool_query = BoolQuery(must_queries=[
                    NestedQuery("products",WildcardQuery("products.brand",name)),
                ])
                rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
                                                                               SearchQuery(bool_query,get_total_count=True))
                if total_count>=1:
                    item["legal"] = 1
                else:
                    item["legal"] = 0
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
    list_legal = []
    list_illegal = []
    for _data in list_process_data:
        name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
        legal = _data["legal"]
        if legal==1:
            list_legal.append(name)
        else:
            list_illegal.append(name)
    with open("../../test/legal_brand.txt", "w", encoding="utf8") as f:
        for _name in list_legal:
            f.write("%s\n"%(_name))
    with open("../../test/illegal_brand.txt", "w", encoding="utf8") as f:
        for _name in list_illegal:
            f.write("%s\n"%(_name))
def merge_brands():
    """Find brands that are an area-prefixed variant of another brand; export a merge plan.

    Loads all action="base", grade-4 rows from the product-dict interface
    table, then for each name tries prefixes of up to 8 characters against
    get_area_set(); when a prefix is an area name and the remainder (after
    stripping one leading 省/市/区 char) is itself a known brand, the prefixed
    name "belongs to" that brand.  Rows whose standard_alias entries exist as
    base rows get those base rows re-statused randomly into 401-451.  The
    belongs-to mapping is written to ../../merge.xlsx.
    """
    from queue import Queue as TQueue
    import pandas as pd
    task_queue = TQueue()
    ots_client = getConnect_ots()
    list_data = []
    table_name = Document_product_dict_interface_table_name
    table_index = table_name+"_index"
    columns=[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS]
    bool_query = BoolQuery(must_queries=[
        TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
        RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,4,4,True,True),
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                   columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
    list_dict = getRow_ots(rows)
    for _d in list_dict:
        list_data.append(_d)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                       SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                       columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _d in list_dict:
            list_data.append(_d)
    log("product_dict embedding total_count:%d"%total_count)
    # Deduplicate on (name, grade); remember all brand names for the
    # belongs-to lookup in _handle.
    set_key = set()
    list_process_data = []
    set_brand = set()
    for _d in list_data:
        name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
        grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
        _key = "%s-%d"%(name,grade)
        if _key in set_key:
            continue
        set_key.add(_key)
        task_queue.put(_d)
        list_process_data.append(_d)
        if grade==BRAND_GRADE:
            set_brand.add(name)
    area_set = get_area_set()
    def _handle(item,result_queue):
        name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
        for i in range(min(len(name)-2,8)):
            _n = name[:i+1]
            if _n in area_set:
                # Strip one leading 省/市/区 character after the area prefix.
                # BUG FIX: the original pattern "^[省市区]]" required a literal
                # ']' after the character class and could never match.
                n_name = re.sub("^[省市区]",'',name[i+1:])
                if n_name in set_brand:
                    item["belongs_to"] = n_name
        standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
        if standard_alias is not None and standard_alias!="":
            for salias in standard_alias.split("|"):
                face_id = get_document_product_dict_interface_base_id(salias,grade)
                # Randomized 401-451 status; presumably spreads re-sync load —
                # TODO confirm the status-range semantics.
                _interface_d = {
                    DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id,
                    DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(401,451)
                }
                _dpdi = Document_product_dict_interface(_interface_d)
                if _dpdi.exists_row(ots_client):
                    _dpdi.update_row(ots_client)
    mt = MultiThreadHandler(task_queue,_handle,None,20)
    mt.run()
    # Invert the per-item annotations into brand -> [prefixed variants].
    dict_belongs_alias = {}
    for _data in list_process_data:
        name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
        belongs_to = _data.get("belongs_to")
        if belongs_to is not None:
            if belongs_to not in dict_belongs_alias:
                dict_belongs_alias[belongs_to] = []
            dict_belongs_alias[belongs_to].append(name)
    df_data = {"brand":[],"standard_alias":[]}
    for k,v in dict_belongs_alias.items():
        df_data["brand"].append(k)
        df_data["standard_alias"].append("|".join(v))
    df = pd.DataFrame(df_data)
    df.to_excel("../../merge.xlsx",columns=["brand","standard_alias"])
    # Optional follow-up (intentionally disabled): queue "update" actions that
    # merge each belongs-to group via standard_alias instead of just exporting.
    # grade = 4
    # ots_client = getConnect_ots()
    # from uuid import uuid4
    # for k,v in dict_belongs_alias.items():
    #     name = k
    #     new_standard_alias = "+%s"%("|".join(v))
    #     print(k,new_standard_alias)
    #     _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:"",
    #           DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
    #     dpdi = Document_product_dict_interface(_d)
    #     dpdi.update_row(ots_client)
def interface_delete_brands():
    """Queue temporary 'delete' actions for brands listed inline and in illegal_brand.txt.

    Rows carry "is_temp":1 so clean_interface_delete_temp() can purge them
    once processed.
    """
    from uuid import uuid4
    ots_client = getConnect_ots()
    list_brand = []
    a = '''
    日本
    '''
    grade = 4
    # Inline names: split on whitespace and common punctuation.
    for token in re.split("[\n\s,.,。、]",a):
        token = token.strip()
        if token!="":
            list_brand.append(token)
    # Plus every non-empty line of the illegal-brand export.
    with open("../../test/illegal_brand.txt","r",encoding="utf8") as f:
        for line in f:
            stripped = line.strip()
            if stripped!="":
                list_brand.append(stripped)
    for brand in list_brand:
        print(brand)
        row_fields = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
                      DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
                      DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
                      DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                      DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"delete",
                      DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                      "is_temp":1
                      }
        Document_product_dict_interface(row_fields).update_row(ots_client)
def clean_interface_delete_temp():
    """Purge processed temporary 'delete' rows from the interface table.

    Pages through all action="delete" rows and removes those that were queued
    with is_temp=1 and have already been handled (status>=201); all other rows
    are left untouched.
    """
    ots_client = getConnect_ots()
    table_name = Document_product_dict_interface_table_name
    table_index = table_name+"_index"
    columns = ["is_temp","status","name"]
    task_queue = Queue()
    bool_query = BoolQuery(must_queries=[
        TermQuery("action","delete")
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_data = getRow_ots(rows)
    for _data in list_data:
        task_queue.put(_data)
    print("%d/%d"%(task_queue.qsize(),total_count))
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        for _data in list_data:
            task_queue.put(_data)
        print("%d/%d"%(task_queue.qsize(),total_count))
    def _handle(item,result_queue):
        # Delete only rows explicitly marked temporary and already processed.
        # (Removed from the original: an unused `name` local and a dead
        # `else: pass` holding commented-out status-reset code — that rerun
        # behavior lives in rerun_interface_deletes().)
        is_temp = item.get("is_temp",0)
        status = item.get("status",0)
        if is_temp==1 and status>=201:
            dpdi = Document_product_dict_interface(item)
            dpdi.delete_row(ots_client)
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
def clean_product_dict():
    """Delete every product-dict row with grade >= 3, using a thread pool."""
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[
        RangeQuery("grade",3)
    ])
    task_queue = Queue()
    table = Document_product_dict_table_name
    index = Document_product_dict_table_name+"_index"
    # First page, sorted by status; only primary keys are fetched.
    rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    for row in getRow_ots(rows):
        task_queue.put(row)
    print("%d/%d"%(task_queue.qsize(),total_count))
    # Remaining pages via next_token.
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        for row in getRow_ots(rows):
            task_queue.put(row)
        print("%d/%d"%(task_queue.qsize(),total_count))
    def _handle(item,result_queue):
        # Each queued item is a row's key data; wrap and delete it.
        Document_product_dict(item).delete_row(ots_client)
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
def clean_product_dict_interface():
    """Delete every interface row whose action is 'insert' or 'base', using a thread pool."""
    ots_client = getConnect_ots()
    bool_query = BoolQuery(must_queries=[
        BoolQuery(should_queries=[
            TermQuery("action","insert"),
            TermQuery("action","base")
        ])
    ])
    task_queue = Queue()
    table = Document_product_dict_interface_table_name
    index = Document_product_dict_interface_table_name+"_index"
    # First page, sorted by status; only primary keys are fetched.
    rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
    for row in getRow_ots(rows):
        task_queue.put(row)
    print("%d/%d"%(task_queue.qsize(),total_count))
    # Remaining pages via next_token.
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table,index,
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
        for row in getRow_ots(rows):
            task_queue.put(row)
        print("%d/%d"%(task_queue.qsize(),total_count))
    def _handle(item,result_queue):
        # Each queued item is a row's key data; wrap and delete it.
        Document_product_dict_interface(item).delete_row(ots_client)
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
def rerun_interface_deletes():
    """Reset status=1 on every 'delete' interface row so the synchronizer re-processes it.

    Pages through all action="delete" rows and rewrites each with status=1.
    """
    ots_client = getConnect_ots()
    table_name = Document_product_dict_interface_table_name
    table_index = table_name+"_index"
    columns = ["is_temp","status","name"]
    task_queue = Queue()
    bool_query = BoolQuery(must_queries=[
        TermQuery("action","delete")
    ])
    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                   SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
                                                                   columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_data = getRow_ots(rows)
    for _data in list_data:
        task_queue.put(_data)
    print("%d/%d"%(task_queue.qsize(),total_count))
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
                                                                       SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
                                                                       columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        for _data in list_data:
            task_queue.put(_data)
        print("%d/%d"%(task_queue.qsize(),total_count))
    def _handle(item,result_queue):
        # (Removed an unused `status` local from the original.)
        dpdi = Document_product_dict_interface(item)
        # Third argument True presumably marks the column dirty for the
        # update — project API; confirm against Document_product_dict_interface.
        dpdi.setValue("status",1,True)
        dpdi.update_row(ots_client)
    mt = MultiThreadHandler(task_queue,_handle,None,30)
    mt.run()
if __name__ == '__main__':
    # Manual maintenance entry point: comment/uncomment the task(s) to run.
    # Currently active: queue deletes, then queue inserts.
    # start_embedding_product_dict()
    interface_deletes()
    interface_insert()
    # interface_update()
    # interface_brand_update_by_file()
    # clean_similar()
    # clean_brands()
    # merge_brands()
    # interface_delete_brands()
    # clean_interface_delete_temp()
    # clean_product_dict()
    # clean_product_dict_interface()
    # rerun_interface_deletes()
|