# (removed: two lines of line-number residue pasted in from a code viewer; they were not code)
- from BaseDataMaintenance.common.documentFingerprint import getMD5
- from BaseDataMaintenance.common.Utils import *
- from BaseDataMaintenance.common.milvusUtil import *
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.maintenance.product.productUtils import *
- from BaseDataMaintenance.model.ots.document_product_tmp import *
- from BaseDataMaintenance.model.ots.document_product import *
- from BaseDataMaintenance.model.ots.document_product_dict import *
- from BaseDataMaintenance.model.ots.document_product_dict_interface import *
- from BaseDataMaintenance.model.ots.document import *
- from BaseDataMaintenance.model.ots.attachment import *
- from BaseDataMaintenance.model.ots.enterprise import *
- from tablestore import *
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- from multiprocessing import Process,Queue
- from random import randint
- from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.maintenance.product.make_brand_pattern import *
- from BaseDataMaintenance.maintenance.product.product_dict import IS_SYNCHONIZED
- import logging
- root = logging.getLogger()
- root.setLevel(logging.INFO)
- from uuid import uuid4
class Product_Manager(Product_Dict_Manager):
    """Standardizes rows from document_product_temp into document_product.

    A producer/consumer pipeline: `producer` queues pending temp rows,
    `comsumer` fans them out to `standardize`, which fuzzy-matches
    name/brand/specs against the standard product dictionary (milvus
    embedding search backed by OTS tables) and saves the result.
    """

    def __init__(self):
        super(Product_Manager, self).__init__()
        # queue of document_product_temp rows waiting to be standardized
        self.process_queue = Queue()
        self.ots_client = getConnect_ots()
        # ids queued in the previous producer batch, used to avoid re-queueing
        self.set_id = set()
