# product_dict.py
# (extraction artifact removed: file-size header and a dump of concatenated
#  line numbers from the copy/paste tool carried no content)
  1. from BaseDataMaintenance.common.milvusUtil import *
  2. from multiprocessing import Process,Queue
  3. from BaseDataMaintenance.common.Utils import *
  4. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  5. from apscheduler.schedulers.blocking import BlockingScheduler
  6. from BaseDataMaintenance.model.ots.document_product_dict import *
  7. from BaseDataMaintenance.model.ots.document_product_dict_interface import *
  8. from BaseDataMaintenance.model.ots.document_product import *
  9. from BaseDataMaintenance.model.ots.document_product_tmp import *
  10. from BaseDataMaintenance.dataSource.source import getConnect_ots
  11. from tablestore import *
  12. from BaseDataMaintenance.common.Utils import getRow_ots
  13. from BaseDataMaintenance.maintenance.product.productUtils import *
  14. import time
  15. import traceback
  16. import json
  17. import requests
  18. from random import randint
# marker value written to document_product_dict_interface.is_synchonized
# once a row's embedding has been pushed to Milvus
IS_SYNCHONIZED = 3


class Product_Dict_Manager():
    """Keeps the product dictionary (name/brand/specs) synchronized between
    the OTS tables and the Milvus embedding collections, and reruns history
    documents when dictionary entries change."""

    def __init__(self):
        # Milvus collection names for the three embedding spaces
        self.collection_name_name = COLLECTION_NAME_NAME
        self.collection_name_brand = COLLECTION_NAME_BRAND
        self.collection_name_specs = COLLECTION_NAME_SPECS
        # inner-product similarity, probing 10 IVF clusters per query
        self.search_params = {"metric_type":"IP",
                              "params":{"nprobe":10}}
        # ensure the collections/indexes exist before handles are fetched below
        self.init_milvus()
        self.ots_client = getConnect_ots()
        self.queue_product_dict = Queue()
        self.queue_product_interface = Queue()
        self.session = requests.Session()
        # collection handles, one per grade (see get_collection)
        self.Coll_name = getCollection(self.collection_name_name)
        self.Coll_brand = getCollection(self.collection_name_brand)
        self.Coll_specs = getCollection(self.collection_name_specs)
        # self.pool_name = ConnectorPool(init_num=10,max_num=30,method_init=getCollection,collection_name=self.collection_name_name)
        #
        # self.pool_brand = ConnectorPool(init_num=10,max_num=30,method_init=getCollection,collection_name=self.collection_name_brand)
        # self.pool_specs = ConnectorPool(init_num=10,max_num=30,method_init=getCollection,collection_name=self.collection_name_specs)
  39. def init_milvus(self):
  40. from pymilvus import connections,FieldSchema,DataType
  41. fields = [
  42. # FieldSchema(name="pk_id",dtype=DataType.INT64,is_primary=True,auto_id=True), # pk is the same as ots
  43. FieldSchema(name="ots_id",dtype=DataType.VARCHAR,max_length=32,is_primary=True),
  44. FieldSchema(name="ots_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
  45. FieldSchema(name="standard_name",dtype=DataType.VARCHAR,max_length=MAX_NAME_LENGTH),
  46. FieldSchema(name="standard_name_id",dtype=DataType.VARCHAR,max_length=32),
  47. FieldSchema(name="embedding",dtype=DataType.FLOAT_VECTOR,dim=1024),
  48. FieldSchema(name="ots_parent_id",dtype=DataType.VARCHAR,max_length=32),
  49. FieldSchema(name="ots_grade",dtype=DataType.INT64),
  50. FieldSchema(name="remove_words",dtype=DataType.VARCHAR,max_length=3000),
  51. FieldSchema(name="level",dtype=DataType.INT64),
  52. ]
  53. index_name = "embedding"
  54. index_params = {"params":{"nlist":2048},
  55. "index_type":"IVF_SQ8",
  56. "metric_type":"IP"}
  57. init_milvus(milvus_host)
  58. #build the product name brand specs embedding respectively
  59. create_embedding_schema(self.collection_name_name,fields,index_name,index_params)
  60. create_embedding_schema(self.collection_name_brand,fields,index_name,index_params)
  61. create_embedding_schema(self.collection_name_specs,fields,index_name,index_params)
  62. def get_collection(self,grade):
  63. Coll = None
  64. Coll_name = None
  65. if grade ==SPECS_GRADE:
  66. Coll = self.Coll_specs
  67. Coll_name = self.collection_name_specs
  68. if grade == BRAND_GRADE:
  69. Coll = self.Coll_brand
  70. Coll_name = self.collection_name_brand
  71. if grade == NAME_GRADE:
  72. Coll = self.Coll_name
  73. Coll_name = self.collection_name_name
  74. return Coll,Coll_name
    def embedding_producer(self,columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_LEVEL]):
        """Queue not-yet-synchronized dict-interface rows for embedding.

        Searches document_product_dict_interface for rows with action=base,
        grade in [3,5], and is_synchonized != IS_SYNCHONIZED, and puts each
        row dict onto self.queue_product_dict for embedding_comsumer.

        :param columns: attribute columns to fetch per row (default list is
            never mutated, so the mutable default is harmless here)
        """
        bool_query = BoolQuery(
            must_queries=[
                TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
                # grade range [3,5] inclusive on both ends
                RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,3,5,True,True)
            ],
            must_not_queries=[TermQuery(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED)])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _d in list_dict:
            self.queue_product_dict.put(_d)
        # page through the remaining results via next_token
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
            list_dict = getRow_ots(rows)
            for _d in list_dict:
                self.queue_product_dict.put(_d)
            # cap the in-memory backlog at ~10k items per producer run
            # NOTE(review): original indentation was lost; the break is assumed
            # to terminate the pagination loop after a full page — confirm
            if self.queue_product_dict.qsize()>=10000:
                break
        log("product_dict embedding total_count:%d"%total_count)
    def embedding_comsumer(self):
        """Drain the producer queue with 5 worker threads, inserting each dict
        row's embedding into Milvus and marking the OTS row as synchronized.

        (Name kept as-is — "comsumer" — because external callers may refer to it.)
        """
        def handle(item,result_queue):
            # process one dict-interface row; best-effort: failures are logged
            # and the row stays unsynchronized so a later run retries it
            try:
                _id = item.get(DOCUMENT_PRODUCT_DICT_ID)
                # names are truncated to the Milvus VARCHAR field limit
                name = str(item.get(DOCUMENT_PRODUCT_DICT_NAME))[:MAX_NAME_LENGTH]
                parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
                grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
                # grade picks the target collection (name/brand/specs)
                Coll,_ = self.get_collection(grade)
                standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
                remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
                level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL,1)
                # only mark the row synchronized if the Milvus insert succeeded
                if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level,wait_sync=False):
                    _pd = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_ID:_id,DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED})
                    _pd.update_row(self.ots_client)
            except Exception as e:
                traceback.print_exc()
        self.embedding_producer()
        start_time = time.time()
        q_size = self.queue_product_dict.qsize()
        # 5 threads, 1 retry pass (see MultiThreadHandler)
        mt = MultiThreadHandler(self.queue_product_dict,handle,None,5,1)
        mt.run()
        log("process embedding %d records cost %.2f s"%(q_size,time.time()-start_time))
  120. def process_history_name(self,list_name,action):
  121. if action=="insert":
  122. # search document_product_temp and update status
  123. # query in blur mode
  124. for name in list_name:
  125. should_q = self.make_query(name,DOCUMENT_PRODUCT_TMP_NAME,MatchPhraseQuery,4,5)
  126. if should_q is None:
  127. continue
  128. bool_query =BoolQuery(must_queries=[
  129. RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,401,450,True,True),
  130. should_q
  131. ])
  132. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
  133. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  134. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  135. list_data = getRow_ots(rows)
  136. while next_token:
  137. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
  138. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  139. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  140. list_data.extend(getRow_ots(rows))
  141. for _data in list_data:
  142. id = _data.get(DOCUMENT_PRODUCT_TMP_ID)
  143. status = randint(1,50)
  144. _d = {DOCUMENT_PRODUCT_TMP_ID:id,
  145. DOCUMENT_PRODUCT_TMP_STATUS:status}
  146. _dt = Document_product_tmp(_d)
  147. _dt.update_row(self.ots_client)
  148. elif action=="delete":
  149. # delete document_product
  150. # update temp new_id and status to 401-450
  151. for name in list_name:
  152. bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
  153. if bool_query is not None:
  154. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  155. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  156. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  157. list_data = getRow_ots(rows)
  158. while next_token:
  159. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  160. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  161. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  162. list_data.extend(getRow_ots(rows))
  163. for _d in list_data:
  164. _id = _d.get(DOCUMENT_PRODUCT_ID)
  165. _d = {DOCUMENT_PRODUCT_ID:_id}
  166. dp = Document_product(_d)
  167. dp.delete_row(self.ots_client)
  168. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  169. _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:randint(401,450)}
  170. dpt = Document_product_tmp(_d)
  171. dpt.update_row(self.ots_client)
  172. elif action=="update":
  173. # delete document_product and update document_product_temp to rerun
  174. for name in list_name:
  175. bool_query = self.make_query(name,DOCUMENT_PRODUCT_NAME,TermQuery,len(name),5)
  176. if bool_query is not None:
  177. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  178. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  179. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  180. list_data = getRow_ots(rows)
  181. while next_token:
  182. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  183. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  184. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  185. list_data.extend(getRow_ots(rows))
  186. for _d in list_data:
  187. _id = _d.get(DOCUMENT_PRODUCT_ID)
  188. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  189. self.rerun(_id,original_id)
  190. def process_history_brand(self,list_brand,action):
  191. # search document_product and rerun
  192. for name in list_brand:
  193. if action=="insert":
  194. name = re.sub("有限|责任|公司",'',name)
  195. bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,4,5)
  196. else:
  197. bool_query = self.make_query(name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(name),5)
  198. if bool_query is not None:
  199. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  200. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  201. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  202. list_data = getRow_ots(rows)
  203. log("insert brand %s %d counts"%(name,total_count))
  204. while next_token:
  205. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  206. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  207. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  208. list_data.extend(getRow_ots(rows))
  209. for _d in list_data:
  210. _id = _d.get(DOCUMENT_PRODUCT_ID)
  211. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  212. self.rerun(_id,original_id)
  213. def process_history_specs(self,list_specs,action):
  214. # search document_product and rerun
  215. for name in list_specs:
  216. if action=="insert":
  217. bool_query = self.make_query(name,DOCUMENT_PRODUCT_ORIGINAL_SPECS,MatchPhraseQuery,len(name),5)
  218. else:
  219. bool_query = self.make_query(name,DOCUMENT_PRODUCT_SPECS,TermQuery,len(name),5)
  220. if bool_query is not None:
  221. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  222. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  223. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  224. list_data = getRow_ots(rows)
  225. while next_token:
  226. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  227. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  228. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID],return_type=ColumnReturnType.SPECIFIED))
  229. list_data.extend(getRow_ots(rows))
  230. for _d in list_data:
  231. _id = _d.get(DOCUMENT_PRODUCT_ID)
  232. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  233. self.rerun(_id,original_id)
  234. def rerun(self,id,original_id):
  235. _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
  236. dpt = Document_product_tmp(_d)
  237. dpt.update_row(self.ots_client)
  238. _d = {DOCUMENT_PRODUCT_ID:id}
  239. dp = Document_product(_d)
  240. dp.delete_row(self.ots_client)
  241. def make_query(self,name,column,query_type,min_len,strides):
  242. should_q = []
  243. strides_spce = len(name)-min_len+1
  244. for _i in range(min(strides_spce,strides)):
  245. _s = str(name)[_i:min_len]
  246. if query_type==WildcardQuery:
  247. should_q.append(query_type(column,"*%s*"%_s))
  248. elif query_type==TermQuery or query_type==MatchPhraseQuery:
  249. should_q.append(query_type(column,"%s"%(_s)))
  250. if len(should_q)>0:
  251. return BoolQuery(should_queries=should_q)
  252. return None
  253. def process_history_by_name(self,list_name,grade,action):
  254. assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_UPDATE,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
  255. if grade==NAME_GRADE:
  256. self.process_history_name(list_name,action)
  257. elif grade==BRAND_GRADE:
  258. self.process_history_brand(list_name,action)
  259. elif grade==SPECS_GRADE:
  260. self.process_history_specs(list_name,action)
  261. def process_history_by_standard_name(self,name,grade,list_name,action):
  262. assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
  263. if grade==NAME_GRADE:
  264. if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
  265. for n_name in list_name:
  266. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_NAME,TermQuery,len(n_name),5)
  267. if bool_query is not None:
  268. _query = bool_query
  269. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  270. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  271. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
  272. list_data = getRow_ots(rows)
  273. total_count = total_count
  274. while next_token:
  275. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  276. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  277. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_NAME_ID],return_type=ColumnReturnType.SPECIFIED))
  278. list_data.extend(getRow_ots(rows))
  279. for _d in list_data:
  280. dict_name_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
  281. for dict_id in [dict_name_id]:
  282. if dict_id is not None and dict_id!="":
  283. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
  284. self.recurse_delete_dict(dict_name_id)
  285. dpd.delete_row(self.ots_client)
  286. _id = _d.get(DOCUMENT_PRODUCT_ID)
  287. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  288. self.rerun(_id,original_id)
  289. log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
  290. if total_count==0:
  291. self.process_history_name([n_name],action)
  292. elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
  293. for n_name in list_name:
  294. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_NAME,MatchPhraseQuery,len(n_name),5)
  295. if bool_query is not None:
  296. _query = BoolQuery(must_queries=[
  297. TermQuery(DOCUMENT_PRODUCT_NAME,name),
  298. bool_query
  299. ])
  300. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  301. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  302. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  303. list_data = getRow_ots(rows)
  304. while next_token:
  305. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  306. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  307. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  308. list_data.extend(getRow_ots(rows))
  309. for _d in list_data:
  310. dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
  311. if dict_brand_id is not None and dict_brand_id!="":
  312. _query = BoolQuery(must_queries=[
  313. TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
  314. ])
  315. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  316. SearchQuery(_query,get_total_count=True))
  317. if total_count==1:
  318. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
  319. self.recurse_delete_dict(dict_brand_id)
  320. _id = _d.get(DOCUMENT_PRODUCT_ID)
  321. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  322. self.rerun(_id,original_id)
  323. if grade==BRAND_GRADE:
  324. if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
  325. for n_name in list_name:
  326. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_BRAND,TermQuery,len(n_name),5)
  327. if bool_query is not None:
  328. _query = bool_query
  329. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  330. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  331. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  332. list_data = getRow_ots(rows)
  333. while next_token:
  334. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  335. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  336. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  337. list_data.extend(getRow_ots(rows))
  338. for _d in list_data:
  339. dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
  340. for dict_id in [dict_brand_id]:
  341. if dict_id is not None and dict_id!="":
  342. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
  343. self.recurse_delete_dict(dict_brand_id)
  344. dpd.delete_row(self.ots_client)
  345. _id = _d.get(DOCUMENT_PRODUCT_ID)
  346. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  347. self.rerun(_id,original_id)
  348. log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
  349. if total_count==0:
  350. self.process_history_brand([n_name],action)
  351. elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
  352. for n_name in list_name:
  353. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,4,5)
  354. if bool_query is not None:
  355. _query = BoolQuery(must_queries=[
  356. TermQuery(DOCUMENT_PRODUCT_BRAND,name),
  357. bool_query
  358. ])
  359. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  360. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  361. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  362. list_data = getRow_ots(rows)
  363. while next_token:
  364. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  365. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  366. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  367. list_data.extend(getRow_ots(rows))
  368. for _d in list_data:
  369. dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
  370. if dict_brand_id is not None and dict_brand_id!="":
  371. _query = BoolQuery(must_queries=[
  372. TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
  373. ])
  374. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  375. SearchQuery(_query,get_total_count=True))
  376. if total_count==1:
  377. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
  378. self.recurse_delete_dict(dict_brand_id)
  379. dpd.delete_row(self.ots_client)
  380. _id = _d.get(DOCUMENT_PRODUCT_ID)
  381. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  382. self.rerun(_id,original_id)
  383. if grade==SPECS_GRADE:
  384. if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
  385. for n_name in list_name:
  386. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_SPECS,TermQuery,len(n_name),5)
  387. if bool_query is not None:
  388. _query = bool_query
  389. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  390. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  391. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
  392. list_data = getRow_ots(rows)
  393. while next_token:
  394. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  395. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  396. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
  397. list_data.extend(getRow_ots(rows))
  398. for _d in list_data:
  399. dict_specs_id = _d.get(DOCUMENT_PRODUCT_DICT_SPECS_ID)
  400. for dict_id in [dict_specs_id]:
  401. if dict_id is not None and dict_id!="":
  402. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_id})
  403. self.recurse_delete_dict(dict_specs_id)
  404. dpd.delete_row(self.ots_client)
  405. _id = _d.get(DOCUMENT_PRODUCT_ID)
  406. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  407. self.rerun(_id,original_id)
  408. log("%s insert standard_alias %s exists %d counts "%(name,n_name,total_count))
  409. if total_count==0:
  410. self.process_history_specs([n_name],action)
  411. elif action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
  412. _query = BoolQuery(must_queries=[
  413. TermQuery(DOCUMENT_PRODUCT_SPECS,name),
  414. ])
  415. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  416. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  417. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
  418. list_data = getRow_ots(rows)
  419. while next_token:
  420. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  421. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  422. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
  423. list_data.extend(getRow_ots(rows))
  424. for _d in list_data:
  425. dict_specs_id = _d.get(DOCUMENT_PRODUCT_DICT_SPECS_ID)
  426. if dict_specs_id is not None and dict_specs_id!="":
  427. _query = BoolQuery(must_queries=[
  428. TermQuery(DOCUMENT_PRODUCT_DICT_SPECS_ID,dict_brand_id)
  429. ])
  430. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  431. SearchQuery(_query,get_total_count=True))
  432. if total_count==1:
  433. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_specs_id})
  434. self.recurse_delete_dict(dict_specs_id)
  435. dpd.delete_row(self.ots_client)
  436. _id = _d.get(DOCUMENT_PRODUCT_ID)
  437. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  438. self.rerun(_id,original_id)
  439. def process_history_by_remove_words(self,name,grade,list_name,action):
  440. assert action in [DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE]
  441. if grade==NAME_GRADE:
  442. if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
  443. for n_name in list_name:
  444. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_NAME,MatchPhraseQuery,len(n_name),5)
  445. if bool_query is not None:
  446. _query = BoolQuery(must_queries=[
  447. TermQuery(DOCUMENT_PRODUCT_NAME,name),
  448. bool_query
  449. ])
  450. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  451. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  452. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  453. list_data = getRow_ots(rows)
  454. while next_token:
  455. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  456. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  457. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  458. list_data.extend(getRow_ots(rows))
  459. for _d in list_data:
  460. dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_NAME_ID)
  461. if dict_brand_id is not None and dict_brand_id!="":
  462. _query = BoolQuery(must_queries=[
  463. TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
  464. ])
  465. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  466. SearchQuery(_query,get_total_count=True))
  467. if total_count==1:
  468. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
  469. self.recurse_delete_dict(dict_brand_id)
  470. _id = _d.get(DOCUMENT_PRODUCT_ID)
  471. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  472. self.rerun(_id,original_id)
  473. elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
  474. self.process_history_name(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
  475. if grade==BRAND_GRADE:
  476. if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
  477. for n_name in list_name:
  478. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_BRAND,MatchPhraseQuery,len(n_name),5)
  479. if bool_query is not None:
  480. _query = BoolQuery(must_queries=[
  481. TermQuery(DOCUMENT_PRODUCT_BRAND,name),
  482. bool_query
  483. ])
  484. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  485. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  486. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  487. list_data = getRow_ots(rows)
  488. while next_token:
  489. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  490. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  491. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_BRAND_ID],return_type=ColumnReturnType.SPECIFIED))
  492. list_data.extend(getRow_ots(rows))
  493. for _d in list_data:
  494. dict_brand_id = _d.get(DOCUMENT_PRODUCT_DICT_BRAND_ID)
  495. if dict_brand_id is not None and dict_brand_id!="":
  496. _query = BoolQuery(must_queries=[
  497. TermQuery(DOCUMENT_PRODUCT_DICT_BRAND_ID,dict_brand_id)
  498. ])
  499. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  500. SearchQuery(_query,get_total_count=True))
  501. if total_count==1:
  502. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_brand_id})
  503. self.recurse_delete_dict(dict_brand_id)
  504. dpd.delete_row(self.ots_client)
  505. _id = _d.get(DOCUMENT_PRODUCT_ID)
  506. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  507. self.rerun(_id,original_id)
  508. elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
  509. self.process_history_brand(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
  510. if grade==SPECS_GRADE:
  511. if action==DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT:
  512. for n_name in list_name:
  513. bool_query = self.make_query(n_name,DOCUMENT_PRODUCT_ORIGINAL_SPECS,MatchPhraseQuery,len(n_name),5)
  514. if bool_query is not None:
  515. _query = BoolQuery(must_queries=[
  516. TermQuery(DOCUMENT_PRODUCT_SPECS,name),
  517. bool_query
  518. ])
  519. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  520. SearchQuery(_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  521. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
  522. list_data = getRow_ots(rows)
  523. while next_token:
  524. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  525. SearchQuery(_query,next_token=next_token,limit=100,get_total_count=True),
  526. columns_to_get=ColumnsToGet([DOCUMENT_PRODUCT_ORIGINAL_ID,DOCUMENT_PRODUCT_DICT_SPECS_ID],return_type=ColumnReturnType.SPECIFIED))
  527. list_data.extend(getRow_ots(rows))
  528. for _d in list_data:
  529. dict_specs_id = _d.get(DOCUMENT_PRODUCT_DICT_SPECS_ID)
  530. if dict_specs_id is not None and dict_specs_id!="":
  531. _query = BoolQuery(must_queries=[
  532. TermQuery(DOCUMENT_PRODUCT_DICT_SPECS_ID,dict_brand_id)
  533. ])
  534. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  535. SearchQuery(_query,get_total_count=True))
  536. if total_count==1:
  537. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:dict_specs_id})
  538. self.recurse_delete_dict(dict_specs_id)
  539. dpd.delete_row(self.ots_client)
  540. _id = _d.get(DOCUMENT_PRODUCT_ID)
  541. original_id = _d.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  542. self.rerun(_id,original_id)
  543. elif action == DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE:
  544. self.process_history_specs(list_name,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
  545. def exists_records(self,name,grade,create_time):
  546. term_columns = None
  547. if grade==NAME_GRADE:
  548. term_columns = DOCUMENT_PRODUCT_NAME
  549. elif grade==BRAND_GRADE:
  550. term_columns = DOCUMENT_PRODUCT_BRAND
  551. elif grade==SPECS_GRADE:
  552. term_columns = DOCUMENT_PRODUCT_SPECS
  553. if term_columns is not None:
  554. bool_query = BoolQuery(must_queries=[
  555. TermQuery(term_columns,str(name)),
  556. RangeQuery(DOCUMENT_PRODUCT_DICT_CREATE_TIME,None,str(create_time))
  557. ])
  558. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  559. SearchQuery(bool_query,get_total_count=True,limit=1),
  560. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  561. if total_count>0:
  562. return True
  563. return False
def act_insert(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
    """Apply an 'insert' action coming from the dict interface table.

    Writes/updates the document_product_dict row for the name, registers
    the name (and its standard aliases) in milvus, reprocesses history
    product data when no records predate `create_time`, and finally stores
    the base interface record.

    NOTE(review): indentation reconstructed from a flattened source; the
    nesting chosen mirrors the sibling act_update/act_delete methods.
    """
    #update document_product_dict
    # derive the dict row id from (parent_id, name) when the caller did not supply one
    if original_id is None or original_id=="":
        if parent_id is None:
            original_id = get_document_product_dict_id("",name)
        else:
            original_id = get_document_product_dict_id(parent_id,name)
    # a dict row is only materialized when the node has a parent
    if parent_id is not None and parent_id!="":
        _d = {DOCUMENT_PRODUCT_DICT_ID:original_id,
              DOCUMENT_PRODUCT_DICT_ALIAS:alias,
              DOCUMENT_PRODUCT_DICT_NAME:name,
              DOCUMENT_PRODUCT_DICT_STATUS:1,
              DOCUMENT_PRODUCT_DICT_GRADE:grade,
              DOCUMENT_PRODUCT_DICT_PARENT_ID:parent_id,
              DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS:standard_alias,
              DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
              DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
              DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
        _dpd = Document_product_dict(_d)
        _dpd.update_row(self.ots_client)
    # search interface if name and grade exists then update document_product_dict and return
    interface_id = get_document_product_dict_interface_base_id(name,grade)
    _interface_d = {
        DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
        DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:alias,
        DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300),
        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE,
        DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
        DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:parent_id,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:standard_alias,
        DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
        DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
        DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:remove_words,
        DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL:level
    }
    _dpdi = Document_product_dict_interface(_interface_d)
    # a base interface row for (name, grade) already exists -> nothing more to do
    if _dpdi.exists_row(self.ots_client):
        return
    list_name = []
    #update milvus
    Coll,_ = self.get_collection(grade)
    if insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias):
        list_name.append(name)
    # standard aliases (separator-joined) are reprocessed alongside the name itself
    if standard_alias is not None and standard_alias!="":
        list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
        for _alias in list_alias:
            _alias = _alias.strip()
            if len(_alias)==0:
                continue
            if _alias==name:
                continue
            list_name.append(_alias)
    # presumably lets the redis/milvus cache settle before the history rerun — TODO confirm
    time.sleep(PRODUCT_REDIS_CACHE_TIME)
    #judge whether there exists records before this record created,if not process the history data
    if not self.exists_records(name,grade,create_time):
        self.process_history_by_name(list_name,grade,"insert")
    # persist the base interface record built above
    _dpdi.update_row(self.ots_client)
  622. def get_updated_record(self,alias,standard_alias,remove_words,level,original_alias,original_standard_alias,original_remove_words,original_level):
  623. original_alias_set = set()
  624. original_standard_alias_set = set()
  625. original_remove_words_set = set()
  626. if original_alias is not None and original_alias!="":
  627. _split = original_alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  628. for _s in _split:
  629. _s = _s.strip()
  630. if _s=="":
  631. continue
  632. original_alias_set.add(_s)
  633. if original_standard_alias is not None and original_standard_alias!="":
  634. _split = original_standard_alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  635. for _s in _split:
  636. _s = _s.strip()
  637. if _s=="":
  638. continue
  639. original_standard_alias_set.add(_s)
  640. if original_remove_words is not None and original_remove_words!="":
  641. _split = original_remove_words.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  642. for _s in _split:
  643. _s = _s.strip()
  644. if _s=="":
  645. continue
  646. original_remove_words_set.add(_s)
  647. new_alias_set = set()
  648. new_standard_alias_set = set()
  649. new_remove_words_set = set()
  650. if alias is not None and alias!="":
  651. if alias[0]=="+":
  652. new_alias_set |= original_alias_set
  653. _split = alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  654. for _s in _split:
  655. _s = _s.strip()
  656. if _s=="":
  657. continue
  658. new_alias_set.add(_s)
  659. elif alias[0]=="-":
  660. new_alias_set |= original_alias_set
  661. _split = alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  662. for _s in _split:
  663. _s = _s.strip()
  664. if _s=="":
  665. continue
  666. if _s in new_alias_set:
  667. new_alias_set.remove(_s)
  668. else:
  669. _split = alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  670. for _s in _split:
  671. _s = _s.strip()
  672. if _s=="":
  673. continue
  674. new_alias_set.add(_s)
  675. else:
  676. new_alias_set = original_alias_set
  677. if standard_alias is not None and standard_alias!="":
  678. if standard_alias[0]=="+":
  679. new_standard_alias_set |= original_standard_alias_set
  680. _split = standard_alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  681. for _s in _split:
  682. _s = _s.strip()
  683. if _s=="":
  684. continue
  685. new_standard_alias_set.add(_s)
  686. elif standard_alias[0]=="-":
  687. new_standard_alias_set |= original_standard_alias_set
  688. _split = standard_alias[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  689. for _s in _split:
  690. _s = _s.strip()
  691. if _s=="":
  692. continue
  693. if _s in new_standard_alias_set:
  694. new_standard_alias_set.remove(_s)
  695. else:
  696. _split = standard_alias.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  697. for _s in _split:
  698. _s = _s.strip()
  699. if _s=="":
  700. continue
  701. new_standard_alias_set.add(_s)
  702. else:
  703. new_standard_alias_set = original_standard_alias_set
  704. if remove_words is not None and remove_words!="":
  705. if remove_words[0]=="+":
  706. new_remove_words_set |= original_remove_words_set
  707. _split = remove_words[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  708. for _s in _split:
  709. _s = _s.strip()
  710. if _s=="":
  711. continue
  712. new_remove_words_set.add(_s)
  713. elif remove_words[0]=="-":
  714. new_remove_words_set |= original_remove_words_set
  715. _split = remove_words[1:].split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  716. for _s in _split:
  717. _s = _s.strip()
  718. if _s=="":
  719. continue
  720. if _s in new_remove_words_set:
  721. new_remove_words_set.remove(_s)
  722. else:
  723. _split = remove_words.split(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR)
  724. for _s in _split:
  725. _s = _s.strip()
  726. if _s=="":
  727. continue
  728. new_remove_words_set.add(_s)
  729. else:
  730. new_remove_words_set = original_remove_words_set
  731. update_flag = False
  732. milvus_update_flag = False
  733. if len(new_alias_set&original_alias_set)!=len(new_alias_set):
  734. update_flag = True
  735. if len(new_standard_alias_set&original_remove_words_set)!=len(new_standard_alias_set):
  736. update_flag = True
  737. milvus_update_flag = True
  738. if len(new_remove_words_set&original_remove_words_set)!=len(new_remove_words_set):
  739. update_flag = True
  740. milvus_update_flag = True
  741. if str(level)!=str(original_level):
  742. update_flag = True
  743. milvus_update_flag = True
  744. return update_flag,milvus_update_flag,original_alias_set,original_standard_alias_set,original_remove_words_set,new_alias_set,new_standard_alias_set,new_remove_words_set
def act_update(self,name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level):
    """Apply an 'update' action coming from the dict interface table.

    Loads the stored base interface record, merges the incoming
    alias/standard_alias/remove_words/level against it, and — when
    something changed — updates milvus, rewrites the dict rows, retires
    dict rows merged away by new standard aliases, reprocesses affected
    history data, and stores the refreshed base interface record.

    NOTE(review): indentation reconstructed from a flattened source; the
    cache-settle sleep is placed at method level, mirroring act_insert /
    act_delete — confirm.
    """
    # check whether there are change variable
    _interface_id = get_document_product_dict_interface_base_id(name,grade)
    _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:_interface_id}
    _dpdi = Document_product_dict_interface(_d)
    # no stored base record -> nothing to diff against
    if not _dpdi.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS,DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL],True):
        return
    original_alias = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
    original_standard_alias = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
    original_remove_words = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS)
    original_level = _dpdi.getProperties().get(DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL)
    update_flag,milvus_update_flag,original_alias_set,original_standard_alias_set,original_remove_words_set,new_alias_set,new_standard_alias_set,new_remove_words_set = self.get_updated_record(alias,standard_alias,remove_words,level,original_alias,original_standard_alias,original_remove_words,original_level)
    # nothing changed -> leave the stored record untouched
    if not update_flag:
        return
    interface_id = get_document_product_dict_interface_base_id(name,grade)
    # re-serialize the merged sets back into separator-joined strings
    final_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_alias_set))
    final_standard_alias = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_standard_alias_set))
    final_remove_words = DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS_SEPARATOR.join(list(new_remove_words_set))
    if parent_id is None:
        parent_id = ""
    if level is None or level=="":
        level = 1
    # diff the standard-alias and remove-words sets into insert/delete lists
    delete_standard_names = list(original_standard_alias_set-new_standard_alias_set)
    insert_standard_names = list(new_standard_alias_set-original_standard_alias_set)
    delete_remove_words = list(original_remove_words_set-new_remove_words_set)
    insert_remove_words = list(new_remove_words_set-original_remove_words_set)
    log("update_interface delete_standard_names:%s insert_standard_names:%s delete_remove_words:%s insert_remove_words:%s"%(str(delete_standard_names),str(insert_standard_names),str(delete_remove_words),str(insert_remove_words)))
    # update the milvus
    Coll,_ = self.get_collection(grade)
    if milvus_update_flag:
        insert_new_record_to_milvus(Coll,name,grade,parent_id,final_standard_alias,final_remove_words,level)
    if len(delete_standard_names)>0:
        for _name in delete_standard_names:
            delete_record_from_milvus(Coll,_name,"")
    # presumably lets the redis/milvus cache settle — TODO confirm
    time.sleep(PRODUCT_REDIS_CACHE_TIME)
    # update document_product_dict
    # update alias
    if len(new_alias_set&original_alias_set)!=len(new_alias_set):
        bool_query = BoolQuery(must_queries=[
            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,name),
            TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_data = getRow_ots(rows)
        log("update dict table alias %d counts"%(total_count))
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data.extend(getRow_ots(rows))
        for _data in list_data:
            dpd = Document_product_dict(_data)
            dpd.setValue(DOCUMENT_PRODUCT_DICT_ALIAS,final_alias,True)
            dpd.update_row(self.ots_client)
    #if merge current names then update dict
    # a newly added standard alias merges its own dict rows into `name`:
    # delete those rows (and subtrees) and retire their interface records
    for _name in insert_standard_names:
        if _name==name:
            continue
        bool_query = BoolQuery(must_queries=[
            TermQuery(DOCUMENT_PRODUCT_DICT_NAME,_name),
            TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
        ])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
            ColumnsToGet(return_type=ColumnReturnType.NONE))
        list_data = getRow_ots(rows)
        log("delete dict table %d counts"%(total_count))
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                ColumnsToGet(return_type=ColumnReturnType.NONE))
            list_data.extend(getRow_ots(rows))
        for _data in list_data:
            dpd = Document_product_dict(_data)
            _id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
            log("delete id:%s"%(_id))
            self.recurse_delete_dict(_id)
            dpd.delete_row(self.ots_client)
        # mark the merged name's interface record with a retired status
        face_id = get_document_product_dict_interface_base_id(_name,grade)
        _interface_d = {
            DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id,
            DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(401,451)
        }
        _dpdi = Document_product_dict_interface(_interface_d)
        if _dpdi.exists_row(self.ots_client):
            _dpdi.update_row(self.ots_client)
    # process history
    if len(delete_standard_names)>0:
        self.process_history_by_standard_name(name,grade,delete_standard_names,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE)
    if len(insert_standard_names)>0:
        self.process_history_by_standard_name(name,grade,insert_standard_names,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
    if len(delete_remove_words)>0:
        self.process_history_by_remove_words(name,grade,delete_remove_words,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_DELETE)
    if len(insert_remove_words)>0:
        self.process_history_by_remove_words(name,grade,insert_remove_words,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_INSERT)
    # store the refreshed base interface record
    _interface_d = {
        DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
        DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:final_alias,
        DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300),
        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE,
        DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
        DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:parent_id,
        DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:final_standard_alias,
        DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
        DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
        DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:final_remove_words,
        DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL:level
    }
    _dpdi = Document_product_dict_interface(_interface_d)
    _dpdi.update_row(self.ots_client)
  858. def recurse_update_dict(self,parent_id,new_parent_id):
  859. bool_query = BoolQuery(must_queries=[
  860. TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,parent_id)
  861. ])
  862. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  863. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
  864. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  865. list_data = getRow_ots(rows)
  866. while next_token:
  867. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  868. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  869. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  870. list_data.extend(getRow_ots(rows))
  871. for _data in list_data:
  872. dpd = Document_product_dict(_data)
  873. old_id = dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_ID)
  874. new_id = get_document_product_dict_id(new_parent_id,dpd.getProperties().get(DOCUMENT_PRODUCT_DICT_NAME))
  875. self.recurse_update_dict(old_id,new_id)
  876. dpd.setValue(DOCUMENT_PRODUCT_DICT_PARENT_ID,new_parent_id,True)
  877. dpd.setValue(DOCUMENT_PRODUCT_DICT_ID,new_id,True)
  878. dpd.update_row(self.ots_client)
  879. dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:old_id})
  880. dpd.delete_row(self.ots_client)
  881. def act_delete(self,name,alias,grade,original_id,parent_id,standard_alias,create_time):
  882. #search records which name=name and grade=grade
  883. bool_query = BoolQuery(must_queries=[
  884. TermQuery(DOCUMENT_PRODUCT_DICT_NAME,str(name)),
  885. TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
  886. ])
  887. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  888. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  889. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  890. if total_count==0:
  891. return
  892. list_data = getRow_ots(rows)
  893. while next_token:
  894. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  895. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  896. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  897. list_data.extend(getRow_ots(rows))
  898. interface_id = get_document_product_dict_interface_base_id(name,grade)
  899. #delete milvus records
  900. Coll,_ = self.get_collection(grade)
  901. delete_record_from_milvus(Coll,name,standard_alias)
  902. time.sleep(PRODUCT_REDIS_CACHE_TIME)
  903. #process_history data
  904. self.process_history_by_name([name],grade,"delete")
  905. #delete document_product_dict
  906. log("delete document_product_dict name:%s grade:%s count:%s"%(str(name),str(grade),str(len(list_data))))
  907. for _data in list_data:
  908. id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
  909. self.recurse_delete_dict(id)
  910. _d = {DOCUMENT_PRODUCT_DICT_ID:id}
  911. dpd = Document_product_dict(_d)
  912. dpd.delete_row(self.ots_client)
  913. _interface_d = {
  914. DOCUMENT_PRODUCT_DICT_INTERFACE_ID:interface_id,
  915. }
  916. _dpdi = Document_product_dict_interface(_interface_d)
  917. _dpdi.delete_row(self.ots_client)
  918. def recurse_delete_dict(self,id):
  919. bool_query = BoolQuery(must_queries=[
  920. TermQuery(DOCUMENT_PRODUCT_DICT_PARENT_ID,id)
  921. ])
  922. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  923. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_PARENT_ID)]),limit=100,get_total_count=True),
  924. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  925. list_data = getRow_ots(rows)
  926. while next_token:
  927. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  928. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  929. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  930. list_data.extend(getRow_ots(rows))
  931. for _data in list_data:
  932. _id = _data.get(DOCUMENT_PRODUCT_DICT_ID)
  933. self.recurse_delete_dict(_id)
  934. dpd = Document_product_dict(_data)
  935. dpd.delete_row(self.ots_client)
  936. def embedding_interface_producer(self):
  937. bool_query = BoolQuery(must_queries=[
  938. # TermQuery("name",'济南鑫驰'),
  939. RangeQuery("status",1,50,True,True)
  940. ])
  941. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
  942. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)]),limit=100,get_total_count=True),
  943. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  944. list_data = getRow_ots(rows)
  945. for _data in list_data:
  946. self.queue_product_interface.put(_data)
  947. while next_token:
  948. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
  949. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  950. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  951. list_data = getRow_ots(rows)
  952. for _data in list_data:
  953. self.queue_product_interface.put(_data)
  954. if self.queue_product_dict.qsize()>1000:
  955. break
  956. log("embedding interface total_count %d"%(total_count))
