123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- from BaseDataMaintenance.common.documentFingerprint import getMD5
- from BaseDataMaintenance.common.Utils import *
- from BaseDataMaintenance.common.milvusUtil import *
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.maintenance.product.productUtils import *
- from BaseDataMaintenance.model.ots.document_product_tmp import *
- from BaseDataMaintenance.model.ots.document_product import *
- from BaseDataMaintenance.model.ots.document_product_dict import *
- from tablestore import *
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- from multiprocessing import Process,Queue
- from random import randint
- from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
- from apscheduler.schedulers.blocking import BlockingScheduler
- import logging
# Configure the root logger at INFO level so that log() calls made by the
# maintenance helpers below are actually emitted.
root = logging.getLogger()
root.setLevel(logging.INFO)
class Product_Manager(Product_Dict_Manager):
    """Standardizes extracted temp-product rows against the product dictionary
    and saves de-duplicated standard products to the document_product table."""

    def __init__(self):
        super(Product_Manager, self).__init__()
        # inter-process queue of temp-product rows awaiting standardization
        self.process_queue = Queue()
        # Tablestore (OTS) connection shared by producer/consumer methods
        self.ots_client = getConnect_ots()
        # ids queued by the most recent producer run, used to avoid re-queueing
        self.set_id = set()
- def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
- if name is None:
- name = ""
- if brand is None:
- brand = ""
- if specs is None:
- specs = ""
- if quantity is None:
- quantity = ""
- if unit_price is None or unit_price=="":
- unit_price = ""
- else:
- unit_price = "%.2f"%float(unit_price)
- product_id = getMD5(str(docid)+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity))
- return product_id
    def producer(self, process_count=3000):
        """Fill self.process_queue with pending rows from document_product_temp.

        Refills only when the queue has drained below process_count/6, then
        pages through the status index until roughly process_count rows have
        been fetched. Rows whose id was queued on the previous run are skipped.

        :param process_count: target number of rows to fetch per refill
        """
        q_size = self.process_queue.qsize()
        if q_size>process_count/6:
            return
        # status range 1..51 marks rows not yet standardized
        # NOTE(review): confirm RangeQuery bound inclusivity against the SDK docs
        bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
        rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
                                                                            SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
                                                                            columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_data = getRow_ots(rows)
        _count = len(list_data)
        list_id = []
        for _d in list_data:
            _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
            # already queued by the previous run and still pending: skip
            if _id in self.set_id:
                continue
            list_id.append(_id)
            self.process_queue.put(_d)
        # follow the pagination token until enough rows are buffered
        while next_token:
            rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product_temp","document_product_temp_index",
                                                                                SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
                                                                                columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _d in list_data:
                _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
                if _id in self.set_id:
                    continue
                list_id.append(_id)
                self.process_queue.put(_d)
            _count += len(list_data)
            if _count>=process_count:
                break
        # NOTE(review): ids skipped above (already in set_id) are NOT carried
        # over into the new set, so a row still sitting in the queue can be
        # re-queued on the next run — confirm this is intended.
        self.set_id = set(list_id)
- def comsumer(self):
- def start_thread(thread_count):
- mt = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,thread_count,1,False,True)
- mt.run()
- process_count = 3
- thread_count = 10
- list_process = []
- for _i in range(process_count):
- p = Process(target=start_thread,args=(thread_count,))
- list_process.append(p)
- for p in list_process:
- p.start()
- for p in list_process:
- p.join()
    def comsumer_handle(self, item, result_queue):
        """Queue-consumer callback: standardize a single temp-product row.

        :param item: dict for one document_product_temp row
        :param result_queue: required by the MultiThreadHandler signature; unused
        """
        self.standardize(item)
