products.py 74 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549
  1. from BaseDataMaintenance.common.documentFingerprint import getMD5
  2. from BaseDataMaintenance.common.Utils import *
  3. from BaseDataMaintenance.common.milvusUtil import *
  4. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  5. from BaseDataMaintenance.maintenance.product.productUtils import *
  6. from BaseDataMaintenance.model.ots.document_product_tmp import *
  7. from BaseDataMaintenance.model.ots.document_product import *
  8. from BaseDataMaintenance.model.ots.document_product_dict import *
  9. from BaseDataMaintenance.model.ots.document_product_dict_interface import *
  10. from BaseDataMaintenance.model.ots.document import *
  11. from BaseDataMaintenance.model.ots.attachment import *
  12. from BaseDataMaintenance.model.ots.enterprise import *
  13. from BaseDataMaintenance.model.ots.project import *
  14. from tablestore import *
  15. from BaseDataMaintenance.dataSource.source import getConnect_ots
  16. from multiprocessing import Process,Queue
  17. from random import randint
  18. from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
  19. from apscheduler.schedulers.blocking import BlockingScheduler
  20. from BaseDataMaintenance.maintenance.product.make_brand_pattern import *
  21. from BaseDataMaintenance.maintenance.product.product_dict import *
  22. import logging
  23. root = logging.getLogger()
  24. root.setLevel(logging.INFO)
  25. from uuid import uuid4
  26. from multiprocessing import Queue as PQueue
  27. class Product_Manager(Product_Dict_Manager):
  def __init__(self):
      """Initialise the parent manager and attach this manager's working state.

      After the ``Product_Dict_Manager`` initialiser has run, three attributes
      are created: ``process_queue`` (a multiprocessing queue filled by
      ``producer`` and handed to the workers started in ``comsumer``),
      ``ots_client`` (the object returned by ``getConnect_ots()``) and
      ``set_id`` (ids remembered from the latest ``producer`` pass, empty at
      start).
      """
      super(Product_Manager, self).__init__()
      # Right-hand side is evaluated left to right: queue first, then the client.
      self.process_queue, self.ots_client, self.set_id = PQueue(), getConnect_ots(), set()
  def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
      """Build the id of a standardized product row.

      ``None`` for name, brand, specs or quantity is treated as an empty
      string.  A unit price of ``None`` or ``""`` is treated as empty as well;
      any other unit price is converted with ``float`` and rendered with two
      decimals.  The parts are stringified and concatenated in the order
      docid, name, brand, specs, unit price, quantity, and the result is passed
      to ``getMD5``.

      :return: the value returned by ``getMD5`` for the concatenated key
      :raises ValueError, TypeError: propagated from ``float(unit_price)`` when
          the unit price is not convertible
      """
      def _blank_if_none(value):
          # None becomes "", everything else passes through untouched
          return "" if value is None else value

      if unit_price is None or unit_price=="":
          price_text = ""
      else:
          price_text = "%.2f"%float(unit_price)

      key_parts = (
          docid,
          _blank_if_none(name),
          _blank_if_none(brand),
          _blank_if_none(specs),
          price_text,
          _blank_if_none(quantity),
      )
      return getMD5("".join([str(part) for part in key_parts]))
  def producer(self,process_count=3000):
      """Refill the work queue with temporary product rows awaiting processing.

      Returns immediately while more than ``process_count/6`` items are still
      queued.  Otherwise it searches the temporary product index with
      ``RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)``, 100 rows per page
      (first page sorted by ``status``), and enqueues every row whose id is not
      in ``self.set_id``.  Paging continues while the search returns a
      continuation token and stops once at least ``process_count`` rows have
      been fetched (the budget is checked after each follow-up page only).
      Finally ``self.set_id`` is replaced by the ids enqueued in this pass.

      :param process_count: fetch budget; also defines the refill threshold
      """
      queued_now = self.process_queue.qsize()
      if queued_now>process_count/6:
          return

      table_name = Document_product_tmp_table_name
      index_name = Document_product_tmp_table_name+"_index"
      status_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
      enqueued_ids = []

      def _enqueue(page):
          # put the rows of one page that were not remembered from the previous pass
          for row in page:
              row_id = row.get(DOCUMENT_PRODUCT_TMP_ID)
              if row_id not in self.set_id:
                  enqueued_ids.append(row_id)
                  self.process_queue.put(row)

      rows,next_token,total_count,is_all_succeed = self.ots_client.search(
          table_name,index_name,
          SearchQuery(status_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
          columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
      page = getRow_ots(rows)
      fetched = len(page)
      log("producer %d/%d"%(queued_now,total_count))
      _enqueue(page)

      while next_token:
          rows,next_token,total_count,is_all_succeed = self.ots_client.search(
              table_name,index_name,
              SearchQuery(status_query,next_token=next_token,limit=100,get_total_count=True),
              columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
          page = getRow_ots(rows)
          _enqueue(page)
          fetched += len(page)
          if fetched>=process_count:
              break

      self.set_id = set(enqueued_ids)
  def comsumer(self):
      """Start the worker processes that drain the work queue and wait for them.

      Six processes are created.  Each one builds a ``MultiThreadHandler`` over
      ``self.process_queue`` with ``self.comsumer_handle`` as handler and a
      thread count of 6, then calls its ``run()``.  All processes are started
      before any is joined; the method returns once every process has exited.
      """
      worker_processes = 6
      threads_per_worker = 6

      def _run_worker(n_threads):
          # body of a single worker process
          handler = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,n_threads,1,False,True)
          handler.run()

      workers = [Process(target=_run_worker,args=(threads_per_worker,)) for _ in range(worker_processes)]
      for worker in workers:
          worker.start()
      for worker in workers:
          worker.join()
  def comsumer_handle(self,item,result_queue):
      """Run ``standardize`` on one queued row; print the traceback on failure.

      :param item: the row taken from the work queue
      :param result_queue: not used by this handler
      :return: None; exceptions derived from ``Exception`` are printed, not raised
      """
      try:
          self.standardize(item)
      except Exception:
          # report the failure on stderr and let the caller continue
          traceback.print_exc()
  def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id","remove_words","level"]):
      '''
      Standardize one temporary product row and persist the outcome.

      Name, brand and specs of the row are searched in the standard parameter
      collections; the search is fuzzy (non-exact) and every hit is confirmed by
      a check_* helper. (The original note asks what exactly the validation
      rule is; that question is still open.)

      Only standard products are saved. A temporary row is regarded as a
      standard product only if its name matches. On that condition:
        * brand: if it matches, it is replaced by the standard brand; otherwise
          a new brand may be proposed; if neither applies it is stored as "".
        * specs are handled the same way.
      The name-brand and brand-specs links are added automatically because the
      dictionary is a three-level tree.

      The status written back to the temporary row is drawn from:
        201-300  product saved,
        451-500  dumplicate() reported an existing row (its id is stored),
        501-550  price/quantity data judged illegal,
        401-450  name could not be matched.

      :param tmp_dict: properties of the temporary product row
      :param output_fields: fields requested from the search helpers
      :return: None
      '''
      # NOTE(review): the indentation of this method was reconstructed from a
      # whitespace-stripped copy of the source. Places where the nesting could not
      # be derived from syntax alone carry their own NOTE(review) -- confirm them
      # against the original file.
      # TODO: 1. should the product parameter table add new data automatically? 1. add new connections between existing names. 2. add new specs
      # When matching specs, the differing part of the strings must not contain digits, letters or
      # Roman numerals, and differences in the number of occurrences must not be ignored.
      save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID:tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
      _status = 0
      document_product_tmp = Document_product_tmp(tmp_dict)
      tenderee = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_TENDEREE,"")
      name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,"")
      brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,"")
      specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,"")
      parameters = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_PARAMETER,"")
      # remove the tenderee text from name and brand
      name = name.replace(tenderee,"")
      brand = brand.replace(tenderee,"")
      original_name = name
      original_brand = brand
      original_specs = specs
      # fallback search inputs; built before brand/specs are defaulted from parameters below
      list_candidates = [a for a in [name,brand,specs,parameters] if a!=""]
      list_candidate_brand_specs = [a for a in [brand,specs,parameters,name] if a!=""]
      if brand=="" and parameters!="":
          brand = parameters
      if specs=="" and parameters!="":
          specs = parameters
      new_name = ""
      new_brand = ""
      new_specs = ""
      name_ots_id = None
      brand_ots_id = None
      specs_ots_id = None

      # ---- name: search with the name field itself ----
      if name is not None and name!="":
          Coll,_ = self.get_collection(NAME_GRADE)
          search_list = get_intellect_search(Coll,embedding_index_name,name,NAME_GRADE,self.search_params,output_fields,limit=10)
          for _search in search_list:
              ots_id = _search.get("standard_name_id")
              ots_name = _search.get("ots_name")
              standard_name = _search.get("standard_name")
              ots_parent_id = _search.get("ots_parent_id")
              remove_words = _search.get("remove_words")
              if check_product(name,ots_name,remove_words):
                  name_ots_id = get_document_product_dict_id(ots_parent_id,standard_name)
                  original_name = name
                  new_name = standard_name
                  log("checking name %s succeed %s %s"%(name,ots_name,str(remove_words)))
                  # (disabled) alias update of the matched name entry: fix_columns + updateAlias + update_row
                  break

      # ---- name: fall back to every non-empty candidate field ----
      if name_ots_id is None:
          for name in list_candidates:
              Coll,_ = self.get_collection(NAME_GRADE)
              search_list = get_intellect_search(Coll,embedding_index_name,name,NAME_GRADE,self.search_params,output_fields,limit=10)
              for _search in search_list:
                  ots_id = _search.get("standard_name_id")
                  ots_name = _search.get("ots_name")
                  standard_name = _search.get("standard_name")
                  ots_parent_id = _search.get("ots_parent_id")
                  remove_words = _search.get("remove_words")
                  if check_product(name,ots_name,remove_words):
                      log("checking name %s succeed %s %s"%(name,ots_name,str(remove_words)))
                      name_ots_id = get_document_product_dict_id(ots_parent_id,standard_name)
                      original_name = name
                      new_name = standard_name
                      # (disabled) alias update of the matched name entry: fix_columns + updateAlias + update_row
                      # only leaves the hit loop; the remaining candidates are still searched
                      break

      if name_ots_id is not None:
          # NOTE(review): the brand and specs sections below are placed inside this
          # guard; the stripped source does not show whether some of them sat at
          # function level instead.

          # ---- brand: search with the brand field ----
          if brand is not None and brand!="":
              s_brand = brand
              l_brand = [brand]
              Coll,_ = self.get_collection(BRAND_GRADE)
              _find = False
              for brand in l_brand:
                  if len(brand)>100:
                      continue
                  search_list = get_intellect_search(Coll,embedding_index_name,brand,BRAND_GRADE,self.search_params,output_fields,limit=10)
                  # log("search brand %s"%(brand))
                  for _search in search_list:
                      ots_id = _search.get("standard_name_id")
                      ots_name = _search.get("ots_name")
                      standard_name = _search.get("standard_name")
                      ots_parent_id = _search.get("ots_parent_id")
                      remove_words = _search.get("remove_words")
                      # log("check brand %s and %s"%(brand,ots_name))
                      if check_brand(brand,ots_name,remove_words):
                          # log("check brand similar succeed:%s and %s"%(brand,ots_name))
                          # a hit identical to the standardized name is not a brand
                          if ots_name==new_name:
                              continue
                          original_brand = brand
                          # NOTE(review): the three checks below are assumed to be nested under
                          # the "brand text equals name text" guard -- confirm.
                          if original_brand==original_name:
                              if len(new_name)+len(ots_name)>len(original_name):
                                  continue
                              # hit found at index >= 1, i.e. not at the start of the text
                              if original_brand.find(ots_name)>=1:
                                  continue
                              if len(original_brand)<=3:
                                  continue
                          new_brand = standard_name
                          log("checking brand %s succeed %s"%(brand,new_brand))
                          # check whether a brand entry whose parent is name_ots_id exists; insert one if not
                          if name_ots_id is not None:
                              brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
                              _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
                                          DOCUMENT_PRODUCT_DICT_NAME:new_brand,
                                          DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_brand).lower()),
                                          DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
                                          DOCUMENT_PRODUCT_DICT_STATUS:1,
                                          DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
                                          DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
                                          DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                          DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                          }
                              _dpd_brand = Document_product_dict(_d_brand)
                              # _dpd_brand.updateAlias(str(new_brand).lower())
                              if not _dpd_brand.exists_row(self.ots_client):
                                  _dpd_brand.update_row(self.ots_client)
                              else:
                                  pass
                                  # (disabled) alias update of the existing brand entry
                          _find = True
                          break
                      else:
                          # log("check brand similar failed:%s and %s"%(brand,ots_name))
                          # add new brand?
                          pass
                  if _find:
                      break
              # ---- brand: nothing matched, propose the cleaned brand as a new one ----
              if not _find:
                  for brand in l_brand:
                      if len(brand)>100:
                          continue
                      c_brand = clean_product_brand(brand)
                      if self.check_new_brand(c_brand):
                          if c_brand=="":
                              continue
                          original_brand = brand
                          # NOTE(review): nesting of the four checks below is assumed -- confirm.
                          if original_brand==original_name:
                              if len(new_name)+len(c_brand)>len(original_name):
                                  continue
                              if new_name==original_brand:
                                  continue
                              if original_brand.find(c_brand)>=1:
                                  continue
                              if len(original_brand)<=3:
                                  continue
                          new_brand = c_brand
                          log("adding new brand %s"%(str(new_brand)))
                          # the proposal is written to the dict *interface* table with action "insert";
                          # brand_ots_id stays None on this path
                          _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_brand,
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(str(new_brand).lower()),
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:BRAND_GRADE,
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                      DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
                                      DOCUMENT_PRODUCT_ORIGINAL_BRAND:brand
                                      }
                          dpdi = Document_product_dict_interface(_d_brand)
                          dpdi.update_row(self.ots_client)
                          break

          # ---- brand: still no dictionary brand, try every candidate field ----
          if brand_ots_id is None:
              _find = False
              Coll,_ = self.get_collection(BRAND_GRADE)
              for brand in list_candidates:
                  if _find:
                      break
                  l_brand = [brand]
                  for brand in l_brand:
                      if len(brand)>100:
                          continue
                      if _find:
                          break
                      search_list = get_intellect_search(Coll,embedding_index_name,brand,BRAND_GRADE,self.search_params,output_fields,limit=10)
                      # log("search brand %s"%(brand))
                      for _search in search_list:
                          ots_id = _search.get("standard_name_id")
                          ots_name = _search.get("ots_name")
                          standard_name = _search.get("standard_name")
                          ots_parent_id = _search.get("ots_parent_id")
                          remove_words = _search.get("remove_words")
                          # log("check brand %s and %s"%(brand,ots_name))
                          if check_brand(brand,ots_name,remove_words):
                              # log("check brand similar succeed:%s and %s"%(brand,ots_name))
                              if ots_name==new_name:
                                  continue
                              original_brand = brand
                              # NOTE(review): nesting of the three checks below is assumed -- confirm.
                              if original_brand==original_name:
                                  if len(new_name)+len(ots_name)>len(original_name):
                                      continue
                                  if original_brand.find(ots_name)>=1:
                                      continue
                                  if len(original_brand)<=3:
                                      continue
                              new_brand = standard_name
                              log("checking brand %s succeed %s"%(brand,new_brand))
                              # check whether a brand entry whose parent is name_ots_id exists; insert one if not
                              if name_ots_id is not None:
                                  brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
                                  _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
                                              DOCUMENT_PRODUCT_DICT_NAME:new_brand,
                                              DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_brand).lower()),
                                              DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
                                              DOCUMENT_PRODUCT_DICT_STATUS:1,
                                              DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
                                              DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
                                              DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                              DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                              }
                                  _dpd_brand = Document_product_dict(_d_brand)
                                  # _dpd_brand.updateAlias(str(new_brand).lower())
                                  if not _dpd_brand.exists_row(self.ots_client):
                                      _dpd_brand.update_row(self.ots_client)
                                  else:
                                      pass
                                      # (disabled) alias update of the existing brand entry
                              _find = True
                              break

          # ---- specs: search with the specs field ----
          if specs is not None and specs!="":
              debug("getting sepcs %s"%(specs))
              list_specs = []
              c_specs = clean_product_specs(specs)
              list_specs.append(c_specs)
              # also try each fragment between CJK characters that is longer than 4 characters
              for s in re.split("[\u4e00-\u9fff]",specs):
                  if s!="" and len(s)>4:
                      list_specs.append(s)
              _index = 0
              break_flag = False
              list_similar_specs = []
              for c_specs in list_specs:
                  if break_flag:
                      break
                  _index += 1
                  specs_vector = get_embedding_request(c_specs)
                  if specs_vector is not None:
                      Coll,_ = self.get_collection(SPECS_GRADE)
                      search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=20)
                      for _search in search_list:
                          ots_id = _search.get("standard_name_id")
                          ots_name = _search.get("ots_name")
                          standard_name = _search.get("standard_name")
                          ots_parent_id = _search.get("ots_parent_id")
                          debug("checking specs %s and %s"%(specs,ots_name))
                          if is_similar(specs,ots_name):
                              # log("specs is_similar")
                              if check_specs(c_specs,ots_name):
                                  # set before the `continue` below, so no further variant is searched
                                  break_flag = True
                                  original_specs = c_specs
                                  if standard_name==new_name or standard_name==new_brand:
                                      continue
                                  new_specs = standard_name
                                  log("check_specs %s succeed %s"%(specs,new_specs))
                                  # update document_product_dict, which is built for search
                                  if brand_ots_id is not None:
                                      # check whether a specs entry whose parent is brand_ots_id exists; insert one if not
                                      specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
                                      _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
                                                  DOCUMENT_PRODUCT_DICT_NAME:new_specs,
                                                  DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_specs).lower()),
                                                  DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
                                                  DOCUMENT_PRODUCT_DICT_STATUS:1,
                                                  DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
                                                  DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
                                                  DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                  DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                  }
                                      _dpd_specs = Document_product_dict(_d_specs)
                                      # _dpd_specs.updateAlias(str(new_specs).lower())
                                      if not _dpd_specs.exists_row(self.ots_client):
                                          _dpd_specs.update_row(self.ots_client)
                                          # user interface to add
                                      else:
                                          pass
                                          # (disabled) alias update of the existing specs entry
                                  break_flag = True
                                  break
                              # NOTE(review): this else is assumed to pair with check_specs
                              # (similar, but the check failed) -- confirm.
                              else:
                                  list_similar_specs.append(specs)
              # add new specs?
              # NOTE(review): assumed to run once, after the loop over list_specs -- confirm.
              if new_specs is not None and new_specs!="":
                  pass
              else:
                  debug("specs not similar")
                  for specs in list_similar_specs:
                      if is_legal_specs(specs) and len(specs)<MAX_NAME_LENGTH and len(specs)>=5:
                          debug("is_legal_specs")
                          original_specs = specs
                          new_specs = clean_product_specs(specs)
                          if new_specs==new_name or new_specs==new_brand:
                              new_specs = ""
                              continue
                          # insert a new record into document_product_dict
                          # update document_product_dict, which is built for search
                          # add new specs
                          if brand_ots_id is not None and name_ots_id is not None:
                              specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
                              # (disabled) direct insert of the new specs into document_product_dict
                              log("adding new specs %s"%(new_specs))
                              # user interface to add
                              _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                    DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
                                    }
                              _dpdi = Document_product_dict_interface(_d)
                              _dpdi.update_row(self.ots_client)
                          break

          # ---- specs: no specs id yet, try every candidate field ----
          if specs_ots_id is None:
              _find = False
              for specs in list_candidate_brand_specs:
                  if _find:
                      break
                  debug("getting sepcs %s"%(specs))
                  list_specs = []
                  c_specs = clean_product_specs(specs)
                  list_specs.append(c_specs)
                  for s in re.split("[\u4e00-\u9fff]",specs):
                      if s!="" and len(s)>4:
                          list_specs.append(s)
                  similar_flag = None
                  _index = 0
                  for c_specs in list_specs:
                      if _find:
                          break
                      _index += 1
                      specs_vector = get_embedding_request(c_specs)
                      if specs_vector is not None:
                          Coll,_ = self.get_collection(SPECS_GRADE)
                          search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=10)
                          for _search in search_list:
                              if _find:
                                  break
                              ots_id = _search.get("standard_name_id")
                              ots_name = _search.get("ots_name")
                              standard_name = _search.get("standard_name")
                              ots_parent_id = _search.get("ots_parent_id")
                              debug("checking specs %s and %s"%(specs,ots_name))
                              if is_similar(c_specs,ots_name):
                                  # log("specs is_similar")
                                  if check_specs(c_specs,ots_name):
                                      # break_flag is only written here, never read in this section
                                      break_flag = True
                                      original_specs = c_specs
                                      new_specs = standard_name
                                      if new_specs==new_name or new_specs==new_brand:
                                          new_specs = ""
                                          continue
                                      if brand_ots_id is not None:
                                          # check whether a specs entry whose parent is brand_ots_id exists; insert one if not
                                          specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
                                          _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
                                                      DOCUMENT_PRODUCT_DICT_NAME:new_specs,
                                                      DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_specs).lower()),
                                                      DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
                                                      DOCUMENT_PRODUCT_DICT_STATUS:1,
                                                      DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
                                                      DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
                                                      DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                      DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                      }
                                          _dpd_specs = Document_product_dict(_d_specs)
                                          # _dpd_specs.updateAlias(str(new_specs).lower())
                                          if not _dpd_specs.exists_row(self.ots_client):
                                              _dpd_specs.update_row(self.ots_client)
                                      _find = True
                                      break

      # judge whether the product matches a standard product
      if name_ots_id is not None:
          is_legal_data = True
          # standardize the product and save it to the document_product table
          _product = Document_product(tmp_dict)
          docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
          unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
          quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
          unit_price = clean_product_unit_price(unit_price)
          quantity = clean_product_quantity(quantity)
          total_price = _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)
          _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
          _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
          win_bid_price = _product.getProperties().get(DOCUMENT_PRODUCT_WIN_BID_PRICE)
          # cross-check / complete unit price, quantity and total price depending on which are numeric
          if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
              if unit_price>0:
                  new_quantity = total_price/unit_price
                  if new_quantity!=quantity:
                      # (disabled) accept the recomputed quantity when it is a whole number
                      is_legal_data = False
              elif quantity>0:
                  unit_price = total_price/quantity
                  _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
          elif isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
              total_price = float("%.2f"%(unit_price*quantity))
              _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
          elif isinstance(unit_price,(float,int)) and isinstance(total_price,(float,int)):
              if unit_price>0:
                  quantity = int(total_price//unit_price)
                  _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
          elif isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
              if quantity>0:
                  unit_price = float("%.2f"%(total_price/quantity))
                  _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
          # NOTE(review): assumed to belong to the outer chain (only quantity is numeric) -- confirm.
          elif isinstance(quantity,(float,int)) and quantity>10000:
              is_legal_data = False
          # a total price above ten times a positive winning bid price is rejected
          if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE),(float,int)) and isinstance(win_bid_price,(float,int)):
              if _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)>win_bid_price*10 and win_bid_price>0:
                  is_legal_data = False
          if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE),(float,int)) and _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)>100000000:
              is_legal_data = False
          new_id = self.get_product_id(docid,new_name,new_brand,new_specs,unit_price,quantity)
          _product.setValue(DOCUMENT_PRODUCT_ID,new_id,True)
          _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID,tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID),True)
          # NOTE(review): the three id assignments are nested here; flat placement
          # would behave the same because each id is only set when its parent id is.
          if name_ots_id is not None:
              _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID,name_ots_id,True)
              if brand_ots_id is not None:
                  _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID,brand_ots_id,True)
                  if specs_ots_id is not None:
                      _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID,specs_ots_id,True)
          _product.setValue(DOCUMENT_PRODUCT_NAME,new_name,True)
          _product.setValue(DOCUMENT_PRODUCT_BRAND,new_brand,True)
          _product.setValue(DOCUMENT_PRODUCT_SPECS,new_specs,True)
          _product.setValue(DOCUMENT_PRODUCT_STATUS,randint(201,300),True)
          _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS,"%s&&%s"%(new_brand,new_specs),True)
          _product.setValue(DOCUMENT_PRODUCT_FULL_NAME,"%s&&%s&&%s"%(new_name,new_brand,new_specs),True)
          _product.setValue(DOCUMENT_PRODUCT_CREATE_TIME,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
          _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,original_name,True)
          _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,original_brand,True)
          _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,original_specs,True)
          list_attachments,bid_filemd5s = self.get_bid_filemd5s(docid,self.ots_client)
          if len(list_attachments)>0:
              _product.setValue(DOCUMENT_PRODUCT_ATTACHMENTS,json.dumps(list_attachments,ensure_ascii=False),True)
              _product.setValue(DOCUMENT_PRODUCT_HAS_ATTACHMENTS,1,True)
          if bid_filemd5s!="":
              _product.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
              _product.setValue(DOCUMENT_PRODUCT_HAS_BIDFILE,1,True)
          _product.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,0,True)
          if not is_legal_data:
              _status = randint(501,550)
          else:
              _flag,dump_id = self.dumplicate(_product)
              if _flag:
                  # saved as a new standard product
                  _status = randint(201,300)
                  save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID,new_id,True)
                  _product.update_row(self.ots_client)
              else:
                  # not saved; remember the id reported by dumplicate()
                  _status = randint(451,500)
                  save_product_tmp.setValue(DOCUMENT_PRODUCT_DUMP_ID,str(dump_id),True)
      else:
          # name could not be standardized
          _status = randint(401,450)
      save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS,_status,True)
      save_product_tmp.update_row(self.ots_client)
  594. def check_new_brand(self,brand):
  595. return is_legal_brand(self.ots_client,brand)
  596. @staticmethod
  597. def get_bid_filemd5s(docid,ots_client):
  598. bool_query = BoolQuery(must_queries=[
  599. TermQuery("docids",docid)
  600. ])
  601. rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
  602. SearchQuery(bool_query,limit=10),
  603. columns_to_get=ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
  604. list_data = getRow_ots(rows)
  605. list_bid_filemd5s = []
  606. list_attachments = []
  607. set_docids = set([docid])
  608. set_md5s = set()
  609. for _d in list_data:
  610. try:
  611. docids = _d.get("docids","")
  612. for _id in docids.split(","):
  613. set_docids.add(int(_id))
  614. except Exception as e:
  615. pass
  616. list_docids = list(set_docids)
  617. for _docid in list_docids:
  618. _d = {document_partitionkey:_docid%500+1,
  619. document_docid:_docid}
  620. _doc = Document(_d)
  621. _doc.fix_columns(ots_client,[document_attachment_path],True)
  622. page_attachments = _doc.getProperties().get(document_attachment_path)
  623. if page_attachments is not None and page_attachments!="":
  624. attachments = json.loads(page_attachments)
  625. for _a in attachments:
  626. _filemd5 = _a.get(document_attachment_path_filemd5)
  627. if _filemd5 in set_md5s or _filemd5 is None:
  628. continue
  629. set_md5s.add(_filemd5)
  630. _da = {attachment_filemd5:_filemd5}
  631. _attach = attachment(_da)
  632. if _attach.fix_columns(ots_client,[attachment_classification,attachment_filetype],True):
  633. _da[attachment_classification] = _attach.getProperties().get(attachment_classification)
  634. _da[attachment_filetype] = _attach.getProperties().get(attachment_filetype)
  635. list_attachments.append(_da)
  636. if _attach.getProperties().get(attachment_classification,"")=="招标文件":
  637. list_bid_filemd5s.append(_filemd5)
  638. return list_attachments,",".join(list(set(list_bid_filemd5s)))
  639. def get_value_count(self,name,brand,specs,unit_price,quantity):
  640. value_count = 0
  641. if name is not None and len(name)>0:
  642. value_count += 1
  643. if brand is not None and len(brand)>0:
  644. value_count += 1
  645. if specs is not None and len(specs)>0:
  646. value_count += 1
  647. if isinstance(unit_price,(float,int)) and unit_price>0:
  648. value_count += 1
  649. if isinstance(quantity,(float,int)) and quantity>0:
  650. value_count += 1
  651. return value_count
  652. def dumplicate_search_product(self,document_product):
  653. docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
  654. name = str(document_product.getProperties().get(DOCUMENT_PRODUCT_NAME,""))
  655. brand = str(document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND,""))
  656. specs = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS,""))
  657. unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
  658. quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY,"")
  659. page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
  660. tenderee = str(document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE,""))
  661. supplier = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER,""))
  662. base_value_count = self.get_value_count(name,brand,specs,unit_price,quantity)
  663. list_dump_id = []
  664. page_time_before = page_time
  665. page_time_after = page_time
  666. try:
  667. page_time_before = timeAdd(page_time,-30,format="%Y-%m-%d",)
  668. page_time_after = timeAdd(page_time,30)
  669. except Exception as e:
  670. pass
  671. to_save = 1
  672. if len(name)>0 and len(brand)>0 and len(specs)>0 and isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
  673. bool_query = BoolQuery(must_queries=[TermQuery("name",name),
  674. RangeQuery("page_time",page_time_before,page_time_after,True,True),
  675. TermQuery("brand",brand),
  676. TermQuery("specs",specs),
  677. TermQuery("unit_price",unit_price),
  678. TermQuery("quantity",quantity)
  679. ])
  680. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  681. SearchQuery(bool_query,limit=1),
  682. columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
  683. list_data = getRow_ots(rows)
  684. if len(list_data)>0:
  685. return list_data[0].get(DOCUMENT_PRODUCT_ID),0
  686. bool_query = BoolQuery(must_queries=[
  687. TermQuery(project_docids,str(docid)),
  688. ])
  689. rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
  690. SearchQuery(bool_query,limit=10),
  691. ColumnsToGet([project_docids],return_type=ColumnReturnType.SPECIFIED))
  692. list_data = getRow_ots(rows)
  693. set_docid = set()
  694. for _data in list_data:
  695. _docids = _data.get(project_docids,"")
  696. for d_id in _docids.split(","):
  697. d_id = d_id.strip()
  698. if d_id!="":
  699. set_docid.add(int(d_id))
  700. if docid in set_docid:
  701. set_docid.remove(docid)
  702. should_q = [TermQuery(DOCUMENT_PRODUCT_DOCID,did) for did in set_docid]
  703. if len(should_q)>0:
  704. bool_query = BoolQuery(must_queries=[TermQuery("name",name),
  705. BoolQuery(should_queries=should_q),
  706. ])
  707. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  708. SearchQuery(bool_query,limit=50),
  709. columns_to_get=ColumnsToGet(["docid",'name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
  710. list_data = getRow_ots(rows)
  711. dict_docid_name = {}
  712. match_ids = []
  713. for _data in list_data:
  714. docid1 = _data.get(DOCUMENT_PRODUCT_DOCID)
  715. name1 = _data.get(DOCUMENT_PRODUCT_NAME)
  716. brand1 = _data.get(DOCUMENT_PRODUCT_BRAND)
  717. specs1 = _data.get(DOCUMENT_PRODUCT_SPECS)
  718. unit_price1 = _data.get(DOCUMENT_PRODUCT_UNIT_PRICE)
  719. quantity1 = _data.get(DOCUMENT_PRODUCT_QUANTITY)
  720. id = _data.get(DOCUMENT_PRODUCT_ID)
  721. value_count1 = self.get_value_count(name1,brand1,specs1,unit_price1,quantity1)
  722. if name1==name:
  723. match_ids.append({DOCUMENT_PRODUCT_ID:id,"value_count":value_count1})
  724. if docid1 not in dict_docid_name:
  725. dict_docid_name[docid1] = []
  726. dict_docid_name[docid1].append(name)
  727. is_all_one = True
  728. for k,v in dict_docid_name.items():
  729. if len(v)!=1:
  730. is_all_one = False
  731. if is_all_one:
  732. match_ids.sort(key=lambda x:x.get("value_count",0),reverse=True)
  733. if len(match_ids)>0:
  734. _id = match_ids[0].get(DOCUMENT_PRODUCT_ID)
  735. value_count1 = match_ids[0]["value_count"]
  736. if base_value_count<value_count1:
  737. to_save = 0
  738. for _match in match_ids:
  739. list_dump_id.append(_match.get(DOCUMENT_PRODUCT_ID))
  740. if len(name)>0 and len(brand)>0 and len(supplier)>0 and len(tenderee)>0:
  741. # log("docid %s name %s page_time_before %s page_time_after %s brand %s supplier %s tenderee %s"%(str(docid),name,page_time_before,page_time_after,brand,supplier,tenderee))
  742. bool_query = BoolQuery(must_queries=[TermQuery("name",name),
  743. RangeQuery("page_time",page_time_before,page_time_after,True,True),
  744. TermQuery(DOCUMENT_PRODUCT_BRAND,brand),
  745. TermQuery(DOCUMENT_PRODUCT_TENDEREE,tenderee),
  746. TermQuery(DOCUMENT_PRODUCT_SUPPLIER,supplier),
  747. ])
  748. rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  749. SearchQuery(bool_query,limit=50),
  750. columns_to_get=ColumnsToGet(['name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
  751. list_data = getRow_ots(rows)
  752. for _d in list_data:
  753. s_id = _d.get(DOCUMENT_PRODUCT_ID)
  754. s_name = _d.get(DOCUMENT_PRODUCT_NAME,"")
  755. s_brand = _d.get(DOCUMENT_PRODUCT_BRAND,"")
  756. s_specs = _d.get(DOCUMENT_PRODUCT_SPECS,"")
  757. s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
  758. s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY,"")
  759. check_flag = True
  760. value_count1 = self.get_value_count(s_name,s_brand,s_specs,s_unit_price,s_quantity)
  761. if len(specs)>0 and len(s_specs)>0 and specs!=s_specs:
  762. check_flag = False
  763. elif isinstance(unit_price,(float,int)) and isinstance(s_unit_price,(float,int)) and unit_price!=s_unit_price:
  764. check_flag = False
  765. elif isinstance(quantity,(float,int)) and isinstance(s_quantity,(float,int)) and quantity!=s_quantity:
  766. check_flag = False
  767. if check_flag:
  768. if base_value_count<value_count1:
  769. to_save = 0
  770. list_dump_id.append(s_id)
  771. return list_dump_id,to_save
  772. def dumplicate(self,document_product):
  773. '''
  774. Duplicates the product data
  775. 将同一个产品的采购结果公示进行去重,结合公告进行。
  776. :return:True if not repeated else False
  777. '''
  778. dump_id,to_save = self.dumplicate_search_product(document_product)
  779. if dump_id is not None:
  780. document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,str(dump_id),True)
  781. if to_save==1:
  782. if dump_id is not None:
  783. if isinstance(dump_id,str):
  784. _d = {DOCUMENT_PRODUCT_ID:dump_id,
  785. DOCUMENT_PRODUCT_STATUS:randint(401,450),
  786. DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
  787. _dp = Document_product(_d)
  788. _dp.update_row(self.ots_client)
  789. elif isinstance(dump_id,list):
  790. for d_id in dump_id:
  791. _d = {DOCUMENT_PRODUCT_ID:d_id,
  792. DOCUMENT_PRODUCT_STATUS:randint(401,450),
  793. DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
  794. _dp = Document_product(_d)
  795. _dp.update_row(self.ots_client)
  796. return True,dump_id
  797. else:
  798. return False,dump_id
  799. def start_processing(self):
  800. scheduler = BlockingScheduler()
  801. scheduler.add_job(self.producer,"cron",second="*/20")
  802. scheduler.add_job(self.comsumer,"cron",minute="*/1")
  803. scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
  804. scheduler.add_job(self.embedding_interface_comsumer,"cron",second="*/20")
  805. scheduler.start()
  806. def test(self):
  807. from BaseDataMaintenance.common.sentencesUtil import cosine_similarity
  808. import torch
  809. output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
  810. id = '56bdad168c71a1fc4d57cd10bcd987f0'
  811. collection,_ = self.get_collection(SPECS_GRADE)
  812. vector = request_embedding("西门子MAGNETOMLumina")
  813. vector1 = request_embedding("西门子")
  814. print("cosine similarity",cosine_similarity(torch.from_numpy(np.array([vector])) ,torch.from_numpy(np.array([vector1]))))
  815. Coll,_ = self.get_collection(SPECS_GRADE)
  816. search_list = search_embedding(Coll,embedding_index_name,[vector],self.search_params,output_fields,limit=60)
  817. for p in search_list:
  818. print(p)
  819. #
  820. # res = collection.query(
  821. # expr = "ots_id in ['%s']"%(id),
  822. # offset = 0,
  823. # limit = 10,
  824. # output_fields = output_fields,
  825. # consistency_level="Strong"
  826. # )
  827. # print(res)
  828. def start_process_product():
  829. pm = Product_Manager()
  830. pm.start_processing()
  831. def fix_product_data():
  832. '''
  833. # delete document_product and change the record status to 1 in document_product_temp which id=original id
  834. :return:
  835. '''
  836. table_name = "document_product_temp"
  837. table_index = "document_product_temp_index"
  838. columns = [DOCUMENT_PRODUCT_TMP_NEW_ID,DOCUMENT_PRODUCT_TMP_STATUS]
  839. # table_name = Document_product_table_name
  840. # table_index = Document_product_table_name+"_index"
  841. # columns = [DOCUMENT_PRODUCT_ORIGINAL_ID]
  842. ots_client = getConnect_ots()
  843. bool_query = BoolQuery(should_queries=[
  844. # RangeQuery("status",501),
  845. # TermQuery("docid",246032980)
  846. RangeQuery("status",201,501),
  847. # RangeQuery("status",401,451)
  848. # WildcardQuery(DOCUMENT_PRODUCT_ORIGINAL_SPECS,"MFUSOne")
  849. # TermQuery(DOCUMENT_PRODUCT_SPECS,"MFUSOne")
  850. ])
  851. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  852. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  853. columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  854. list_rows = getRow_ots(rows)
  855. print(total_count)
  856. while next_token:
  857. rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
  858. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  859. columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
  860. list_rows.extend(getRow_ots(rows))
  861. print("%d/%d"%(len(list_rows),total_count))
  862. # if len(list_rows)>10000:
  863. # break
  864. task_queue = Queue()
  865. for d in list_rows:
  866. task_queue.put(d)
  867. def fix_missing_data(item,result_queue):
  868. original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  869. print("original_id",original_id)
  870. _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
  871. dpt = Document_product_tmp(_d)
  872. dpt.fix_columns(ots_client,["name","brand","specs"],True)
  873. _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
  874. dp = Document_product(_d)
  875. #fix the project_code and original_name and bidi_filemd5s
  876. docid = int(item.get(DOCUMENT_PRODUCT_DOCID))
  877. partitionkey = docid%500+1
  878. # project_name = item.get(DOCUMENT_PRODUCT_PROJECT_NAME,"")
  879. # if project_name=="":
  880. # #fix project_name
  881. # _doc = Document({"partitionkey":partitionkey,
  882. # "docid":docid})
  883. # _doc.fix_columns(ots_client,["doctitle"],True)
  884. # dp.setValue(DOCUMENT_PRODUCT_DOCTITLE,_doc.getProperties().get("doctitle"),True)
  885. list_attachments,bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
  886. if bid_filemd5s!="":
  887. dp.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
  888. dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,""),True)
  889. dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,""),True)
  890. dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,""),True)
  891. dp.update_row(ots_client)
  892. def deleteAndReprocess(item,result_queue):
  893. original_id = item.get(DOCUMENT_PRODUCT_TMP_ID)
  894. new_id = item.get(DOCUMENT_PRODUCT_TMP_NEW_ID)
  895. # original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
  896. # new_id = item.get(DOCUMENT_PRODUCT_ID)
  897. print("original_id",original_id,"id",item.get(DOCUMENT_PRODUCT_ID))
  898. # delete data and rerun
  899. _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
  900. dpt = Document_product_tmp(_d)
  901. dpt.update_row(ots_client)
  902. if new_id is not None and new_id!="":
  903. _d = {DOCUMENT_PRODUCT_ID:new_id}
  904. dp = Document_product(_d)
  905. dp.delete_row(ots_client)
  906. def handle(item,result_queue):
  907. win_bid_price = item.get(DOCUMENT_PRODUCT_TMP_WIN_BID_PRICE,1)
  908. if win_bid_price==0:
  909. dpt = Document_product_tmp(item)
  910. dpt.setValue(DOCUMENT_PRODUCT_TMP_STATUS,1,True)
  911. dpt.update_row(ots_client)
  912. mt = MultiThreadHandler(task_queue,deleteAndReprocess,None,30,1)
  913. mt.run()
  914. def test_check_brand():
  915. import logging
  916. root = logging.getLogger()
  917. root.setLevel(logging.DEBUG)
  918. from queue import Queue
  919. brand_path = "brand.txt"
  920. list_brand = []
  921. with open(brand_path,"r",encoding="utf8") as f:
  922. while 1:
  923. line = f.readline()
  924. if not line:
  925. break
  926. line = line.strip()
  927. if len(line)>0:
  928. brand = {"brand":line}
  929. list_brand.append(brand)
  930. # if len(list_brand)>100:
  931. # break
  932. task_queue = Queue()
  933. for _d in list_brand:
  934. task_queue.put(_d)
  935. pm = Product_Manager()
  936. def _handle(item,result_queue):
  937. brand = item.get("brand")
  938. new_brand = clean_product_brand(brand)
  939. _f = pm.check_new_brand(brand)
  940. item["f"] = _f
  941. item["new_brand"] = new_brand
  942. mt = MultiThreadHandler(task_queue,_handle,None,30,1)
  943. mt.run()
  944. list_legal_brand = []
  945. list_illegal_brand = []
  946. for _d in list_brand:
  947. f = _d.get("f")
  948. log("brand %s flag %s"%(brand,str(f)))
  949. if f:
  950. brand = _d.get("new_brand")
  951. list_legal_brand.append(brand)
  952. else:
  953. brand = _d.get("brand")
  954. list_illegal_brand.append(brand)
  955. with open("../../test/legal_brand.txt", "w", encoding="utf8") as f:
  956. for b in list_legal_brand:
  957. f.write(b+"\n")
  958. with open("../../test/illegal_brand.txt", "w", encoding="utf8") as f:
  959. for b in list_illegal_brand:
  960. f.write(b+"\n")
  961. def test_match():
  962. a = "迈瑞晟"
  963. # vector = request_embedding(get_milvus_standard_name(a))
  964. # vector = [get_embedding_request(b) for b in a]
  965. pm = Product_Manager()
  966. _GRADE = BRAND_GRADE
  967. Coll,_ = pm.get_collection(_GRADE)
  968. print(Coll.name)
  969. output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id","remove_words","level"]
  970. # start_time = time.time()
  971. _id = get_milvus_product_dict_id(a)
  972. print(Coll.query(expr=" ots_id in ['%s'] "%(_id),output_fields=output_fields))
  973. # print("cost",time.time()-start_time)
  974. # print(Coll.compact())
  975. # result = search_embedding(Coll,embedding_index_name,[vector],pm.search_params,output_fields,limit=20)
  976. #
  977. # final_list = []
  978. # for _search in result:
  979. # _d = {}
  980. # for k in output_fields:
  981. # _d[k] = _search.entity.get(k)
  982. # final_list.append(_d)
  983. # final_list = remove_repeat_item(final_list,k="ots_name")
  984. start_time = time.time()
  985. # final_list = get_embedding_search(Coll,embedding_index_name,a,_GRADE,vector,pm.search_params,output_fields,limit=5)
  986. final_list = get_intellect_search(Coll,embedding_index_name,a,_GRADE,pm.search_params,output_fields,limit=10)
  987. for _search in final_list:
  988. ots_id = _search.get("standard_name_id")
  989. ots_name = _search.get("ots_name")
  990. standard_name = _search.get("standard_name")
  991. ots_parent_id = _search.get("ots_parent_id")
  992. remove_words = _search.get("remove_words")
  993. if check_brand(a,ots_name,remove_words):
  994. print("similar",a,ots_name)
  995. else:
  996. print("not similar",a,ots_name)
  997. print("cost",time.time()-start_time)
  998. print(final_list)
  999. def rebuild_milvus():
  1000. pdm = Product_Dict_Manager()
  1001. from multiprocessing import Queue as PQueue
  1002. bool_query = BoolQuery(must_queries=[
  1003. RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3)
  1004. ])
  1005. ots_client = getConnect_ots()
  1006. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1007. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
  1008. ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
  1009. list_data = getRow_ots(rows)
  1010. while next_token:
  1011. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1012. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1013. ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
  1014. list_data.extend(getRow_ots(rows))
  1015. print("%d/%d"%(len(list_data),total_count))
  1016. # if len(list_data)>1000:
  1017. # break
  1018. set_name_grade = set()
  1019. task_queue = PQueue()
  1020. for _data in list_data:
  1021. name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
  1022. grade = _data.get(DOCUMENT_PRODUCT_DICT_GRADE)
  1023. _key = "%s--%d"%(name,grade)
  1024. if _key not in set_name_grade:
  1025. task_queue.put(_data)
  1026. set_name_grade.add(_key)
  1027. log("rebuild milvus %d counts"%(task_queue.qsize()))
  1028. def insert_into_milvus(item,result_queue):
  1029. name = item.get(DOCUMENT_PRODUCT_DICT_NAME,"")
  1030. grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
  1031. if grade==SPECS_GRADE:
  1032. name = clean_product_specs(name)
  1033. if len(name)<2:
  1034. return
  1035. if len(name)<2:
  1036. return
  1037. parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID,"")
  1038. Coll,_ = pdm.get_collection(grade)
  1039. standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
  1040. log("insert name %s grade %d"%(name,grade))
  1041. remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
  1042. level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL)
  1043. if level is None:
  1044. if re.search("装置|设备",name) is not None:
  1045. level = 2
  1046. else:
  1047. level = 1
  1048. insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level)
  1049. def start_thread():
  1050. mt = MultiThreadHandler(task_queue,insert_into_milvus,None,5)
  1051. mt.run()
  1052. p_count = 5
  1053. list_p = []
  1054. for i in range(p_count):
  1055. p = Process(target=start_thread)
  1056. list_p.append(p)
  1057. for p in list_p:
  1058. p.start()
  1059. for p in list_p:
  1060. p.join()
  1061. def move_document_product():
  1062. bool_query = BoolQuery(must_queries=[
  1063. ExistsQuery(DOCUMENT_PRODUCT_NAME)
  1064. ])
  1065. ots_client = getConnect_ots()
  1066. Document_product_table_name = "document_product"
  1067. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  1068. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
  1069. ColumnsToGet(return_type=ColumnReturnType.ALL))
  1070. list_data = getRow_ots(rows)
  1071. while next_token:
  1072. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  1073. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1074. ColumnsToGet(return_type=ColumnReturnType.ALL))
  1075. list_data.extend(getRow_ots(rows))
  1076. print("%d/%d"%(len(list_data),total_count))
  1077. # if len(list_data)>=1000:
  1078. # break
  1079. task_queue = Queue()
  1080. for _data in list_data:
  1081. task_queue.put(_data)
  1082. def _handle(item,result_queue):
  1083. D1 = Document_product(item)
  1084. D1.update_row(ots_client)
  1085. D1.table_name = Document_product_table_name
  1086. D1.delete_row(ots_client)
  1087. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1088. mt.run()
  1089. current_path = os.path.dirname(__file__)
  1090. def delete_brands():
  1091. filename = os.path.join(current_path,"illegal_brand.txt")
  1092. ots_client = getConnect_ots()
  1093. list_brand = []
  1094. with open(filename,"r",encoding="utf8") as f:
  1095. while 1:
  1096. brand = f.readline()
  1097. if not brand:
  1098. break
  1099. brand = brand.strip()
  1100. list_brand.append(brand)
  1101. pm = Product_Manager()
  1102. Coll,_ = pm.get_collection(BRAND_GRADE)
  1103. print(Coll.name)
  1104. Coll.compact()
  1105. _count = 0
  1106. task_queue = Queue()
  1107. for brand in list_brand:
  1108. _count += 1
  1109. task_queue.put(brand)
  1110. # if _count>=2:
  1111. # break
  1112. def _handle(brand,result_queue):
  1113. bool_query = BoolQuery(must_queries=[
  1114. TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,BRAND_GRADE),
  1115. TermQuery(DOCUMENT_PRODUCT_DICT_NAME,brand)
  1116. ])
  1117. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1118. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  1119. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1120. list_data = getRow_ots(rows)
  1121. _id = get_milvus_product_dict_id(brand)
  1122. while next_token:
  1123. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1124. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1125. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1126. list_data.extend(getRow_ots(rows))
  1127. for _d in list_data:
  1128. dpd = Document_product_dict(_d)
  1129. dpd.delete_row(ots_client)
  1130. # print(Coll.query(expr=" ots_id in ['%s']"%(_id),output_fields=["ots_id","ots_name"]))
  1131. delete_counts = Coll.delete(expr=" ots_id in ['%s']"%(_id)).delete_count
  1132. log("brand %s total_count %d md5:%s delete_counts:%d"%(brand,total_count,_id,delete_counts))
  1133. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1134. mt.run()
  1135. def delete_specs():
  1136. filename = os.path.join(current_path,"illegal_specs.txt")
  1137. ots_client = getConnect_ots()
  1138. list_brand = []
  1139. with open(filename,"r",encoding="utf8") as f:
  1140. while 1:
  1141. brand = f.readline()
  1142. if not brand:
  1143. break
  1144. brand = brand.strip()
  1145. list_brand.append(brand)
  1146. pm = Product_Manager()
  1147. Coll,_ = pm.get_collection(SPECS_GRADE)
  1148. print(Coll.name)
  1149. Coll.compact()
  1150. _count = 0
  1151. task_queue = Queue()
  1152. for specs in list_brand:
  1153. task_queue.put(specs)
  1154. _count += 1
  1155. # if _count>=2:
  1156. # break
  1157. def _handle(specs,result_queue):
  1158. bool_query = BoolQuery(must_queries=[
  1159. TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE),
  1160. TermQuery(DOCUMENT_PRODUCT_DICT_NAME,specs)
  1161. ])
  1162. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1163. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  1164. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1165. list_data = getRow_ots(rows)
  1166. _id = get_milvus_product_dict_id(specs)
  1167. while next_token:
  1168. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1169. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  1170. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1171. list_data.extend(getRow_ots(rows))
  1172. for _d in list_data:
  1173. dpd = Document_product_dict(_d)
  1174. dpd.delete_row(ots_client)
  1175. # print(Coll.query(expr=" ots_id in ['%s']"%(_id),output_fields=["ots_id","ots_name"]))
  1176. delete_counts = Coll.delete(expr=" ots_id in ['%s']"%(_id)).delete_count
  1177. log("brand %s total_count %d md5:%s delete_counts:%d"%(specs,total_count,_id,delete_counts))
  1178. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1179. mt.run()
  1180. Coll.compact()
  1181. def remove_redis_keys():
  1182. db = redis.Redis(connection_pool=pool_product)
  1183. db.flushdb()
  1184. def update_document_product_dict():
  1185. import pandas as pd
  1186. filename = "update_product.csv"
  1187. df = pd.read_csv(filename,encoding="gbk")
  1188. ots_client = getConnect_ots()
  1189. for name,grade,standard_alias,remove_words,level in zip(df["name"],df["grade"],df["standard_alias"],df["remove_words"],df["level"]):
  1190. name = name.strip()
  1191. bool_query = BoolQuery(must_queries=[
  1192. TermQuery(DOCUMENT_PRODUCT_DICT_NAME,name),
  1193. TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
  1194. ])
  1195. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
  1196. SearchQuery(bool_query,get_total_count=True),
  1197. ColumnsToGet(return_type=ColumnReturnType.NONE))
  1198. if total_count==1:
  1199. list_data = getRow_ots(rows)
  1200. _data = list_data[0]
  1201. dpd = Document_product_dict(_data)
  1202. level = 1
  1203. if re.search("器械|设备|其他",name) is not None and level==1:
  1204. level = 2
  1205. if str(remove_words)=="nan":
  1206. remove_words = ""
  1207. dpd.setValue(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,standard_alias,True)
  1208. dpd.setValue(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,remove_words,True)
  1209. dpd.setValue(DOCUMENT_PRODUCT_DICT_LEVEL,level,True)
  1210. dpd.setValue(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED+1,True)
  1211. dpd.update_row(ots_client)
  1212. print(dpd.getProperties())
  1213. def test():
  1214. # pm = Product_Manager()
  1215. # pm.test()
  1216. # fix_product_data()
  1217. # test_check_brand()
  1218. test_match()
  1219. # rebuild_milvus()
  1220. # move_document_product()
  1221. # delete_brands()
  1222. # delete_specs()
  1223. # remove_redis_keys()
  1224. # update_document_product_dict()
  1225. def clean_product_dict_interface():
  1226. ots_client = getConnect_ots()
  1227. bool_query = BoolQuery(must_queries=[
  1228. BoolQuery(should_queries=[
  1229. TermQuery("action","insert"),
  1230. TermQuery("action","base")
  1231. ])
  1232. ])
  1233. task_queue = Queue()
  1234. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
  1235. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
  1236. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  1237. list_data = getRow_ots(rows)
  1238. for _data in list_data:
  1239. task_queue.put(_data)
  1240. print("%d/%d"%(task_queue.qsize(),total_count))
  1241. while next_token:
  1242. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
  1243. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  1244. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
  1245. list_data = getRow_ots(rows)
  1246. for _data in list_data:
  1247. task_queue.put(_data)
  1248. print("%d/%d"%(task_queue.qsize(),total_count))
  1249. def _handle(item,result_queue):
  1250. _dpd = Document_product_dict_interface(item)
  1251. _dpd.delete_row(ots_client)
  1252. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1253. mt.run()
  1254. def fix_attachment():
  1255. ots_client = getConnect_ots()
  1256. bool_query = BoolQuery(must_queries=[
  1257. RangeQuery("docid",1)
  1258. ])
  1259. task_queue = Queue()
  1260. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  1261. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
  1262. columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
  1263. list_data = getRow_ots(rows)
  1264. for _data in list_data:
  1265. task_queue.put(_data)
  1266. print("%d/%d"%(task_queue.qsize(),total_count))
  1267. while next_token:
  1268. rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
  1269. SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
  1270. columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
  1271. list_data = getRow_ots(rows)
  1272. for _data in list_data:
  1273. task_queue.put(_data)
  1274. print("%d/%d"%(task_queue.qsize(),total_count))
  1275. def _handle(item,result_queue):
  1276. _product = Document_product(item)
  1277. docid = _product.getProperties().get("docid")
  1278. list_attachments,bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
  1279. if len(list_attachments)>0:
  1280. _product.setValue(DOCUMENT_PRODUCT_ATTACHMENTS,json.dumps(list_attachments,ensure_ascii=False),True)
  1281. _product.setValue(DOCUMENT_PRODUCT_HAS_ATTACHMENTS,1,True)
  1282. if bid_filemd5s!="":
  1283. _product.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
  1284. _product.setValue(DOCUMENT_PRODUCT_HAS_BIDFILE,1,True)
  1285. _product.update_row(ots_client)
  1286. mt = MultiThreadHandler(task_queue,_handle,None,30)
  1287. mt.run()
  1288. if __name__ == '__main__':
  1289. # test()
  1290. fix_attachment()
  1291. # start_process_product()
  1292. # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
  1293. # print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
  1294. # name = "一"
  1295. # ots_name = "一氧化碳分析仪"
  1296. # print(is_similar(name,ots_name),check_product(name,ots_name))
  1297. # print(is_legal_specs('SCM-A/SB(0.18D)'))
  1298. # clean_product_dict_interface()