def embedding_interface_comsumer(self):
    """Drain the interface queue and apply each record's action
    (insert/update/delete) with 20 worker threads.

    After handling, each record's status is set to a random value in
    [201,300], moving it out of the producer's pending range (1-50).
    """
    def _handle(item,result_queue):
        # unpack the interface record
        id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ID)
        action = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION)
        name = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_NAME)
        alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS)
        grade = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE)
        original_id = item.get(DOCUMENT_PRODUCT_DICT_ORIGINAL_ID)
        parent_id = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID,"")
        standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
        create_time = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME)
        remove_words = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS,'')
        level = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_LEVEL,1)
        # names of length <=1 or >=MAX_NAME_LENGTH are skipped entirely
        if name is not None and len(name)>1 and len(name)<MAX_NAME_LENGTH:
            if action=="insert":
                self.act_insert(name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level)
            elif action=="update":
                self.act_update(name,alias,grade,original_id,parent_id,standard_alias,create_time,remove_words,level)
            elif action=="delete":
                self.act_delete(name,alias,grade,original_id,parent_id,standard_alias,create_time)
        # mark the record processed
        # NOTE(review): indentation reconstructed from a flattened source —
        # confirm this status update is meant to run for skipped names too
        _pdi = Document_product_dict_interface({DOCUMENT_PRODUCT_DICT_INTERFACE_ID:id,
                                                DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(201,300)})
        _pdi.update_row(self.ots_client)
    self.embedding_interface_producer()
    mt = MultiThreadHandler(self.queue_product_interface,_handle,None,20,1)
    mt.run()
  983. def start_embedding_product_dict(self):
  984. from apscheduler.schedulers.blocking import BlockingScheduler
  985. scheduler = BlockingScheduler()
  986. # scheduler.add_job(func=self.embedding_producer,trigger="cron",minute="*/1")
  987. scheduler.add_job(func=self.embedding_comsumer,trigger="cron",second="*/5")
  988. scheduler.start()
  989. def delete_collections(self):
  990. drop_embedding_collection(self.collection_name_name)
  991. drop_embedding_collection(self.collection_name_brand)
  992. drop_embedding_collection(self.collection_name_specs)
  993. def start_embedding_product_dict():
  994. pdm = Product_Dict_Manager()
  995. pdm.start_embedding_product_dict()
  996. def drop_product_dict_collections():
  997. pdm = Product_Dict_Manager()
  998. pdm.delete_collections()