- def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
- if name is None:
- name = ""
- if brand is None:
- brand = ""
- if specs is None:
- specs = ""
- if quantity is None:
- quantity = ""
- if unit_price is None or unit_price=="":
- unit_price = ""
- else:
- unit_price = "%.2f"%float(unit_price)
- product_id = getMD5(str(docid)+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity))
- return product_id
    def producer(self, process_count=3000):
        """Fill the process queue with pending document_product_temp rows.

        :param process_count: soft cap on rows fetched per invocation.

        Skips the fetch entirely while the queue still holds more than
        process_count/6 items, then pages through rows whose status is in
        [1, 51) and enqueues those not already queued last round.
        """
        q_size = self.process_queue.qsize()
        if q_size > process_count / 6:
            return
        bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS, 1, 51)])
        rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_product_temp", "document_product_temp_index",
                                                                               SearchQuery(bool_query, sort=Sort(sorters=[FieldSort("status")]), limit=100, get_total_count=True),
                                                                               columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
        list_data = getRow_ots(rows)
        _count = len(list_data)
        list_id = []
        for _d in list_data:
            _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
            # skip ids enqueued by the previous producer run that may still be in-flight
            if _id in self.set_id:
                continue
            list_id.append(_id)
            self.process_queue.put(_d)
        # page through the remaining results until the soft cap is reached
        while next_token:
            rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_product_temp", "document_product_temp_index",
                                                                                   SearchQuery(bool_query, next_token=next_token, limit=100, get_total_count=True),
                                                                                   columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
            list_data = getRow_ots(rows)
            for _d in list_data:
                _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
                if _id in self.set_id:
                    continue
                list_id.append(_id)
                self.process_queue.put(_d)
            _count += len(list_data)
            if _count >= process_count:
                break
        # remember this batch's ids for de-duplication on the next run
        self.set_id = set(list_id)
    def comsumer(self):
        """Drain the process queue with 4 processes x 10 threads each.

        Each child process runs a MultiThreadHandler over the shared
        multiprocessing queue, calling comsumer_handle per item.
        NOTE(review): the Process target is a closure over self — this relies
        on fork-style process start (works on Linux, not picklable on
        spawn-based platforms); confirm the deployment target.
        """
        def start_thread(thread_count):
            mt = MultiThreadHandler(self.process_queue, self.comsumer_handle, None, thread_count, 1, False, True)
            mt.run()

        process_count = 4
        thread_count = 10
        list_process = []
        for _i in range(process_count):
            p = Process(target=start_thread, args=(thread_count,))
            list_process.append(p)
        for p in list_process:
            p.start()
        for p in list_process:
            p.join()
    def comsumer_handle(self, item, result_queue):
        """MultiThreadHandler callback: standardize one temp row (result_queue is unused)."""
        self.standardize(item)
    def standardize(self, tmp_dict, output_fields=['ots_id', 'ots_name', "ots_parent_id", "standard_name", "standard_name_id"]):
        '''
        Standardizes the product data.
        Matches against the standard product dictionary; matching is fuzzy
        (embedding search plus similarity/verification checks).
        :return:
        only save the standard product
        one temp row is regarded as a standard product only if the name matches; conditional on this,
        if the brand is matched: if it can be standardized then change, else add a new brand; if not matched replace with ""
        and the same for specs
        auto add the connection of name-brand and brand-specs because of the 3-degree tree structure
        '''
        # NOTE(review): this body was reconstructed from a whitespace-mangled paste;
        # nesting of the "add new specs" branch below is a best guess — confirm against the original.
        # TODO: auto-add new rows to the standard dictionary? add new connections between
        # existing names; add new specs.
        # When matching specs, the differing characters must not contain digits, letters or
        # roman numerals, and differences in occurrence counts must not be ignored.
        save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID: tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
        _status = 0
        document_product_tmp = Document_product_tmp(tmp_dict)
        name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME, "")
        brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND, "")
        specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS, "")
        parameters = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_PARAMETER, "")
        # every non-empty field doubles as a fallback candidate for the later passes
        list_candidates = [a for a in [name, brand, specs, parameters] if a != ""]
        if brand == "" and parameters != "":
            brand = parameters
        if specs == "" and parameters != "":
            specs = parameters
        new_name = ""
        new_brand = ""
        new_specs = ""
        name_ots_id = None
        brand_ots_id = None
        specs_ots_id = None
        # ---- pass 1: match the product name against the standard-name collection ----
        if name is not None and name != "":
            name_vector = get_embedding_request(name)
            if name_vector is not None:
                Coll, _ = self.get_collection(NAME_GRADE)
                search_list = get_embedding_search(Coll, embedding_index_name, name, NAME_GRADE, [name_vector], self.search_params, output_fields, limit=60)
                for _search in search_list:
                    ots_id = _search.get("standard_name_id")
                    ots_name = _search.get("standard_name")
                    ots_parent_id = _search.get("ots_parent_id")
                    if is_similar(name, ots_name) or check_product(name, ots_name):
                        name_ots_id = ots_id
                        new_name = ots_name
                        # (updating the alias of the dictionary entry is intentionally disabled)
                        break
        # ---- name fallback: try every candidate field ----
        if name_ots_id is None:
            for name in list_candidates:
                name_vector = get_embedding_request(name)
                if name_vector is not None:
                    Coll, _ = self.get_collection(NAME_GRADE)
                    search_list = get_embedding_search(Coll, embedding_index_name, name, NAME_GRADE, [name_vector], self.search_params, output_fields, limit=20)
                    for _search in search_list:
                        ots_id = _search.get("standard_name_id")
                        ots_name = _search.get("standard_name")
                        ots_parent_id = _search.get("ots_parent_id")
                        if is_similar(name, ots_name) or check_product(name, ots_name):
                            name_ots_id = ots_id
                            new_name = ots_name
                            # (alias update intentionally disabled)
                            break
        if name_ots_id is not None:
            # ---- brand matching: only meaningful once the name has been resolved ----
            if brand is not None and brand != "":
                s_brand = brand
                # try the raw brand, the cleaned brand, and any extracted Chinese substrings
                l_brand = [brand]
                l_brand.append(clean_product_brand(s_brand))
                brand_ch = get_chinese_string(brand)
                l_brand.extend(brand_ch)
                _find = False
                for brand in l_brand:
                    brand_vector = get_embedding_request(brand)
                    if brand_vector is not None:
                        Coll, _ = self.get_collection(BRAND_GRADE)
                        search_list = get_embedding_search(Coll, embedding_index_name, brand, BRAND_GRADE, [brand_vector], self.search_params, output_fields, limit=60)
                        for _search in search_list:
                            ots_id = _search.get("standard_name_id")
                            ots_name = _search.get("standard_name")
                            ots_parent_id = _search.get("ots_parent_id")
                            if is_similar(brand, ots_name) or check_brand(brand, ots_name):
                                # a brand equal to the standardized name is a false hit
                                if ots_name == new_name:
                                    continue
                                new_brand = ots_name
                                log("checking brand %s succeed %s" % (brand, new_brand))
                                # judge if the brand whose parent_id is name_ots_id exists; if not, insert one
                                if name_ots_id is not None:
                                    brand_ots_id = get_document_product_dict_id(name_ots_id, new_brand)
                                    _d_brand = {DOCUMENT_PRODUCT_DICT_ID: brand_ots_id,
                                                DOCUMENT_PRODUCT_DICT_NAME: new_brand,
                                                DOCUMENT_PRODUCT_DICT_ALIAS: "%s" % (str(brand).lower()),
                                                DOCUMENT_PRODUCT_DICT_GRADE: BRAND_GRADE,
                                                DOCUMENT_PRODUCT_DICT_STATUS: 1,
                                                DOCUMENT_PRODUCT_DICT_PARENT_ID: name_ots_id,
                                                DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED: IS_SYNCHONIZED,
                                                DOCUMENT_PRODUCT_DICT_CREATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                DOCUMENT_PRODUCT_DICT_UPDATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                }
                                    _dpd_brand = Document_product_dict(_d_brand)
                                    if not _dpd_brand.exists_row(self.ots_client):
                                        _dpd_brand.update_row(self.ots_client)
                                    else:
                                        pass
                                        # (alias update of the existing row intentionally disabled)
                                _find = True
                                break
                            else:
                                # add new brand?
                                pass
                    if _find:
                        break
                if not _find:
                    # no standard brand matched — propose the cleaned brand as a new dictionary entry
                    for brand in l_brand:
                        if self.check_new_brand(brand):
                            new_brand = clean_product_brand(brand)
                            if new_brand == "":
                                continue
                            log("adding new brand %s" % (str(new_brand)))
                            _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID: uuid4().hex,
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_NAME: new_brand,
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS: "%s" % (str(brand).lower()),
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE: BRAND_GRADE,
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS: 1,
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID: name_ots_id,
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                        DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION: "insert"
                                        }
                            dpdi = Document_product_dict_interface(_d_brand)
                            dpdi.update_row(self.ots_client)
                            break
            if brand_ots_id is None:
                # ---- brand fallback: try every candidate field with a stricter ratio (_radio=95) ----
                _find = False
                for brand in list_candidates:
                    if _find:
                        break
                    l_brand = [brand]
                    l_brand.append(clean_product_brand(brand))
                    brand_ch = get_chinese_string(brand)
                    l_brand.extend(brand_ch)
                    for brand in l_brand:
                        if _find:
                            break
                        start_time = time.time()
                        brand_vector = get_embedding_request(brand)
                        debug("get embedding for brand %s takes %.4fs" % (brand, time.time() - start_time))
                        if brand_vector is not None:
                            Coll, _ = self.get_collection(BRAND_GRADE)
                            start_time = time.time()
                            search_list = get_embedding_search(Coll, embedding_index_name, brand, BRAND_GRADE, [brand_vector], self.search_params, output_fields, limit=10)
                            debug("get search_list for brand %s takes %.4fs" % (brand, time.time() - start_time))
                            for _search in search_list:
                                ots_id = _search.get("standard_name_id")
                                ots_name = _search.get("standard_name")
                                ots_parent_id = _search.get("ots_parent_id")
                                if is_similar(brand, ots_name, _radio=95) or check_brand(brand, ots_name):
                                    if ots_name == new_name:
                                        continue
                                    new_brand = ots_name
                                    log("checking brand %s succeed %s" % (brand, new_brand))
                                    # judge if the brand whose parent_id is name_ots_id exists; if not, insert one
                                    if name_ots_id is not None:
                                        brand_ots_id = get_document_product_dict_id(name_ots_id, new_brand)
                                        _d_brand = {DOCUMENT_PRODUCT_DICT_ID: brand_ots_id,
                                                    DOCUMENT_PRODUCT_DICT_NAME: new_brand,
                                                    DOCUMENT_PRODUCT_DICT_ALIAS: "%s" % (str(brand).lower()),
                                                    DOCUMENT_PRODUCT_DICT_GRADE: BRAND_GRADE,
                                                    DOCUMENT_PRODUCT_DICT_STATUS: 1,
                                                    DOCUMENT_PRODUCT_DICT_PARENT_ID: name_ots_id,
                                                    DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED: IS_SYNCHONIZED,
                                                    DOCUMENT_PRODUCT_DICT_CREATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                    DOCUMENT_PRODUCT_DICT_UPDATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                    }
                                        _dpd_brand = Document_product_dict(_d_brand)
                                        if not _dpd_brand.exists_row(self.ots_client):
                                            _dpd_brand.update_row(self.ots_client)
                                        else:
                                            pass
                                            # (alias update of the existing row intentionally disabled)
                                    _find = True
                                    break
            # ---- specs matching ----
            if specs is not None and specs != "":
                debug("getting sepcs %s" % (specs))
                list_specs = []
                c_specs = clean_product_specs(specs)
                list_specs.append(c_specs)
                # also try non-CJK fragments (len>4) obtained by splitting on Chinese characters
                for s in re.split("[\u4e00-\u9fff]", specs):
                    if s != "" and len(s) > 4:
                        list_specs.append(s)
                similar_flag = None
                _index = 0
                break_flag = False
                for c_specs in list_specs:
                    if break_flag:
                        break
                    _index += 1
                    specs_vector = get_embedding_request(c_specs)
                    if specs_vector is not None:
                        Coll, _ = self.get_collection(SPECS_GRADE)
                        search_list = get_embedding_search(Coll, embedding_index_name, c_specs, SPECS_GRADE, [specs_vector], self.search_params, output_fields, limit=60)
                        for _search in search_list:
                            ots_id = _search.get("standard_name_id")
                            ots_name = _search.get("standard_name")
                            ots_parent_id = _search.get("ots_parent_id")
                            debug("checking specs %s and %s" % (specs, ots_name))
                            if is_similar(specs, ots_name):
                                # log("specs is_similar")
                                if check_specs(c_specs, ots_name):
                                    break_flag = True
                                    new_specs = ots_name
                                    log("check_specs %s succeed %s" % (specs, new_specs))
                                    # update the document_product_dict that is built for search
                                    if brand_ots_id is not None:
                                        # judge if the specs whose parent_id is brand_ots_id exists; insert one if not
                                        specs_ots_id = get_document_product_dict_id(brand_ots_id, new_specs)
                                        _d_specs = {DOCUMENT_PRODUCT_DICT_ID: specs_ots_id,
                                                    DOCUMENT_PRODUCT_DICT_NAME: new_specs,
                                                    DOCUMENT_PRODUCT_DICT_ALIAS: "%s" % (str(specs).lower()),
                                                    DOCUMENT_PRODUCT_DICT_GRADE: SPECS_GRADE,
                                                    DOCUMENT_PRODUCT_DICT_STATUS: 1,
                                                    DOCUMENT_PRODUCT_DICT_PARENT_ID: brand_ots_id,
                                                    DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED: IS_SYNCHONIZED,
                                                    DOCUMENT_PRODUCT_DICT_CREATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                    DOCUMENT_PRODUCT_DICT_UPDATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                    }
                                        _dpd_specs = Document_product_dict(_d_specs)
                                        if not _dpd_specs.exists_row(self.ots_client):
                                            _dpd_specs.update_row(self.ots_client)
                                        # user interface to add
                                        else:
                                            pass
                                            # (alias update of the existing row intentionally disabled)
                                    break
                            else:
                                if _index == 1:
                                    similar_flag = True
                                # add new specs?
                                # NOTE(review): placement of this block inside the per-result else-branch
                                # is reconstructed — confirm; it proposes the cleaned raw specs as a new
                                # dictionary entry when a search result is not similar.
                                debug("specs not similar")
                                if is_legal_specs(specs) and len(specs) < MAX_NAME_LENGTH and len(specs) >= 5:
                                    debug("is_legal_specs")
                                    new_specs = clean_product_specs(specs)
                                    # insert a new record via the interface table so a human can review it
                                    if brand_ots_id is not None and name_ots_id is not None:
                                        _md5 = get_document_product_dict_id(brand_ots_id, new_specs)
                                        log("adding new specs %s" % (new_specs))
                                        # user interface to add
                                        _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID: uuid4().hex,
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_NAME: new_specs,
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS: "%s" % (new_specs.lower()),
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE: SPECS_GRADE,
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS: 1,
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID: brand_ots_id,
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                              DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION: "insert"
                                              }
                                        _dpdi = Document_product_dict_interface(_d)
                                        _dpdi.update_row(self.ots_client)
            if specs_ots_id is None:
                # ---- specs fallback: try every candidate field ----
                _find = False
                for specs in list_candidates:
                    if _find:
                        break
                    debug("getting sepcs %s" % (specs))
                    list_specs = []
                    c_specs = clean_product_specs(specs)
                    list_specs.append(c_specs)
                    for s in re.split("[\u4e00-\u9fff]", specs):
                        if s != "" and len(s) > 4:
                            list_specs.append(s)
                    similar_flag = None
                    _index = 0
                    for c_specs in list_specs:
                        if _find:
                            break
                        _index += 1
                        specs_vector = get_embedding_request(c_specs)
                        if specs_vector is not None:
                            Coll, _ = self.get_collection(SPECS_GRADE)
                            search_list = get_embedding_search(Coll, embedding_index_name, c_specs, SPECS_GRADE, [specs_vector], self.search_params, output_fields, limit=20)
                            for _search in search_list:
                                if _find:
                                    break
                                ots_id = _search.get("standard_name_id")
                                ots_name = _search.get("standard_name")
                                ots_parent_id = _search.get("ots_parent_id")
                                debug("checking specs %s and %s" % (specs, ots_name))
                                if is_similar(specs, ots_name):
                                    if check_specs(c_specs, ots_name):
                                        break_flag = True
                                        new_specs = ots_name
                                        if brand_ots_id is not None:
                                            # judge if the specs whose parent_id is brand_ots_id exists; insert one if not
                                            specs_ots_id = get_document_product_dict_id(brand_ots_id, new_specs)
                                            _d_specs = {DOCUMENT_PRODUCT_DICT_ID: specs_ots_id,
                                                        DOCUMENT_PRODUCT_DICT_NAME: new_specs,
                                                        DOCUMENT_PRODUCT_DICT_ALIAS: "%s" % (str(specs).lower()),
                                                        DOCUMENT_PRODUCT_DICT_GRADE: SPECS_GRADE,
                                                        DOCUMENT_PRODUCT_DICT_STATUS: 1,
                                                        DOCUMENT_PRODUCT_DICT_PARENT_ID: brand_ots_id,
                                                        DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED: IS_SYNCHONIZED,
                                                        DOCUMENT_PRODUCT_DICT_CREATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                        DOCUMENT_PRODUCT_DICT_UPDATE_TIME: getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
                                                        }
                                            _dpd_specs = Document_product_dict(_d_specs)
                                            if not _dpd_specs.exists_row(self.ots_client):
                                                _dpd_specs.update_row(self.ots_client)
                                        _find = True
                                        break
        # ---- judge if the product matches a standard product, then build and save the row ----
        if name_ots_id is not None:
            is_legal_data = True
            # standardize the product and save it to the document_product table
            _product = Document_product(tmp_dict)
            docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
            unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
            quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
            unit_price = clean_product_unit_price(unit_price)
            quantity = clean_product_quantity(quantity)
            total_price = _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)
            _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE, unit_price, True)
            _product.setValue(DOCUMENT_PRODUCT_QUANTITY, quantity, True)
            win_bid_price = _product.getProperties().get(DOCUMENT_PRODUCT_WIN_BID_PRICE)
            # reconcile unit_price / quantity / total_price; inconsistent rows are flagged illegal
            if isinstance(unit_price, (float, int)) and isinstance(quantity, (float, int)) and isinstance(total_price, (float, int)):
                if unit_price > 0:
                    new_quantity = total_price / unit_price
                    if new_quantity != quantity:
                        if new_quantity == total_price // unit_price:
                            quantity = int(new_quantity)
                            _product.setValue(DOCUMENT_PRODUCT_QUANTITY, quantity, True)
                        else:
                            is_legal_data = False
                elif quantity > 0:
                    unit_price = total_price / quantity
                    _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE, unit_price, True)
            elif isinstance(unit_price, (float, int)) and isinstance(quantity, (float, int)):
                total_price = float("%.2f" % (unit_price * quantity))
                _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE, total_price, True)
            elif isinstance(unit_price, (float, int)) and isinstance(total_price, (float, int)):
                if unit_price > 0:
                    quantity = int(total_price // unit_price)
                    _product.setValue(DOCUMENT_PRODUCT_QUANTITY, quantity, True)
            elif isinstance(quantity, (float, int)) and isinstance(total_price, (float, int)):
                if quantity > 0:
                    unit_price = float("%.2f" % (total_price / quantity))
                    _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE, unit_price, True)
            elif isinstance(quantity, (float, int)) and quantity > 10000:
                is_legal_data = False
            # sanity caps: total must not dwarf the win bid price; unit price must be plausible
            if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE), (float, int)) and isinstance(win_bid_price, (float, int)):
                if _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE) > win_bid_price * 10 and win_bid_price > 0:
                    is_legal_data = False
            if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE), (float, int)) and _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE) > 100000000:
                is_legal_data = False
            new_id = self.get_product_id(docid, new_name, new_brand, new_specs, unit_price, quantity)
            _product.setValue(DOCUMENT_PRODUCT_ID, new_id, True)
            _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID, tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID), True)
            if name_ots_id is not None:
                _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID, name_ots_id, True)
            if brand_ots_id is not None:
                _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID, brand_ots_id, True)
            if specs_ots_id is not None:
                _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID, specs_ots_id, True)
            _product.setValue(DOCUMENT_PRODUCT_NAME, new_name, True)
            _product.setValue(DOCUMENT_PRODUCT_BRAND, new_brand, True)
            _product.setValue(DOCUMENT_PRODUCT_SPECS, new_specs, True)
            _product.setValue(DOCUMENT_PRODUCT_STATUS, randint(201, 300), True)
            _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS, "%s&&%s" % (new_brand, new_specs), True)
            _product.setValue(DOCUMENT_PRODUCT_FULL_NAME, "%s&&%s&&%s" % (new_name, new_brand, new_specs), True)
            _product.setValue(DOCUMENT_PRODUCT_CREATE_TIME, getCurrent_date(format="%Y-%m-%d %H:%M:%S"), True)
            # keep the raw extracted values alongside the standardized ones
            _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME, document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME, ""), True)
            _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND, document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND, ""), True)
            _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS, document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS, ""), True)
            bid_filemd5s = self.get_bid_filemd5s(docid, self.ots_client)
            if bid_filemd5s is not None:
                _product.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S, bid_filemd5s, True)
            # status ranges encode the outcome: 501-550 illegal, 201-300 saved, 451-500 duplicate, 401-450 unmatched
            if not is_legal_data:
                _status = randint(501, 550)
            elif self.dumplicate(_product):
                _status = randint(201, 300)
                save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID, new_id, True)
                _product.update_row(self.ots_client)
            else:
                _status = randint(451, 500)
        else:
            _status = randint(401, 450)
        save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS, _status, True)
        save_product_tmp.update_row(self.ots_client)
    def check_new_brand(self, brand):
        """Return True if `brand` qualifies as a new standard-dictionary entry."""
        return is_legal_brand(self.ots_client, brand)
    @staticmethod
    def get_bid_filemd5s(docid, ots_client):
        """Collect the md5s of bid-document attachments related to `docid`.

        Looks up projects containing the docid, gathers all sibling docids,
        then keeps attachment md5s whose classification is "招标文件"
        (bid document). Returns a comma-joined md5 string, or None if none.
        """
        bool_query = BoolQuery(must_queries=[
            TermQuery("docids", docid)
        ])
        rows, next_token, total_count, is_all_succeed = ots_client.search("project2", "project2_index",
                                                                          SearchQuery(bool_query, limit=10),
                                                                          columns_to_get=ColumnsToGet(["docids"], return_type=ColumnReturnType.SPECIFIED))
        list_data = getRow_ots(rows)
        list_bid_filemd5s = []
        set_docids = set([docid])
        set_md5s = set()
        for _d in list_data:
            try:
                docids = _d.get("docids", "")
                for _id in docids.split(","):
                    set_docids.add(int(_id))
            except Exception as e:
                # best-effort: malformed docids strings are skipped
                pass
        list_docids = list(set_docids)
        for _docid in list_docids:
            # partition key convention: docid%500+1
            _d = {document_partitionkey: _docid % 500 + 1,
                  document_docid: _docid}
            _doc = Document(_d)
            _doc.fix_columns(ots_client, [document_attachment_path], True)
            page_attachments = _doc.getProperties().get(document_attachment_path)
            if page_attachments is not None and page_attachments != "":
                attachments = json.loads(page_attachments)
                for _a in attachments:
                    _filemd5 = _a.get(document_attachment_path_filemd5)
                    if _filemd5 in set_md5s or _filemd5 is None:
                        continue
                    set_md5s.add(_filemd5)
                    _da = {attachment_filemd5: _filemd5}
                    _attach = attachment(_da)
                    _attach.fix_columns(ots_client, [attachment_classification], True)
                    if _attach.getProperties().get(attachment_classification, "") == "招标文件":
                        list_bid_filemd5s.append(_filemd5)
        if len(list_bid_filemd5s) == 0:
            return None
        return ",".join(list(set(list_bid_filemd5s)))
