products.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. from BaseDataMaintenance.common.documentFingerprint import getMD5
  2. from BaseDataMaintenance.common.Utils import *
  3. from BaseDataMaintenance.common.milvusUtil import *
  4. from BaseDataMaintenance.common.multiThread import MultiThreadHandler
  5. from BaseDataMaintenance.maintenance.product.productUtils import *
  6. from BaseDataMaintenance.model.ots.document_product_tmp import *
  7. from BaseDataMaintenance.model.ots.document_product import *
  8. from BaseDataMaintenance.model.ots.document_product_dict import *
  9. from tablestore import *
  10. from BaseDataMaintenance.dataSource.source import getConnect_ots
  11. from multiprocessing import Process,Queue
  12. from random import randint
  13. from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
  14. from apscheduler.schedulers.blocking import BlockingScheduler
  15. import logging
  16. root = logging.getLogger()
  17. root.setLevel(logging.INFO)
  18. class Product_Manager(Product_Dict_Manager):
  19. def __init__(self):
  20. super(Product_Manager, self).__init__()
  21. self.process_queue = Queue()
  22. self.ots_client = getConnect_ots()
  23. self.set_id = set()
  24. def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
  25. if name is None:
  26. name = ""
  27. if brand is None:
  28. brand = ""
  29. if specs is None:
  30. specs = ""
  31. if quantity is None:
  32. quantity = ""
  33. if unit_price is None or unit_price=="":
  34. unit_price = ""
  35. else:
  36. unit_price = "%.2f"%float(unit_price)
  37. product_id = getMD5(str(docid)+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity))
  38. return product_id
  39. def producer(self,process_count=3000):
  40. q_size = self.process_queue.qsize()
  41. if q_size>process_count/6:
  42. return
  43. bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
  44. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
  45. SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
  46. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  47. list_data = getRow_ots(rows)
  48. _count = len(list_data)
  49. list_id = []
  50. for _d in list_data:
  51. _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
  52. if _id in self.set_id:
  53. continue
  54. list_id.append(_id)
  55. self.process_queue.put(_d)
  56. while next_token:
  57. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
  58. SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
  59. columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
  60. list_data = getRow_ots(rows)
  61. for _d in list_data:
  62. _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
  63. if _id in self.set_id:
  64. continue
  65. list_id.append(_id)
  66. self.process_queue.put(_d)
  67. _count += len(list_data)
  68. if _count>=process_count:
  69. break
  70. self.set_id = set(list_id)
  71. def comsumer(self):
  72. def start_thread(thread_count):
  73. mt = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,thread_count,1,False,True)
  74. mt.run()
  75. process_count = 3
  76. thread_count = 10
  77. list_process = []
  78. for _i in range(process_count):
  79. p = Process(target=start_thread,args=(thread_count,))
  80. list_process.append(p)
  81. for p in list_process:
  82. p.start()
  83. for p in list_process:
  84. p.join()
  85. def comsumer_handle(self,item,result_queue):
  86. self.standardize(item)
  87. def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]):
  88. '''
  89. Standardizes the product data
  90. 通过匹配标准参数表进行标准化,匹配是非精确匹配,校验规则是?
  91. :return:
  92. only save the standard product
  93. one temp data is regard as standard product onli if match the name,contition on this,
  94. if the brand is matched: if can be standard then change else add new brand ;if not matched replace as ""
  95. and the same as specs
  96. auto add the connection of name-brand and brand-specs because the 3 degree tree structure
  97. '''
  98. # todo:1. 产品参数表自动添加新的数据? 1. add new contections between existing names.2. add new specs
  99. # 型号在进行匹配时要求差异字符串不能包含数字和字母和罗马数字,且不能忽略出现次数差异
  100. save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID:tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
  101. _status = 0
  102. document_product_tmp = Document_product_tmp(tmp_dict)
  103. name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,"")
  104. brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,"")
  105. specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,"")
  106. max_len = max(len(name),len(brand),len(specs))
  107. max_len_str = name if len(name)==max_len else brand if len(brand)==max_len else specs
  108. if name=="" and max_len>=8:
  109. name = max_len_str
  110. if brand=="" and max_len>=8:
  111. brand = max_len_str
  112. if specs=="" and max_len>=8:
  113. specs = max_len_str
  114. new_name = ""
  115. new_brand = ""
  116. new_specs = ""
  117. name_ots_id = None
  118. brand_ots_id = None
  119. specs_ots_id = None
  120. if name is not None and name!="":
  121. name_vector = request_embedding(name)
  122. if name_vector is not None:
  123. Coll,_ = self.get_collection(NAME_GRADE)
  124. search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
  125. for _search in search_list:
  126. ots_id = _search.entity.get("standard_name_id")
  127. ots_name = _search.entity.get("standard_name")
  128. ots_parent_id = _search.entity.get("ots_parent_id")
  129. if is_similar(name,ots_name) or check_product(name,ots_name):
  130. name_ots_id = ots_id
  131. new_name = ots_name
  132. #update alias of name
  133. _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
  134. _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
  135. if _flag and _dpd.updateAlias(name):
  136. _dpd.update_row(self.ots_client)
  137. break
  138. if name_ots_id is not None:
  139. if brand is not None and brand!="":
  140. brand_vector = request_embedding(brand)
  141. if brand_vector is not None:
  142. Coll,_ = self.get_collection(BRAND_GRADE)
  143. search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
  144. for _search in search_list:
  145. ots_id = _search.entity.get("standard_name_id")
  146. ots_name = _search.entity.get("standard_name")
  147. ots_parent_id = _search.entity.get("ots_parent_id")
  148. if is_similar(brand,ots_name) or check_brand(brand,ots_name):
  149. new_brand = ots_name
  150. # judge if the brand which parent_id is name_ots_id exists,if not insert one else update alias
  151. if name_ots_id is not None:
  152. brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
  153. _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
  154. DOCUMENT_PRODUCT_DICT_NAME:new_brand,
  155. DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(str(brand).lower(),str(new_brand).lower()),
  156. DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
  157. DOCUMENT_PRODUCT_DICT_STATUS:1,
  158. DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
  159. DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  160. DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  161. }
  162. _dpd_brand = Document_product_dict(_d_brand)
  163. if not _dpd_brand.exists_row(self.ots_client):
  164. _dpd_brand.update_row(self.ots_client)
  165. else:
  166. #update alias
  167. _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
  168. _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
  169. if _flag:
  170. if _dpd.updateAlias(brand):
  171. _dpd.update_row(self.ots_client)
  172. break
  173. else:
  174. # add new brand?
  175. pass
  176. if specs is not None and specs!="":
  177. specs_vector = request_embedding(specs)
  178. log("getting sepcs %s"%(specs))
  179. if specs_vector is not None:
  180. Coll,_ = self.get_collection(SPECS_GRADE)
  181. search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
  182. for _search in search_list:
  183. ots_id = _search.entity.get("standard_name_id")
  184. ots_name = _search.entity.get("standard_name")
  185. ots_parent_id = _search.entity.get("ots_parent_id")
  186. log("checking %s and %s"%(specs,ots_name))
  187. if is_similar(specs,ots_name):
  188. log("is_similar")
  189. if check_specs(specs,ots_name):
  190. log("check_specs succeed")
  191. new_specs = ots_name
  192. # to update the document_product_dict which is builded for search
  193. if brand_ots_id is not None:
  194. # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
  195. specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
  196. _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
  197. DOCUMENT_PRODUCT_DICT_NAME:new_specs,
  198. DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(str(specs).lower(),str(new_specs).lower()),
  199. DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
  200. DOCUMENT_PRODUCT_DICT_STATUS:1,
  201. DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
  202. DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  203. DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  204. }
  205. _dpd_specs = Document_product_dict(_d_specs)
  206. if not _dpd_specs.exists_row(self.ots_client):
  207. _dpd_specs.update_row(self.ots_client)
  208. else:
  209. #update alias
  210. _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
  211. _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
  212. if _flag:
  213. if _dpd.updateAlias(specs):
  214. _dpd.update_row(self.ots_client)
  215. else:
  216. log("check_specs failed")
  217. new_specs = clean_product_specs(specs)
  218. # insert into document_product_dict a new record
  219. # to update the document_product_dict which is builded for search
  220. # add new specs
  221. if brand_ots_id is not None and name_ots_id is not None:
  222. _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
  223. _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
  224. DOCUMENT_PRODUCT_DICT_NAME:new_specs,
  225. DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
  226. DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
  227. DOCUMENT_PRODUCT_DICT_STATUS:1,
  228. DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
  229. DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  230. DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  231. }
  232. _dpd = Document_product_dict(_d)
  233. _dpd.update_row(self.ots_client)
  234. break
  235. else:
  236. # add new specs?
  237. log("not similar")
  238. if is_legal_specs(specs):
  239. log("is_legal_specs")
  240. new_specs = clean_product_specs(specs)
  241. # insert into document_product_dict a new record
  242. # to update the document_product_dict which is builded for search
  243. # add new specs
  244. if brand_ots_id is not None and name_ots_id is not None:
  245. _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
  246. _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
  247. DOCUMENT_PRODUCT_DICT_NAME:new_specs,
  248. DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
  249. DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
  250. DOCUMENT_PRODUCT_DICT_STATUS:1,
  251. DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
  252. DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  253. DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
  254. }
  255. _dpd = Document_product_dict(_d)
  256. _dpd.update_row(self.ots_client)
  257. # judge if the product matches the standard product
  258. if name_ots_id is not None:
  259. #standard the product and same to document_product table
  260. _product = Document_product(tmp_dict)
  261. docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
  262. unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
  263. quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
  264. unit_price = clean_product_unit_price(unit_price)
  265. quantity = clean_product_quantity(quantity)
  266. _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
  267. _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
  268. if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
  269. total_price = "%.2f"%(unit_price*quantity)
  270. _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
  271. new_id = self.get_product_id(docid,new_name,new_brand,new_specs,unit_price,quantity)
  272. _product.setValue(DOCUMENT_PRODUCT_ID,new_id,True)
  273. _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID,tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID),True)
  274. if name_ots_id is not None:
  275. _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID,name_ots_id,True)
  276. if brand_ots_id is not None:
  277. _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID,brand_ots_id,True)
  278. if specs_ots_id is not None:
  279. _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID,specs_ots_id,True)
  280. _product.setValue(DOCUMENT_PRODUCT_NAME,new_name,True)
  281. _product.setValue(DOCUMENT_PRODUCT_BRAND,new_brand,True)
  282. _product.setValue(DOCUMENT_PRODUCT_SPECS,new_specs,True)
  283. _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS,"%s&&%s"%(new_brand,new_specs),True)
  284. _product.setValue(DOCUMENT_PRODUCT_FULL_NAME,"%s&&%s&&%s"%(new_name,new_brand,new_specs),True)
  285. if self.dumplicate(_product):
  286. _status = randint(201,301)
  287. save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID,new_id,True)
  288. _product.update_row(self.ots_client)
  289. else:
  290. _status = randint(451,500)
  291. else:
  292. _status = randint(401,450)
  293. save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS,_status,True)
  294. save_product_tmp.update_row(self.ots_client)
  295. def get_value_count(self,name,brand,specs,unit_price,quantity):
  296. value_count = 0
  297. if len(name)>0:
  298. value_count += 1
  299. if len(brand)>0:
  300. value_count += 1
  301. if len(specs)>0:
  302. value_count += 1
  303. if isinstance(unit_price,(float,int)) and unit_price>0:
  304. value_count += 1
  305. if isinstance(quantity,(float,int)) and quantity>0:
  306. value_count += 1
  307. def dumplicate_search_product(self,document_product):
  308. docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
  309. name = document_product.getProperties().get(DOCUMENT_PRODUCT_NAME)
  310. brand = document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND,"")
  311. specs = document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS,"")
  312. unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
  313. quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY,"")
  314. page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
  315. tenderee = document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE,"")
  316. supplier = document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER,"")
  317. page_time_before = page_time
  318. page_time_after = page_time
  319. try:
  320. page_time_bofore = timeAdd(page_time,-30)
  321. page_time_after = timeAdd(page_time,30)
  322. except Exception as e:
  323. pass
  324. if len(name)>0 and len(brand)>0 and len(specs)>0 and isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
  325. bool_query = BoolQuery(must_queries=[TermQuery("name",name),
  326. RangeQuery("page_time",page_time_before,page_time_after,True,True),
  327. TermQuery("brand",brand),
  328. TermQuery("specs",specs),
  329. TermQuery("unit_price",unit_price),
  330. TermQuery("quantity",quantity)
  331. ])
  332. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
  333. SearchQuery(bool_query,limit=1),
  334. columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
  335. list_data = getRow_ots(rows)
  336. if len(list_data)>0:
  337. return list_data[0].get(DOCUMENT_PRODUCT_ID),1
  338. if len(name)>0 and len(brand)>0 and len(supplier)>0 and len(tenderee)>0:
  339. bool_query = BoolQuery(must_queries=[TermQuery("name",name),
  340. RangeQuery("page_time",page_time_before,page_time_after,True,True),
  341. TermQuery("brand",brand),
  342. TermQuery("tenderee",tenderee),
  343. TermQuery("supplier",supplier),
  344. ])
  345. rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
  346. SearchQuery(bool_query,limit=50),
  347. columns_to_get=ColumnsToGet(["name",'brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
  348. list_data = getRow_ots(rows)
  349. value_count = self.get_value_count(name,brand,specs,unit_price,quantity)
  350. for _d in list_data:
  351. s_id = _d.get(DOCUMENT_PRODUCT_ID)
  352. s_name = _d.get(DOCUMENT_PRODUCT_NAME,"")
  353. s_brand = _d.get(DOCUMENT_PRODUCT_BRAND,"")
  354. s_specs = _d.get(DOCUMENT_PRODUCT_SPECS,"")
  355. s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
  356. s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY,"")
  357. check_flag = True
  358. value_count1 = self.get_value_count(s_name,s_brand,s_specs,s_unit_price,s_quantity)
  359. if len(specs)>0 and len(s_specs)>0 and specs!=s_specs:
  360. check_flag = False
  361. elif isinstance(unit_price,(float,int)) and isinstance(s_unit_price,(float,int)) and unit_price!=s_unit_price:
  362. check_flag = False
  363. elif isinstance(quantity,(float,int)) and isinstance(s_quantity,(float,int)) and quantity!=s_quantity:
  364. check_flag = False
  365. if check_flag:
  366. if value_count<value_count1:
  367. to_save = 0
  368. else:
  369. to_save = 1
  370. return s_id,to_save
  371. return None,1
  372. def dumplicate(self,document_product):
  373. '''
  374. Duplicates the product data
  375. 将同一个产品的采购结果公示进行去重,结合公告进行。
  376. :return:True if not repeated else False
  377. '''
  378. dump_id,to_save = self.dumplicate_search_product(document_product)
  379. if dump_id is not None:
  380. document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,dump_id,True)
  381. if to_save==1:
  382. if dump_id is not None:
  383. _d = {DOCUMENT_PRODUCT_ID,dump_id}
  384. _dp = Document_product(_d)
  385. _dp.delete_row(self.ots_client)
  386. return True
  387. else:
  388. return False
  389. def start_processing(self):
  390. scheduler = BlockingScheduler()
  391. scheduler.add_job(self.producer,"cron",second="*/20")
  392. scheduler.add_job(self.comsumer,"cron",minute="*/1")
  393. scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
  394. scheduler.start()
  395. def start_process_product():
  396. pm = Product_Manager()
  397. pm.start_processing()
  398. if __name__ == '__main__':
  399. # start_process_product()
  400. print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))