def search_similar():
    # Scan all grade-4/5 (brand/specs) product-dict rows, embed each name
    # and look up its 10 nearest neighbours in milvus; candidate pairs that
    # pass the brand/specs similarity checks are written to
    # "search_similar2.xlsx" for manual review.
    task_queue = Queue()
    ots_client = getConnect_ots()
    pdm = Product_Dict_Manager()
    list_data = []
    columns=[DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_PARENT_ID,DOCUMENT_PRODUCT_DICT_GRADE]
    bool_query = BoolQuery(must_queries=[
        RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,4,5,True,True),
    ])
    # first page carries the sort; subsequent pages paginate via next_token
    rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
        columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
    list_dict = getRow_ots(rows)
    for _d in list_dict:
        list_data.append(_d)
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
        list_dict = getRow_ots(rows)
        for _d in list_dict:
            list_data.append(_d)
        # if len(list_data)>=1000:
        #     break
    log("product_dict embedding total_count:%d"%total_count)
    # de-duplicate on (name, grade) before queueing
    set_key = set()
    for _d in list_data:
        name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
        grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
        _key = "%s-%d"%(name,grade)
        if _key in set_key:
            continue
        set_key.add(_key)
        task_queue.put(_d)
    result_queue = Queue()
    def handle(item,result_queue):
        # Embed one dict name and push every sufficiently-similar milvus
        # neighbour onto result_queue.
        id = item.get(DOCUMENT_PRODUCT_DICT_ID)
        name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
        vector = get_embedding_request(name)
        parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID)
        grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
        Coll,Coll_name = pdm.get_collection(grade)
        output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name"]
        if vector is not None and Coll is not None:
            search_list = get_embedding_search(Coll,embedding_index_name,name,grade,[vector],pdm.search_params,output_fields,limit=10)
            for _item in search_list:
                ots_id = _item.get("ots_id")
                ots_name = _item.get("ots_name")
                ots_parent_id = _item.get("ots_parent_id")
                standard_name = _item.get("standard_name")
                if grade==4:
                    # brands: name-level check only
                    if check_brand(name,ots_name):
                        _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id,"target_standard_name":standard_name}
                        result_queue.put(_d)
                elif grade==5:
                    # specs: require both generic similarity and the specs check
                    if is_similar(name,ots_name) and check_specs(name,ots_name):
                        _d = {"source_id":id,"source_name":name,"grade":grade,"target_id":ots_id,"target_name":ots_name,"parent_id":parent_id,"target_parent_id":ots_parent_id,"target_standard_name":standard_name}
                        result_queue.put(_d)
    mt = MultiThreadHandler(task_queue,handle,result_queue,5,1)
    mt.run()
    df_data = {}
    _set = set()
    # NOTE(review): queued items carry "target_parent_id", never
    # "parent_parent_id", so that excel column is always empty -- confirm
    # which column name was intended.
    df_columns = ["source_id","source_name","grade","parent_id","target_id","target_name","parent_parent_id"]
    while 1:
        try:
            item = result_queue.get(timeout=1)
            source_name = item.get("source_name")
            target_name = item.get("target_name")
            # NOTE(review): these keys are computed but never used -- the
            # output rows are not de-duplicated here.
            _key1 = "%s-%s"%(source_name,target_name)
            _key2 = "%s-%s"%(target_name,source_name)
            for c in df_columns:
                if c not in df_data:
                    df_data[c] = []
                df_data[c].append(getLegal_str(item.get(c)))
        except Exception as e:
            # queue drained (Empty after the 1s timeout) -> stop collecting
            break
    import pandas as pd
    df = pd.DataFrame(df_data)
    df.to_excel("search_similar2.xlsx",columns=df_columns)
  1078. def clean_similar():
  1079. import pandas as pd
  1080. filename = "../../test/search_similar2_1.xlsx"
  1081. df = pd.read_excel(filename)
  1082. _set = set()
  1083. list_source_name = []
  1084. list_grade = []
  1085. list_target_name = []
  1086. list_check = []
  1087. brand_set_move = set()
  1088. brand_set_keep = set()
  1089. specs_set_move = set()
  1090. specs_set_keep = set()
  1091. for source_name,grade,target_name in zip(df["source_name"],df["grade"],df["target_name"]):
  1092. source_name = str(source_name)
  1093. target_name = str(target_name)
  1094. if source_name==target_name:
  1095. continue
  1096. _key1 = "%s-%s"%(source_name,target_name)
  1097. _key2 = "%s--%s"%(target_name,source_name)
  1098. if _key1 in _set or _key2 in _set:
  1099. continue
  1100. _set.add(_key1)
  1101. _set.add(_key2)
  1102. list_source_name.append(source_name)
  1103. list_grade.append(grade)
  1104. list_target_name.append(target_name)
  1105. if grade==4:
  1106. _check = check_brand(source_name,target_name)
  1107. elif grade==5:
  1108. _check = is_similar(source_name,target_name) and check_specs(source_name,target_name)
  1109. list_check.append(_check)
  1110. if _check:
  1111. if grade==4:
  1112. n_source_name = re.sub("省|市|县|集团|股份|有限|责任|公司",'',str(source_name))
  1113. n_target_name = re.sub("省|市|县|集团|股份|有限|责任|公司",'',str(target_name))
  1114. else:
  1115. n_source_name = source_name
  1116. n_target_name = target_name
  1117. source_dis = abs(len(n_source_name)-4.6)
  1118. target_dis = abs(len(n_target_name)-4.6)
  1119. if source_dis>target_dis:
  1120. if grade==4:
  1121. brand_set_keep.add(target_name)
  1122. brand_set_move.add(source_name)
  1123. elif grade==5:
  1124. specs_set_keep.add(target_name)
  1125. specs_set_move.add(source_name)
  1126. else:
  1127. if grade==4:
  1128. brand_set_keep.add(source_name)
  1129. brand_set_move.add(target_name)
  1130. elif grade==5:
  1131. specs_set_keep.add(source_name)
  1132. specs_set_move.add(target_name)
  1133. df = pd.DataFrame({"source_name":list_source_name,
  1134. "grade":list_grade,
  1135. "target_name":list_target_name,
  1136. "check":list_check})
  1137. df.to_excel("%s_clean.xlsx"%(filename),columns=["source_name","grade","target_name","check"])
  1138. list_brand_move = list(brand_set_move)
  1139. list_brand_keep = list(brand_set_keep)
  1140. list_brand_union = list(brand_set_move&brand_set_keep)
  1141. list_specs_move = list(specs_set_move)
  1142. list_specs_keep = list(specs_set_keep)
  1143. list_specs_union = list(specs_set_move&specs_set_keep)
  1144. with open("%s_brand_move.txt"%(filename),"w",encoding="utf8") as f:
  1145. for _move in list_brand_move:
  1146. f.write("%s\n"%(_move))
  1147. with open("%s_brand_keep.txt"%(filename),"w",encoding="utf8") as f:
  1148. for _keep in list_brand_keep:
  1149. f.write("%s\n"%(_keep))
  1150. with open("%s_brand_union.txt"%(filename),"w",encoding="utf8") as f:
  1151. for _union in list_brand_union:
  1152. f.write("%s\n"%(_union))
  1153. with open("%s_specs_move.txt"%(filename),"w",encoding="utf8") as f:
  1154. for _move in list_specs_move:
  1155. f.write("%s\n"%(_move))
  1156. with open("%s_specs_keep.txt"%(filename),"w",encoding="utf8") as f:
  1157. for _keep in list_specs_keep:
  1158. f.write("%s\n"%(_keep))
  1159. with open("%s_specs_union.txt"%(filename),"w",encoding="utf8") as f:
  1160. for _union in list_specs_union:
  1161. f.write("%s\n"%(_union))
def insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words="",level=1,wait_sync=True):
    # Insert `name` (and each of its standard aliases) into the milvus
    # collection `Coll`, then optionally block until milvus reports the
    # inserted row as queryable.
    #
    # standard_alias is a separator-joined alias string (constant below);
    # remove_words/level are extra payload columns stored with the vector.
    # Returns True on success; returns None when the embedding or Coll is
    # missing and wait_sync is True (NOTE(review): callers treating the
    # result as a strict bool should confirm that is intended).
    n_name = get_milvus_standard_name(name)
    name_id = get_milvus_product_dict_id(n_name)
    vector = request_embedding(n_name)
    log("insert name %s grade %d"%(name,grade))
    if vector is not None and Coll is not None:
        # remove any stale row with the same id before re-inserting
        expr = " ots_id in ['%s']"%name_id
        Coll.delete(expr)
        # column order: id, name, standard_name, standard_name_id, vector,
        # parent_id, grade, remove_words, level (assumed from the
        # collection schema -- TODO confirm)
        data = [[name_id],
                [name],
                [name],
                [name_id],
                [vector],
                [parent_id],
                [grade],
                [remove_words],
                [level]
                ]
        insert_embedding(Coll,data)
        if standard_alias is not None and standard_alias!="":
            list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
            for _alias in list_alias:
                _alias = _alias.strip()
                if len(_alias)==0:
                    continue
                if _alias==name:
                    continue
                _id = get_document_product_dict_standard_alias_id(_alias)
                expr = " ots_id in ['%s']"%_id
                Coll.delete(expr)
                n_alias = get_milvus_standard_name(_alias)
                vector = request_embedding(n_alias)
                # alias rows point back to the standard name / name_id
                data = [[_id],
                        [_alias],
                        [name],
                        [name_id],
                        [vector],
                        [parent_id],
                        [grade],
                        [remove_words],
                        [level]
                        ]
                insert_embedding(Coll,data)
        if wait_sync:
            # NOTE(review): `expr` here is the id of the LAST row written
            # (the final alias when aliases exist, otherwise the name row),
            # so the wait only confirms that one row is visible.
            while 1:
                try:
                    log("milvus insert wait for done")
                    list_result = Coll.query(expr=expr,output_fields=["standard_name"])
                    log("list_result"+str(list_result)+str(type(list_result[0])))
                    if len(list_result)==1:
                        if list_result[0].get("standard_name","")==name:
                            log("milvus insert done")
                            return True
                    time.sleep(1)
                except Exception as e:
                    traceback.print_exc()
        else:
            return True