- def get_value_count(self,name,brand,specs,unit_price,quantity):
- value_count = 0
- if len(name)>0:
- value_count += 1
- if len(brand)>0:
- value_count += 1
- if len(specs)>0:
- value_count += 1
- if isinstance(unit_price,(float,int)) and unit_price>0:
- value_count += 1
- if isinstance(quantity,(float,int)) and quantity>0:
- value_count += 1
- return value_count
    def dumplicate_search_product(self, document_product):
        """Search document_product for an existing duplicate of this row.

        Two stages within a +/-30-day page_time window:
        1. exact match on name/brand/specs/unit_price/quantity -> (dup_id, 1);
        2. match on name/brand/tenderee/supplier, then compare the remaining
           fields; to_save=0 means the existing row is more complete than ours.
        :return: (duplicate_id or None, to_save flag 0/1)
        """
        docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
        name = str(document_product.getProperties().get(DOCUMENT_PRODUCT_NAME, ""))
        brand = str(document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND, ""))
        specs = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS, ""))
        unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE, "")
        quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY, "")
        page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
        tenderee = str(document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE, ""))
        supplier = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER, ""))
        page_time_before = page_time
        page_time_after = page_time
        try:
            page_time_before = timeAdd(page_time, -30, format="%Y-%m-%d", )
            page_time_after = timeAdd(page_time, 30)
        except Exception as e:
            # fall back to a zero-width window when page_time is unparsable
            pass
        if len(name) > 0 and len(brand) > 0 and len(specs) > 0 and isinstance(unit_price, (float, int)) and isinstance(quantity, (float, int)):
            bool_query = BoolQuery(must_queries=[TermQuery("name", name),
                                                 RangeQuery("page_time", page_time_before, page_time_after, True, True),
                                                 TermQuery("brand", brand),
                                                 TermQuery("specs", specs),
                                                 TermQuery("unit_price", unit_price),
                                                 TermQuery("quantity", quantity)
                                                 ])
            rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_product", "document_product_index",
                                                                                   SearchQuery(bool_query, limit=1),
                                                                                   columns_to_get=ColumnsToGet(["name", 'brand', 'specs'], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            if len(list_data) > 0:
                return list_data[0].get(DOCUMENT_PRODUCT_ID), 1
        if len(name) > 0 and len(brand) > 0 and len(supplier) > 0 and len(tenderee) > 0:
            bool_query = BoolQuery(must_queries=[TermQuery("name", name),
                                                 RangeQuery("page_time", page_time_before, page_time_after, True, True),
                                                 TermQuery(DOCUMENT_PRODUCT_BRAND, brand),
                                                 TermQuery(DOCUMENT_PRODUCT_TENDEREE, tenderee),
                                                 TermQuery(DOCUMENT_PRODUCT_SUPPLIER, supplier),
                                                 ])
            rows, next_token, total_count, is_all_succeed = self.ots_client.search("document_product", "document_product_index",
                                                                                   SearchQuery(bool_query, limit=50),
                                                                                   columns_to_get=ColumnsToGet(['name', 'brand', 'specs', 'unit_price', 'quantity'], return_type=ColumnReturnType.SPECIFIED))
            list_data = getRow_ots(rows)
            value_count = self.get_value_count(name, brand, specs, unit_price, quantity)
            for _d in list_data:
                s_id = _d.get(DOCUMENT_PRODUCT_ID)
                s_name = _d.get(DOCUMENT_PRODUCT_NAME, "")
                s_brand = _d.get(DOCUMENT_PRODUCT_BRAND, "")
                s_specs = _d.get(DOCUMENT_PRODUCT_SPECS, "")
                s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE, "")
                s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY, "")
                check_flag = True
                value_count1 = self.get_value_count(s_name, s_brand, s_specs, s_unit_price, s_quantity)
                # conflicting concrete values mean the rows describe different items
                if len(specs) > 0 and len(s_specs) > 0 and specs != s_specs:
                    check_flag = False
                elif isinstance(unit_price, (float, int)) and isinstance(s_unit_price, (float, int)) and unit_price != s_unit_price:
                    check_flag = False
                elif isinstance(quantity, (float, int)) and isinstance(s_quantity, (float, int)) and quantity != s_quantity:
                    check_flag = False
                if check_flag:
                    # keep whichever row carries more information
                    if value_count < value_count1:
                        to_save = 0
                    else:
                        to_save = 1
                    return s_id, to_save
        return None, 1