- def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]):
- '''
- Standardizes the product data
- 通过匹配标准参数表进行标准化,匹配是非精确匹配,校验规则是?
- :return:
- only save the standard product
- one temp data is regard as standard product onli if match the name,contition on this,
- if the brand is matched: if can be standard then change else add new brand ;if not matched replace as ""
- and the same as specs
- auto add the connection of name-brand and brand-specs because the 3 degree tree structure
- '''
- # todo:1. 产品参数表自动添加新的数据? 1. add new contections between existing names.2. add new specs
- # 型号在进行匹配时要求差异字符串不能包含数字和字母和罗马数字,且不能忽略出现次数差异
- save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID:tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
- _status = 0
- document_product_tmp = Document_product_tmp(tmp_dict)
- name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,"")
- brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,"")
- specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,"")
- max_len = max(len(name),len(brand),len(specs))
- max_len_str = name if len(name)==max_len else brand if len(brand)==max_len else specs
- if name=="" and max_len>=8:
- name = max_len_str
- if brand=="" and max_len>=8:
- brand = max_len_str
- if specs=="" and max_len>=8:
- specs = max_len_str
- new_name = ""
- new_brand = ""
- new_specs = ""
- name_ots_id = None
- brand_ots_id = None
- specs_ots_id = None
- if name is not None and name!="":
- name_vector = request_embedding(name)
- if name_vector is not None:
- Coll,_ = self.get_collection(NAME_GRADE)
- search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
- for _search in search_list:
- ots_id = _search.entity.get("standard_name_id")
- ots_name = _search.entity.get("standard_name")
- ots_parent_id = _search.entity.get("ots_parent_id")
- if is_similar(name,ots_name) or check_product(name,ots_name):
- name_ots_id = ots_id
- new_name = ots_name
- #update alias of name
- _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
- _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- if _flag and _dpd.updateAlias(name):
- _dpd.update_row(self.ots_client)
- break
- if name_ots_id is not None:
- if brand is not None and brand!="":
- brand_vector = request_embedding(brand)
- if brand_vector is not None:
- Coll,_ = self.get_collection(BRAND_GRADE)
- search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
- for _search in search_list:
- ots_id = _search.entity.get("standard_name_id")
- ots_name = _search.entity.get("standard_name")
- ots_parent_id = _search.entity.get("ots_parent_id")
- if is_similar(brand,ots_name) or check_brand(brand,ots_name):
- new_brand = ots_name
- # judge if the brand which parent_id is name_ots_id exists,if not insert one else update alias
- if name_ots_id is not None:
- brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
- _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(str(brand).lower(),str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_brand = Document_product_dict(_d_brand)
- if not _dpd_brand.exists_row(self.ots_client):
- _dpd_brand.update_row(self.ots_client)
- else:
- #update alias
- _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
- _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- if _flag:
- if _dpd.updateAlias(brand):
- _dpd.update_row(self.ots_client)
- break
- else:
- # add new brand?
- pass
- if specs is not None and specs!="":
- specs_vector = request_embedding(specs)
- log("getting sepcs %s"%(specs))
- if specs_vector is not None:
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = search_embedding(Coll,embedding_index_name,[name_vector],self.search_params,output_fields,limit=60)
- for _search in search_list:
- ots_id = _search.entity.get("standard_name_id")
- ots_name = _search.entity.get("standard_name")
- ots_parent_id = _search.entity.get("ots_parent_id")
- log("checking %s and %s"%(specs,ots_name))
- if is_similar(specs,ots_name):
- log("is_similar")
- if check_specs(specs,ots_name):
- log("check_specs succeed")
- new_specs = ots_name
- # to update the document_product_dict which is builded for search
- if brand_ots_id is not None:
- # judge if the specs which parent_id is brand_ots_id exists,insert one if not exists else update alias
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(str(specs).lower(),str(new_specs).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_specs = Document_product_dict(_d_specs)
- if not _dpd_specs.exists_row(self.ots_client):
- _dpd_specs.update_row(self.ots_client)
- else:
- #update alias
- _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
- _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- if _flag:
- if _dpd.updateAlias(specs):
- _dpd.update_row(self.ots_client)
- else:
- log("check_specs failed")
- new_specs = clean_product_specs(specs)
- # insert into document_product_dict a new record
- # to update the document_product_dict which is builded for search
- # add new specs
- if brand_ots_id is not None and name_ots_id is not None:
- _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
- _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd = Document_product_dict(_d)
- _dpd.update_row(self.ots_client)
- break
- else:
- # add new specs?
- log("not similar")
- if is_legal_specs(specs):
- log("is_legal_specs")
- new_specs = clean_product_specs(specs)
- # insert into document_product_dict a new record
- # to update the document_product_dict which is builded for search
- # add new specs
- if brand_ots_id is not None and name_ots_id is not None:
- _md5 = get_document_product_dict_id(brand_ots_id,new_specs)
- _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd = Document_product_dict(_d)
- _dpd.update_row(self.ots_client)
- # judge if the product matches the standard product
- if name_ots_id is not None:
- #standard the product and same to document_product table
- _product = Document_product(tmp_dict)
- docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
- unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
- quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
- unit_price = clean_product_unit_price(unit_price)
- quantity = clean_product_quantity(quantity)
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
- total_price = "%.2f"%(unit_price*quantity)
- _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
- new_id = self.get_product_id(docid,new_name,new_brand,new_specs,unit_price,quantity)
- _product.setValue(DOCUMENT_PRODUCT_ID,new_id,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID,tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID),True)
- if name_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID,name_ots_id,True)
- if brand_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID,brand_ots_id,True)
- if specs_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID,specs_ots_id,True)
- _product.setValue(DOCUMENT_PRODUCT_NAME,new_name,True)
- _product.setValue(DOCUMENT_PRODUCT_BRAND,new_brand,True)
- _product.setValue(DOCUMENT_PRODUCT_SPECS,new_specs,True)
- _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS,"%s&&%s"%(new_brand,new_specs),True)
- _product.setValue(DOCUMENT_PRODUCT_FULL_NAME,"%s&&%s&&%s"%(new_name,new_brand,new_specs),True)
- if self.dumplicate(_product):
- _status = randint(201,301)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID,new_id,True)
- _product.update_row(self.ots_client)
- else:
- _status = randint(451,500)
- else:
- _status = randint(401,450)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS,_status,True)
- save_product_tmp.update_row(self.ots_client)
- def get_value_count(self,name,brand,specs,unit_price,quantity):
- value_count = 0
- if len(name)>0:
- value_count += 1
- if len(brand)>0:
- value_count += 1
- if len(specs)>0:
- value_count += 1
- if isinstance(unit_price,(float,int)) and unit_price>0:
- value_count += 1
- if isinstance(quantity,(float,int)) and quantity>0:
- value_count += 1
- def dumplicate_search_product(self,document_product):
- docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
- name = document_product.getProperties().get(DOCUMENT_PRODUCT_NAME)
- brand = document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND,"")
- specs = document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS,"")
- unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
- quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY,"")
- page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
- tenderee = document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE,"")
- supplier = document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER,"")
- page_time_before = page_time
- page_time_after = page_time
- try:
- page_time_bofore = timeAdd(page_time,-30)
- page_time_after = timeAdd(page_time,30)
- except Exception as e:
- pass
- if len(name)>0 and len(brand)>0 and len(specs)>0 and isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- RangeQuery("page_time",page_time_before,page_time_after,True,True),
- TermQuery("brand",brand),
- TermQuery("specs",specs),
- TermQuery("unit_price",unit_price),
- TermQuery("quantity",quantity)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
- SearchQuery(bool_query,limit=1),
- columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- if len(list_data)>0:
- return list_data[0].get(DOCUMENT_PRODUCT_ID),1
- if len(name)>0 and len(brand)>0 and len(supplier)>0 and len(tenderee)>0:
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- RangeQuery("page_time",page_time_before,page_time_after,True,True),
- TermQuery("brand",brand),
- TermQuery("tenderee",tenderee),
- TermQuery("supplier",supplier),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("document_product","document_product_index",
- SearchQuery(bool_query,limit=50),
- columns_to_get=ColumnsToGet(["name",'brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- value_count = self.get_value_count(name,brand,specs,unit_price,quantity)
- for _d in list_data:
- s_id = _d.get(DOCUMENT_PRODUCT_ID)
- s_name = _d.get(DOCUMENT_PRODUCT_NAME,"")
- s_brand = _d.get(DOCUMENT_PRODUCT_BRAND,"")
- s_specs = _d.get(DOCUMENT_PRODUCT_SPECS,"")
- s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
- s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY,"")
- check_flag = True
- value_count1 = self.get_value_count(s_name,s_brand,s_specs,s_unit_price,s_quantity)
- if len(specs)>0 and len(s_specs)>0 and specs!=s_specs:
- check_flag = False
- elif isinstance(unit_price,(float,int)) and isinstance(s_unit_price,(float,int)) and unit_price!=s_unit_price:
- check_flag = False
- elif isinstance(quantity,(float,int)) and isinstance(s_quantity,(float,int)) and quantity!=s_quantity:
- check_flag = False
- if check_flag:
- if value_count<value_count1:
- to_save = 0
- else:
- to_save = 1
- return s_id,to_save
- return None,1
- def dumplicate(self,document_product):
- '''
- Duplicates the product data
- 将同一个产品的采购结果公示进行去重,结合公告进行。
- :return:True if not repeated else False
- '''
- dump_id,to_save = self.dumplicate_search_product(document_product)
- if dump_id is not None:
- document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,dump_id,True)
- if to_save==1:
- if dump_id is not None:
- _d = {DOCUMENT_PRODUCT_ID,dump_id}
- _dp = Document_product(_d)
- _dp.delete_row(self.ots_client)
- return True
- else:
- return False
- def start_processing(self):
- scheduler = BlockingScheduler()
- scheduler.add_job(self.producer,"cron",second="*/20")
- scheduler.add_job(self.comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
- scheduler.start()
def start_process_product():
    """Entry point: build a Product_Manager and block on its schedulers."""
    Product_Manager().start_processing()
if __name__ == '__main__':
    # start_process_product()
    # Ad-hoc sanity check of the md5-based id helper; the real entry point
    # (start_process_product) is commented out above.
    print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
|