def delete_record_from_milvus(Coll,name,standard_alias):
    # Delete `name` and all of its standard-alias rows from the milvus
    # collection `Coll`, then block until the deletion becomes visible.
    n_name = get_milvus_standard_name(name)
    name_id = get_milvus_product_dict_id(n_name)
    log("delete name %s standard_alias %s"%(str(name),str(standard_alias)))
    expr = " ots_id in ['%s']"%name_id
    Coll.delete(expr)
    if standard_alias is not None and standard_alias!="":
        list_alias = standard_alias.split(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS_SEPARATOR)
        for _alias in list_alias:
            _alias = _alias.strip()
            if len(_alias)==0:
                continue
            if _alias==name:
                continue
            _id = get_document_product_dict_standard_alias_id(_alias)
            expr = " ots_id in ['%s']"%_id
            Coll.delete(expr)
    # NOTE(review): `expr` below is the LAST expression deleted (the final
    # alias when aliases exist, otherwise the name row), so this loop only
    # waits for that one row to disappear -- confirm that is sufficient.
    while 1:
        if len(Coll.query(expr=expr))==0:
            return
        else:
            log("milvus delete wait for done")
            time.sleep(1)
  1243. def dict_interface_delete(name,grade,ots_client = getConnect_ots()):
  1244. from uuid import uuid4
  1245. _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
  1246. DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
  1247. DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
  1248. DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
  1249. DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"delete",
  1250. DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
  1251. dpdi = Document_product_dict_interface(_d)
  1252. dpdi.update_row(ots_client)
  1253. def interface_insert():
  1254. from uuid import uuid4
  1255. a = '''
  1256. '''
  1257. grade = 4
  1258. new_standard_alias = ""
  1259. new_remove_words = ""
  1260. list_brand = []
  1261. ots_client=getConnect_ots()
  1262. for s in re.split("[\n\s,.,。、]",a):
  1263. s = s.strip()
  1264. if s=="":
  1265. continue
  1266. list_brand.append(s)
  1267. grade = 4
  1268. for brand in list_brand:
  1269. print(brand)
  1270. _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
  1271. DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
  1272. DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
  1273. DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
  1274. DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
  1275. DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
  1276. DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:new_remove_words,
  1277. DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
  1278. dpdi = Document_product_dict_interface(_d)
  1279. dpdi.update_row(ots_client)
  1280. def interface_deletes():
  1281. a = '''
  1282. 株式会社
  1283. '''
  1284. grade = 4
  1285. ots_client=getConnect_ots()
  1286. for s in re.split("[\n\s]",a):
  1287. s = s.strip()
  1288. if s=="":
  1289. continue
  1290. print(s)
  1291. dict_interface_delete(s,grade,ots_client)
  1292. def interface_update():
  1293. name = "万东"
  1294. new_standard_alias = "+万东康源|北京万东"
  1295. new_remove_words = ""
  1296. grade = 4
  1297. ots_client = getConnect_ots()
  1298. from uuid import uuid4
  1299. _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
  1300. DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
  1301. DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
  1302. DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
  1303. DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
  1304. DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
  1305. DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:new_remove_words,
  1306. DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
  1307. dpdi = Document_product_dict_interface(_d)
  1308. dpdi.update_row(ots_client)