- def dumplicate(self,document_product):
- '''
- Duplicates the product data
- 将同一个产品的采购结果公示进行去重,结合公告进行。
- :return:True if not repeated else False
- '''
- dump_id,to_save = self.dumplicate_search_product(document_product)
- if dump_id is not None:
- document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,dump_id,True)
- if to_save==1:
- if dump_id is not None:
- _d = {DOCUMENT_PRODUCT_ID:dump_id,
- DOCUMENT_PRODUCT_STATUS:randint(401,450),
- DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
- _dp = Document_product(_d)
- _dp.update_row(self.ots_client)
- return True
- else:
- return False
- def start_processing(self):
- scheduler = BlockingScheduler()
- scheduler.add_job(self.producer,"cron",second="*/20")
- scheduler.add_job(self.comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_interface_comsumer,"cron",second="*/20")
- scheduler.start()
- def test(self):
- from BaseDataMaintenance.common.sentencesUtil import cosine_similarity
- import torch
- output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
- id = '56bdad168c71a1fc4d57cd10bcd987f0'
- collection,_ = self.get_collection(SPECS_GRADE)
- vector = request_embedding("西门子MAGNETOMLumina")
- vector1 = request_embedding("西门子")
- print("cosine similarity",cosine_similarity(torch.from_numpy(np.array([vector])) ,torch.from_numpy(np.array([vector1]))))
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = search_embedding(Coll,embedding_index_name,[vector],self.search_params,output_fields,limit=60)
- for p in search_list:
- print(p)
- #
- # res = collection.query(
- # expr = "ots_id in ['%s']"%(id),
- # offset = 0,
- # limit = 10,
- # output_fields = output_fields,
- # consistency_level="Strong"
- # )
- # print(res)
def start_process_product():
    """Entry point: build a Product_Manager and run its blocking scheduler."""
    Product_Manager().start_processing()
def fix_product_data():
    '''
    Repair pass over document_product_temp rows with status >= 501:
    delete document_product and change the record status to 1 in
    document_product_temp which id=original id (so the row is reprocessed).
    :return: None
    '''
    table_name = "document_product_temp"
    table_index = "document_product_temp_index"
    columns = [DOCUMENT_PRODUCT_TMP_WIN_BID_PRICE]
    ots_client = getConnect_ots()
    # Select every temp row whose status is at least 501
    # (presumably a "failed/needs-fix" band — confirm against status table).
    bool_query = BoolQuery(must_queries=[
        RangeQuery("status",501),
        # TermQuery("docid",246032980)
    ])
    # First page, sorted by status so pagination is stable.
    rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
        SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
        columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
    list_rows = getRow_ots(rows)
    # Drain the remaining pages via next_token until the result set is exhausted.
    while next_token:
        rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
            SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
            columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
        list_rows.extend(getRow_ots(rows))
        print("%d/%d"%(len(list_rows),total_count))
        # if len(list_rows)>10000:
        #     break
    task_queue = Queue()
    for d in list_rows:
        task_queue.put(d)
    # NOTE(review): fix_missing_data and deleteAndReprocess are defined but
    # never passed to MultiThreadHandler below — they look like alternative
    # repair strategies kept around for manual switching.
    def fix_missing_data(item,result_queue):
        # Copy name/brand/specs from the temp row back onto the product row,
        # and refill bid_filemd5s from the announcement attachments.
        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
        _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
        dpt = Document_product_tmp(_d)
        dpt.fix_columns(ots_client,["name","brand","specs"],True)
        _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
        dp = Document_product(_d)
        #fix the project_code and original_name and bidi_filemd5s
        docid = int(item.get(DOCUMENT_PRODUCT_DOCID))
        # partition key scheme: docid modulo 500, 1-based.
        partitionkey = docid%500+1
        # project_name = item.get(DOCUMENT_PRODUCT_PROJECT_NAME,"")
        # if project_name=="":
        #     #fix project_name
        #     _doc = Document({"partitionkey":partitionkey,
        #                      "docid":docid})
        #     _doc.fix_columns(ots_client,["doctitle"],True)
        #     dp.setValue(DOCUMENT_PRODUCT_DOCTITLE,_doc.getProperties().get("doctitle"),True)
        bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
        if bid_filemd5s is not None:
            dp.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
        dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,""),True)
        dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,""),True)
        dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,""),True)
        dp.update_row(ots_client)
    def deleteAndReprocess(item,result_queue):
        # Reset the temp row to status 1 and drop the derived product row,
        # so the pipeline regenerates it from scratch.
        original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
        # delete data and rerun
        _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
        dpt = Document_product_tmp(_d)
        dpt.update_row(ots_client)
        _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
        dp = Document_product(_d)
        dp.delete_row(ots_client)
    def handle(item,result_queue):
        # Active strategy: only rows with an explicit win_bid_price of 0 are
        # reset to status 1 for reprocessing (missing price defaults to 1,
        # which leaves the row untouched).
        win_bid_price = item.get(DOCUMENT_PRODUCT_TMP_WIN_BID_PRICE,1)
        if win_bid_price==0:
            dpt = Document_product_tmp(item)
            dpt.setValue(DOCUMENT_PRODUCT_TMP_STATUS,1,True)
            dpt.update_row(ots_client)
    # 30 worker threads, 1 result handler thread.
    mt = MultiThreadHandler(task_queue,handle,None,30,1)
    mt.run()
def test_check_brand():
    """Batch-check brand names from brand.txt.

    Reads one brand per line, classifies each concurrently with
    Product_Manager.check_new_brand, then writes the cleaned legal brands to
    legal_brand.txt and the rejected originals to illegal_brand.txt.

    :return: None (results are written to the two output files)
    """
    import logging
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    from queue import Queue
    brand_path = "brand.txt"
    list_brand = []
    # One brand per non-empty line.
    with open(brand_path,"r",encoding="utf8") as f:
        for line in f:
            line = line.strip()
            if len(line)>0:
                list_brand.append({"brand":line})
            # if len(list_brand)>100:
            #     break
    task_queue = Queue()
    for _d in list_brand:
        task_queue.put(_d)
    pm = Product_Manager()
    def _handle(item,result_queue):
        # Record both the legality flag and the cleaned name on the item.
        brand = item.get("brand")
        item["new_brand"] = clean_product_brand(brand)
        item["f"] = pm.check_new_brand(brand)
    mt = MultiThreadHandler(task_queue,_handle,None,30,1)
    mt.run()
    list_legal_brand = []
    list_illegal_brand = []
    for _d in list_brand:
        f = _d.get("f")
        brand = _d.get("brand")
        # BUG FIX: the original logged `brand` before assigning it inside this
        # loop, so it printed the stale value from the previous iteration
        # (a dict from the read loop on the first pass).
        log("brand %s flag %s"%(brand,str(f)))
        if f:
            # Keep the cleaned form of brands that passed the check.
            list_legal_brand.append(_d.get("new_brand"))
        else:
            list_illegal_brand.append(brand)
    with open("legal_brand.txt","w",encoding="utf8") as f_out:
        for b in list_legal_brand:
            f_out.write(b+"\n")
    with open("illegal_brand.txt","w",encoding="utf8") as f_out:
        for b in list_illegal_brand:
            f_out.write(b+"\n")
def test_match():
    """Ad-hoc check: embed one name and print its NAME-collection neighbours."""
    query_text = "Mini-7"
    query_vec = request_embedding(query_text)
    manager = Product_Manager()
    coll, _ = manager.get_collection(NAME_GRADE)
    fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
    matches = search_embedding(coll, embedding_index_name, [query_vec],
                               manager.search_params, fields, limit=60)
    print(matches)
def test():
    """Manual driver: runs whichever check is currently uncommented."""
    # pm = Product_Manager()
    # pm.test()
    fix_product_data()
    # test_check_brand()
    # test_match()
if __name__ == '__main__':
    # start_process_product()
    # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
    # Run the currently enabled manual driver (see test() above).
    test()
    # Spot-check helpers against a known document and sample values.
    print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
    name = "一"
    ots_name = "一氧化碳分析仪"
    print(is_similar(name,ots_name),check_product(name,ots_name))
    print(is_legal_specs('SCM-A/SB(0.18D)'))
|