def interface_brand_update_by_file():
    # Read "品牌合并.xlsx" and queue brand-merge updates:
    #   sheet 0 ("brands" column): comma-separated universe of known brands;
    #   sheet 1 ("brand" / "standard_alias"): merge targets plus the alias
    #     names to fold into each target.
    # Every referenced name is validated against the sheet-0 universe, then
    # one "update" interface row with standard_alias "+a|b|..." is written
    # per target ("+" prefix presumably means append-aliases -- TODO
    # confirm against the interface consumer).
    import pandas as pd
    import re
    filename = "../../test/品牌合并.xlsx"
    df0 = pd.read_excel(filename,0)
    df1 = pd.read_excel(filename,1)
    set_source_brand = set()
    for b in df0["brands"]:
        if b is None or b=="":
            continue
        list_brand = b.split(",")
        for brand in list_brand:
            brand = brand.strip()
            if brand=="":
                continue
            set_source_brand.add(brand)
    target_brand = df1["brand"]
    target_standard_alias = df1["standard_alias"]
    _check_flag = True
    list_target = []
    for tbrand,standard_alias in zip(target_brand,target_standard_alias):
        brand = tbrand.strip()
        if brand not in set_source_brand:
            print("not in source:%s"%(brand))
            _check_flag = False
        # empty excel cells come back as float nan, hence the str() compare
        if standard_alias is None or standard_alias=="" or str(standard_alias)=="nan":
            continue
        # split on ASCII or fullwidth comma
        list_brand = re.split("[,,]",standard_alias)
        set_alias = set()
        for brand in list_brand:
            brand = brand.strip()
            if brand=="":
                continue
            if brand not in set_source_brand:
                print("not in source:%s"%(brand))
                _check_flag = False
            set_alias.add(brand)
        _d = {"brand":tbrand.strip(),
              "standard_alias":"+"+"|".join(list(set_alias))}
        list_target.append(_d)
    # NOTE(review): "or 1" makes this branch unconditional -- validation
    # failures above are printed but do NOT stop the upload.
    if _check_flag or 1:
        grade = 4
        ots_client = getConnect_ots()
        from uuid import uuid4
        for target in list_target:
            name = target["brand"]
            new_standard_alias = target["standard_alias"]
            _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
                  DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
                  DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:"",
                  DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
            dpdi = Document_product_dict_interface(_d)
            dpdi.update_row(ots_client)
    print(list_target)
  1367. def clean_brands():
  1368. from queue import Queue as TQueue
  1369. task_queue = TQueue()
  1370. ots_client = getConnect_ots()
  1371. list_data = []
  1372. table_name = Document_product_dict_interface_table_name
  1373. table_index = table_name+"_index"
  1374. columns=[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE]
  1375. bool_query = BoolQuery(must_queries=[
  1376. TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
  1377. RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,4,4,True,True),
  1378. ])
  1379. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1380. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
  1381. columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  1382. list_dict = getRow_ots(rows)
  1383. for _d in list_dict:
  1384. list_data.append(_d)
  1385. while next_token:
  1386. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1387. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1388. columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  1389. list_dict = getRow_ots(rows)
  1390. for _d in list_dict:
  1391. list_data.append(_d)
  1392. # if len(list_data)>=1000:
  1393. # break
  1394. log("product_dict embedding total_count:%d"%total_count)
  1395. set_key = set()
  1396. list_process_data = []
  1397. set_brand = set()
  1398. for _d in list_data:
  1399. name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
  1400. grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
  1401. _key = "%s-%d"%(name,grade)
  1402. if _key in set_key:
  1403. continue
  1404. set_key.add(_key)
  1405. task_queue.put(_d)
  1406. list_process_data.append(_d)
  1407. if grade==BRAND_GRADE:
  1408. set_brand.add(name)
  1409. def _handle(item,result_queue):
  1410. name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
  1411. _legal = is_legal_brand(ots_client,name)
  1412. if is_legal_brand(ots_client,name):
  1413. item["legal"] = 1
  1414. elif _legal==False:
  1415. item["legal"] = 0
  1416. else:
  1417. item["legal"] = 0
  1418. bool_query = BoolQuery(must_queries=[
  1419. TermQuery("brand",name)
  1420. ])
  1421. rows,next_token,total_count,is_all_succeed = ots_client.search("document_product","document_product_index",
  1422. SearchQuery(bool_query,get_total_count=True))
  1423. if total_count>=2:
  1424. item["legal"] = 1
  1425. else:
  1426. bool_query = BoolQuery(must_queries=[
  1427. NestedQuery("products",WildcardQuery("products.brand",name)),
  1428. ])
  1429. rows,next_token,total_count,is_all_succeed = ots_client.search("document","document_index",
  1430. SearchQuery(bool_query,get_total_count=True))
  1431. if total_count>=1:
  1432. item["legal"] = 1
  1433. else:
  1434. item["legal"] = 0
  1435. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1436. mt.run()
  1437. list_legal = []
  1438. list_illegal = []
  1439. for _data in list_process_data:
  1440. name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
  1441. legal = _data["legal"]
  1442. if legal==1:
  1443. list_legal.append(name)
  1444. else:
  1445. list_illegal.append(name)
  1446. with open("../../test/legal_brand.txt", "w", encoding="utf8") as f:
  1447. for _name in list_legal:
  1448. f.write("%s\n"%(_name))
  1449. with open("../../test/illegal_brand.txt", "w", encoding="utf8") as f:
  1450. for _name in list_illegal:
  1451. f.write("%s\n"%(_name))
  1452. def merge_brands():
  1453. from queue import Queue as TQueue
  1454. import pandas as pd
  1455. task_queue = TQueue()
  1456. ots_client = getConnect_ots()
  1457. list_data = []
  1458. table_name = Document_product_dict_interface_table_name
  1459. table_index = table_name+"_index"
  1460. columns=[DOCUMENT_PRODUCT_DICT_INTERFACE_NAME,DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS]
  1461. bool_query = BoolQuery(must_queries=[
  1462. TermQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION,DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION_BASE),
  1463. RangeQuery(DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE,4,4,True,True),
  1464. ])
  1465. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1466. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED)]),limit=100,get_total_count=True),
  1467. columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  1468. list_dict = getRow_ots(rows)
  1469. for _d in list_dict:
  1470. list_data.append(_d)
  1471. while next_token:
  1472. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1473. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1474. columns_to_get=ColumnsToGet(columns,ColumnReturnType.SPECIFIED))
  1475. list_dict = getRow_ots(rows)
  1476. for _d in list_dict:
  1477. list_data.append(_d)
  1478. # if len(list_data)>=1000:
  1479. # break
  1480. log("product_dict embedding total_count:%d"%total_count)
  1481. set_key = set()
  1482. list_process_data = []
  1483. set_brand = set()
  1484. for _d in list_data:
  1485. name = _d.get(DOCUMENT_PRODUCT_DICT_NAME)
  1486. grade = _d.get(DOCUMENT_PRODUCT_DICT_GRADE)
  1487. _key = "%s-%d"%(name,grade)
  1488. if _key in set_key:
  1489. continue
  1490. set_key.add(_key)
  1491. task_queue.put(_d)
  1492. list_process_data.append(_d)
  1493. if grade==BRAND_GRADE:
  1494. set_brand.add(name)
  1495. area_set = get_area_set()
  1496. def _handle(item,result_queue):
  1497. name = item.get(DOCUMENT_PRODUCT_DICT_NAME)
  1498. grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
  1499. for i in range(min(len(name)-2,8)):
  1500. _n = name[:i+1]
  1501. if _n in area_set:
  1502. n_name = re.sub("^[省市区]]",'',name[i+1:])
  1503. if n_name in set_brand:
  1504. item["belongs_to"] = n_name
  1505. standard_alias = item.get(DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS)
  1506. if standard_alias is not None and standard_alias!="":
  1507. for salias in standard_alias.split("|"):
  1508. face_id = get_document_product_dict_interface_base_id(salias,grade)
  1509. _interface_d = {
  1510. DOCUMENT_PRODUCT_DICT_INTERFACE_ID:face_id,
  1511. DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:randint(401,451)
  1512. }
  1513. _dpdi = Document_product_dict_interface(_interface_d)
  1514. if _dpdi.exists_row(ots_client):
  1515. _dpdi.update_row(ots_client)
  1516. mt = MultiThreadHandler(task_queue,_handle,None,20)
  1517. mt.run()
  1518. dict_belongs_alias = {}
  1519. for _data in list_process_data:
  1520. name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
  1521. belongs_to = _data.get("belongs_to")
  1522. if belongs_to is not None:
  1523. if belongs_to not in dict_belongs_alias:
  1524. dict_belongs_alias[belongs_to] = []
  1525. dict_belongs_alias[belongs_to].append(name)
  1526. df_data = {"brand":[],"standard_alias":[]}
  1527. for k,v in dict_belongs_alias.items():
  1528. df_data["brand"].append(k)
  1529. df_data["standard_alias"].append("|".join(v))
  1530. df = pd.DataFrame(df_data)
  1531. df.to_excel("../../merge.xlsx",columns=["brand","standard_alias"])
  1532. # grade = 4
  1533. # ots_client = getConnect_ots()
  1534. # from uuid import uuid4
  1535. # for k,v in dict_belongs_alias.items():
  1536. # name = k
  1537. # new_standard_alias = "+%s"%("|".join(v))
  1538. # print(k,new_standard_alias)
  1539. # _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:name,
  1540. # DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
  1541. # DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
  1542. # DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
  1543. # DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"update",
  1544. # DOCUMENT_PRODUCT_DICT_INTERFACE_STANDARD_ALIAS:new_standard_alias,
  1545. # DOCUMENT_PRODUCT_DICT_INTERFACE_REMOVE_WORDS:"",
  1546. # DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S")}
  1547. # dpdi = Document_product_dict_interface(_d)
  1548. # dpdi.update_row(ots_client)
  1549. def interface_delete_brands():
  1550. from uuid import uuid4
  1551. ots_client=getConnect_ots()
  1552. list_brand = []
  1553. a = '''
  1554. 日本
  1555. '''
  1556. grade = 4
  1557. for s in re.split("[\n\s,.,。、]",a):
  1558. s = s.strip()
  1559. if s=="":
  1560. continue
  1561. list_brand.append(s)
  1562. with open("../../test/illegal_brand.txt","r",encoding="utf8") as f:
  1563. while 1:
  1564. brand = f.readline()
  1565. if not brand:
  1566. break
  1567. brand = brand.strip()
  1568. if brand!="":
  1569. list_brand.append(brand)
  1570. for brand in list_brand:
  1571. print(brand)
  1572. _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:brand,
  1573. DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
  1574. DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:grade,
  1575. DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
  1576. DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"delete",
  1577. DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  1578. "is_temp":1
  1579. }
  1580. dpdi = Document_product_dict_interface(_d)
  1581. dpdi.update_row(ots_client)
  1582. def clean_interface_delete_temp():
  1583. ots_client = getConnect_ots()
  1584. table_name = Document_product_dict_interface_table_name
  1585. table_index = table_name+"_index"
  1586. columns = ["is_temp","status","name"]
  1587. task_queue = Queue()
  1588. bool_query = BoolQuery(must_queries=[
  1589. TermQuery("action","delete")
  1590. ])
  1591. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1592. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
  1593. columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1594. list_data = getRow_ots(rows)
  1595. for _data in list_data:
  1596. task_queue.put(_data)
  1597. print("%d/%d"%(task_queue.qsize(),total_count))
  1598. while next_token:
  1599. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1600. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  1601. columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1602. list_data = getRow_ots(rows)
  1603. for _data in list_data:
  1604. task_queue.put(_data)
  1605. print("%d/%d"%(task_queue.qsize(),total_count))
  1606. def _handle(item,result_queue):
  1607. is_temp = item.get("is_temp",0)
  1608. status = item.get("status",0)
  1609. name = item.get("name")
  1610. dpdi = Document_product_dict_interface(item)
  1611. if is_temp==1 and status>=201:
  1612. dpdi.delete_row(ots_client)
  1613. else:
  1614. pass
  1615. # dpdi.setValue("status",1,True)
  1616. # dpdi.update_row(ots_client)
  1617. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1618. mt.run()
  1619. def clean_product_dict():
  1620. ots_client = getConnect_ots()
  1621. bool_query = BoolQuery(must_queries=[
  1622. RangeQuery("grade",3)
  1623. ])
  1624. task_queue = Queue()
  1625. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1626. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
  1627. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  1628. list_data = getRow_ots(rows)
  1629. for _data in list_data:
  1630. task_queue.put(_data)
  1631. print("%d/%d"%(task_queue.qsize(),total_count))
  1632. while next_token:
  1633. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1634. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  1635. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  1636. list_data = getRow_ots(rows)
  1637. for _data in list_data:
  1638. task_queue.put(_data)
  1639. print("%d/%d"%(task_queue.qsize(),total_count))
  1640. def _handle(item,result_queue):
  1641. _dpd = Document_product_dict(item)
  1642. _dpd.delete_row(ots_client)
  1643. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1644. mt.run()
  1645. def clean_product_dict_interface():
  1646. ots_client = getConnect_ots()
  1647. bool_query = BoolQuery(must_queries=[
  1648. BoolQuery(should_queries=[
  1649. TermQuery("action","insert"),
  1650. TermQuery("action","base")
  1651. ])
  1652. ])
  1653. task_queue = Queue()
  1654. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
  1655. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
  1656. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  1657. list_data = getRow_ots(rows)
  1658. for _data in list_data:
  1659. task_queue.put(_data)
  1660. print("%d/%d"%(task_queue.qsize(),total_count))
  1661. while next_token:
  1662. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
  1663. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  1664. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  1665. list_data = getRow_ots(rows)
  1666. for _data in list_data:
  1667. task_queue.put(_data)
  1668. print("%d/%d"%(task_queue.qsize(),total_count))
  1669. def _handle(item,result_queue):
  1670. _dpd = Document_product_dict_interface(item)
  1671. _dpd.delete_row(ots_client)
  1672. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1673. mt.run()
  1674. def rerun_interface_deletes():
  1675. ots_client = getConnect_ots()
  1676. table_name = Document_product_dict_interface_table_name
  1677. table_index = table_name+"_index"
  1678. columns = ["is_temp","status","name"]
  1679. task_queue = Queue()
  1680. bool_query = BoolQuery(must_queries=[
  1681. TermQuery("action","delete")
  1682. ])
  1683. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1684. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
  1685. columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1686. list_data = getRow_ots(rows)
  1687. for _data in list_data:
  1688. task_queue.put(_data)
  1689. print("%d/%d"%(task_queue.qsize(),total_count))
  1690. while next_token:
  1691. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  1692. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  1693. columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  1694. list_data = getRow_ots(rows)
  1695. for _data in list_data:
  1696. task_queue.put(_data)
  1697. print("%d/%d"%(task_queue.qsize(),total_count))
  1698. def _handle(item,result_queue):
  1699. status = item.get("status",0)
  1700. dpdi = Document_product_dict_interface(item)
  1701. dpdi.setValue("status",1,True)
  1702. dpdi.update_row(ots_client)
  1703. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1704. mt.run()
if __name__ == '__main__':
    # Manual maintenance entry points; uncomment exactly the task(s) to run.
    # start_embedding_product_dict()
    interface_deletes()
    interface_insert()
    # interface_update()
    # interface_brand_update_by_file()
    # clean_similar()
    # clean_brands()
    # merge_brands()
    # interface_delete_brands()
    # clean_interface_delete_temp()
    # clean_product_dict()
    # clean_product_dict_interface()
    # rerun_interface_deletes()