- from BaseDataMaintenance.common.documentFingerprint import getMD5
- from BaseDataMaintenance.common.Utils import *
- from BaseDataMaintenance.common.milvusUtil import *
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.maintenance.product.productUtils import *
- from BaseDataMaintenance.model.ots.document_product_tmp import *
- from BaseDataMaintenance.model.ots.document_product import *
- from BaseDataMaintenance.model.ots.document_product_dict import *
- from BaseDataMaintenance.model.ots.document_product_dict_interface import *
- from BaseDataMaintenance.model.ots.document import *
- from BaseDataMaintenance.model.ots.attachment import *
- from BaseDataMaintenance.model.ots.enterprise import *
- from BaseDataMaintenance.model.ots.project import *
- from tablestore import *
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- from multiprocessing import Process,Queue
- from random import randint
- from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.maintenance.product.make_brand_pattern import *
- from BaseDataMaintenance.maintenance.product.product_dict import *
- import logging
- root = logging.getLogger()
- root.setLevel(logging.INFO)
- from uuid import uuid4
- from multiprocessing import Queue as PQueue
- class Product_Manager(Product_Dict_Manager):
- def __init__(self):
- super(Product_Manager, self).__init__()
- self.process_queue = PQueue()
- self.ots_client = getConnect_ots()
- self.set_id = set()
- def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
- if name is None:
- name = ""
- if brand is None:
- brand = ""
- if specs is None:
- specs = ""
- if quantity is None:
- quantity = ""
- if unit_price is None or unit_price=="":
- unit_price = ""
- else:
- unit_price = "%.2f"%float(unit_price)
- product_id = getMD5(str(docid)+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity))
- return product_id
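- # A minimal usage sketch (hypothetical values): the id is the MD5 of the
- # concatenated cleaned fields, with unit_price normalized to two decimals:
- #   self.get_product_id(123,"投影仪","爱普生","CB-X06",2999,2)
- #   == getMD5("123"+"投影仪"+"爱普生"+"CB-X06"+"2999.00"+"2")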
- def producer(self,process_count=3000):
- q_size = self.process_queue.qsize()
- if q_size>process_count/6:
- return
- bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- _count = len(list_data)
- log("producer %d/%d"%(q_size,total_count))
- list_id = []
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
- if _id in self.set_id:
- continue
- list_id.append(_id)
- self.process_queue.put(_d)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
- if _id in self.set_id:
- continue
- list_id.append(_id)
- self.process_queue.put(_d)
- _count += len(list_data)
- if _count>=process_count:
- break
- self.set_id = set(list_id)
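- # Note: producer only refills process_queue once it has drained below
- # process_count/6, and self.set_id carries the ids queued in the previous
- # round so the same temp rows are not enqueued twice.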
- def comsumer(self):
- def start_thread(thread_count):
- mt = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,thread_count,1,False,True)
- mt.run()
- process_count = 6
- thread_count = 6
- list_process = []
- for _i in range(process_count):
- p = Process(target=start_thread,args=(thread_count,))
- list_process.append(p)
- for p in list_process:
- p.start()
- for p in list_process:
- p.join()
- def comsumer_handle(self,item,result_queue):
- try:
- self.standardize(item)
- except Exception as e:
- traceback.print_exc()
- def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id","remove_words","level"]):
- '''
- Standardize the product data by fuzzy-matching it against the standard
- dictionary tables (matching is non-exact; see the check_* helpers below
- for the validation rules).
- :return: None; only the standardized product is saved.
- A temp record is regarded as a standard product only if its name matches.
- On a name match: if the brand matches a standard entry it is replaced by
- the standard form, otherwise a new brand may be added; if nothing matches
- it is cleared to "". Specs are handled the same way.
- Name-brand and brand-specs links are added automatically because the
- dictionary is a three-level tree.
- '''
- # todo: should the product parameter table auto-add new data? 1. add new connections between existing names 2. add new specs
- # when matching specs, the differing characters must not contain digits, letters or Roman numerals, and differences in occurrence counts must not be ignored
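- # Rough flow of this method (a sketch; the details are in the branches below):
- #   1. match name against NAME_GRADE -> name_ots_id / new_name
- #   2. with a name match, match brand against BRAND_GRADE; on success hang
- #      the brand under name_ots_id, otherwise possibly insert a new brand
- #      through the dict interface table
- #   3. match specs against SPECS_GRADE by embedding similarity and hang it
- #      under brand_ots_id
- #   4. sanity-check price/quantity, build the Document_product row, dedup it
- #      via dumplicate() and write the resulting status back to the tmp row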
- save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID:tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
- _status = 0
- document_product_tmp = Document_product_tmp(tmp_dict)
- tenderee = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_TENDEREE,"")
- name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,"")
- brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,"")
- specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,"")
- parameters = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_PARAMETER,"")
- name = name.replace(tenderee,"")
- brand = brand.replace(tenderee,"")
- original_name = name
- original_brand = brand
- original_specs = specs
- list_candidates = [a for a in [name,brand,specs,parameters] if a!=""]
- list_candidate_brand_specs = [a for a in [brand,specs,parameters,name] if a!=""]
- if brand=="" and parameters!="":
- brand = parameters
- if specs=="" and parameters!="":
- specs = parameters
- new_name = ""
- new_brand = ""
- new_specs = ""
- name_ots_id = None
- brand_ots_id = None
- specs_ots_id = None
- if name is not None and name!="":
- Coll,_ = self.get_collection(NAME_GRADE)
- search_list = get_intellect_search(Coll,embedding_index_name,name,NAME_GRADE,self.search_params,output_fields,limit=10)
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- if check_product(name,ots_name,remove_words):
- name_ots_id = get_document_product_dict_id(ots_parent_id,standard_name)
- original_name = name
- new_name = standard_name
- log("checking name %s succeed %s %s"%(name,ots_name,str(remove_words)))
- # #update alias of name
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag and _dpd.updateAlias(name):
- # _dpd.update_row(self.ots_client)
- break
- if name_ots_id is None:
- for name in list_candidates:
- Coll,_ = self.get_collection(NAME_GRADE)
- search_list = get_intellect_search(Coll,embedding_index_name,name,NAME_GRADE,self.search_params,output_fields,limit=10)
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- if check_product(name,ots_name,remove_words):
- log("checking name %s succeed %s %s"%(name,ots_name,str(remove_words)))
- name_ots_id = get_document_product_dict_id(ots_parent_id,standard_name)
- original_name = name
- new_name = standard_name
- # #update alias of name
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag and _dpd.updateAlias(name):
- # _dpd.update_row(self.ots_client)
- break
- if name_ots_id is not None:
- if brand is not None and brand!="":
- s_brand = brand
- l_brand = [brand]
- Coll,_ = self.get_collection(BRAND_GRADE)
- _find = False
- for brand in l_brand:
- if len(brand)>100:
- continue
- search_list = get_intellect_search(Coll,embedding_index_name,brand,BRAND_GRADE,self.search_params,output_fields,limit=10)
- # log("search brand %s"%(brand))
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- # log("check brand %s and %s"%(brand,ots_name))
- if check_brand(brand,ots_name,remove_words):
- # log("check brand similar succeed:%s and %s"%(brand,ots_name))
- if ots_name==new_name:
- continue
- original_brand = brand
- if original_brand==original_name:
- if len(new_name)+len(ots_name)>len(original_name):
- continue
- if original_brand.find(ots_name)>=1:
- continue
- if len(original_brand)<=3:
- continue
- new_brand = standard_name
- log("checking brand %s succeed %s"%(brand,new_brand))
- # check whether a brand whose parent_id is name_ots_id exists; insert one if not, otherwise update the alias
- if name_ots_id is not None:
- brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
- _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_brand = Document_product_dict(_d_brand)
- # _dpd_brand.updateAlias(str(new_brand).lower())
- if not _dpd_brand.exists_row(self.ots_client):
- _dpd_brand.update_row(self.ots_client)
- else:
- pass
- # #update alias
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag:
- # if _dpd.updateAlias(brand):
- # _dpd.update_row(self.ots_client)
- _find = True
- break
- else:
- # log("check brand similar failed:%s and %s"%(brand,ots_name))
- # add new brand?
- pass
- if _find:
- break
- if not _find:
- for brand in l_brand:
- if len(brand)>100:
- continue
- c_brand = clean_product_brand(brand)
- if self.check_new_brand(c_brand):
- if c_brand=="":
- continue
- original_brand = brand
- if original_brand==original_name:
- if len(new_name)+len(c_brand)>len(original_name):
- continue
- if new_name==original_brand:
- continue
- if original_brand.find(c_brand)>=1:
- continue
- if len(original_brand)<=3:
- continue
- new_brand = c_brand
- log("adding new brand %s"%(str(new_brand)))
- _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
- DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
- DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
- DOCUMENT_PRODUCT_ORIGINAL_BRAND:brand
- }
- dpdi = Document_product_dict_interface(_d_brand)
- dpdi.update_row(self.ots_client)
- break
- if brand_ots_id is None:
- _find = False
- Coll,_ = self.get_collection(BRAND_GRADE)
- for brand in list_candidates:
- if _find:
- break
- l_brand = [brand]
- for brand in l_brand:
- if len(brand)>100:
- continue
- if _find:
- break
- search_list = get_intellect_search(Coll,embedding_index_name,brand,BRAND_GRADE,self.search_params,output_fields,limit=10)
- # log("search brand %s"%(brand))
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- # log("check brand %s and %s"%(brand,ots_name))
- if check_brand(brand,ots_name,remove_words):
- # log("check brand similar succeed:%s and %s"%(brand,ots_name))
- if ots_name==new_name:
- continue
- original_brand = brand
- if original_brand==original_name:
- if len(new_name)+len(ots_name)>len(original_name):
- continue
- if original_brand.find(ots_name)>=1:
- continue
- if len(original_brand)<=3:
- continue
- new_brand = standard_name
- log("checking brand %s succeed %s"%(brand,new_brand))
- # check whether a brand whose parent_id is name_ots_id exists; insert one if not, otherwise update the alias
- if name_ots_id is not None:
- brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
- _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_brand = Document_product_dict(_d_brand)
- # _dpd_brand.updateAlias(str(new_brand).lower())
- if not _dpd_brand.exists_row(self.ots_client):
- _dpd_brand.update_row(self.ots_client)
- else:
- pass
- # #update alias
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag:
- # if _dpd.updateAlias(brand):
- # _dpd.update_row(self.ots_client)
- _find = True
- break
- if specs is not None and specs!="":
- debug("getting sepcs %s"%(specs))
- list_specs = []
- c_specs = clean_product_specs(specs)
- list_specs.append(c_specs)
- for s in re.split("[\u4e00-\u9fff]",specs):
- if s!="" and len(s)>4:
- list_specs.append(s)
- _index = 0
- break_flag = False
- list_similar_specs = []
- for c_specs in list_specs:
- if break_flag:
- break
- _index += 1
- specs_vector = get_embedding_request(c_specs)
- if specs_vector is not None:
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=20)
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- debug("checking specs %s and %s"%(specs,ots_name))
- if is_similar(specs,ots_name):
- # log("specs is_similar")
- if check_specs(c_specs,ots_name):
- break_flag = True
- original_specs = c_specs
- if standard_name==new_name or standard_name==new_brand:
- continue
- new_specs = standard_name
- log("check_specs %s succeed %s"%(specs,new_specs))
- # update the document_product_dict that is built for search
- if brand_ots_id is not None:
- # check whether a specs row whose parent_id is brand_ots_id exists; insert one if not, otherwise update the alias
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_specs).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_specs = Document_product_dict(_d_specs)
- # _dpd_specs.updateAlias(str(new_specs).lower())
- if not _dpd_specs.exists_row(self.ots_client):
- _dpd_specs.update_row(self.ots_client)
- # user interface to add
- else:
- pass
- # #update alias
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag:
- # if _dpd.updateAlias(specs):
- # _dpd.update_row(self.ots_client)
- break_flag = True
- break
- else:
- list_similar_specs.append(specs)
- # add new specs?
- if new_specs is not None and new_specs!="":
- pass
- else:
- debug("specs not similar")
- for specs in list_similar_specs:
- if is_legal_specs(specs) and len(specs)<MAX_NAME_LENGTH and len(specs)>=5:
- debug("is_legal_specs")
- original_specs = specs
- new_specs = clean_product_specs(specs)
- if new_specs==new_name or new_specs==new_brand:
- new_specs = ""
- continue
- # insert a new record into document_product_dict
- # update the document_product_dict that is built for search
- # add new specs
- if brand_ots_id is not None and name_ots_id is not None:
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
- # DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- # DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
- # DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- # DOCUMENT_PRODUCT_DICT_STATUS:1,
- # DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- # DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- # DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- # }
- # _dpd = Document_product_dict(_d)
- # _dpd.update_row(self.ots_client)
- log("adding new specs %s"%(new_specs))
- # user interface to add
- _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
- DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
- DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
- DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
- }
- _dpdi = Document_product_dict_interface(_d)
- _dpdi.update_row(self.ots_client)
- break
- if specs_ots_id is None:
- _find = False
- for specs in list_candidate_brand_specs:
- if _find:
- break
- debug("getting sepcs %s"%(specs))
- list_specs = []
- c_specs = clean_product_specs(specs)
- list_specs.append(c_specs)
- for s in re.split("[\u4e00-\u9fff]",specs):
- if s!="" and len(s)>4:
- list_specs.append(s)
- similar_flag = None
- _index = 0
- for c_specs in list_specs:
- if _find:
- break
- _index += 1
- specs_vector = get_embedding_request(c_specs)
- if specs_vector is not None:
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=10)
- for _search in search_list:
- if _find:
- break
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- debug("checking specs %s and %s"%(specs,ots_name))
- if is_similar(c_specs,ots_name):
- # log("specs is_similar")
- if check_specs(c_specs,ots_name):
- break_flag = True
- original_specs = c_specs
- new_specs = standard_name
- if new_specs==new_name or new_specs==new_brand:
- new_specs = ""
- continue
- if brand_ots_id is not None:
- # check whether a specs row whose parent_id is brand_ots_id exists; insert one if not, otherwise update the alias
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_specs).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_specs = Document_product_dict(_d_specs)
- # _dpd_specs.updateAlias(str(new_specs).lower())
- if not _dpd_specs.exists_row(self.ots_client):
- _dpd_specs.update_row(self.ots_client)
- _find = True
- break
- # judge if the product matches the standard product
- if name_ots_id is not None:
- is_legal_data = True
- # standardize the product and save it to the document_product table
- _product = Document_product(tmp_dict)
- docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
- unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
- quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
- unit_price = clean_product_unit_price(unit_price)
- quantity = clean_product_quantity(quantity)
- total_price = _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- win_bid_price = _product.getProperties().get(DOCUMENT_PRODUCT_WIN_BID_PRICE)
- if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
- if unit_price>0:
- new_quantity = total_price/unit_price
- if new_quantity!=quantity:
- # if new_quantity==total_price//unit_price:
- # quantity = int(new_quantity)
- # _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- # else:
- # is_legal_data = False
- is_legal_data = False
- elif quantity>0:
- unit_price = total_price/quantity
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- elif isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
- total_price = float("%.2f"%(unit_price*quantity))
- _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
- elif isinstance(unit_price,(float,int)) and isinstance(total_price,(float,int)):
- if unit_price>0:
- quantity = int(total_price//unit_price)
- _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- elif isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
- if quantity>0:
- unit_price = float("%.2f"%(total_price/quantity))
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- elif isinstance(quantity,(float,int)) and quantity>10000:
- is_legal_data = False
- if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE),(float,int)) and isinstance(win_bid_price,(float,int)):
- if _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)>win_bid_price*10 and win_bid_price>0:
- is_legal_data = False
- if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE),(float,int)) and _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)>100000000:
- is_legal_data = False
- new_id = self.get_product_id(docid,new_name,new_brand,new_specs,unit_price,quantity)
- _product.setValue(DOCUMENT_PRODUCT_ID,new_id,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID,tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID),True)
- if name_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID,name_ots_id,True)
- if brand_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID,brand_ots_id,True)
- if specs_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID,specs_ots_id,True)
- _product.setValue(DOCUMENT_PRODUCT_NAME,new_name,True)
- _product.setValue(DOCUMENT_PRODUCT_BRAND,new_brand,True)
- _product.setValue(DOCUMENT_PRODUCT_SPECS,new_specs,True)
- _product.setValue(DOCUMENT_PRODUCT_STATUS,randint(201,300),True)
- _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS,"%s&&%s"%(new_brand,new_specs),True)
- _product.setValue(DOCUMENT_PRODUCT_FULL_NAME,"%s&&%s&&%s"%(new_name,new_brand,new_specs),True)
- _product.setValue(DOCUMENT_PRODUCT_CREATE_TIME,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,original_name,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,original_brand,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,original_specs,True)
- list_attachments,bid_filemd5s = self.get_bid_filemd5s(docid,self.ots_client)
- if len(list_attachments)>0:
- _product.setValue(DOCUMENT_PRODUCT_ATTACHMENTS,json.dumps(list_attachments,ensure_ascii=False),True)
- _product.setValue(DOCUMENT_PRODUCT_HAS_ATTACHMENTS,1,True)
- if bid_filemd5s!="":
- _product.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
- _product.setValue(DOCUMENT_PRODUCT_HAS_BIDFILE,1,True)
- _product.setValue(DOCUMENT_PRODUCT_PARAMETER_STATUS,0,True)
- if not is_legal_data:
- _status = randint(501,550)
- else:
- _flag,dump_id = self.dumplicate(_product)
- if _flag:
- _status = randint(201,300)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID,new_id,True)
- _product.update_row(self.ots_client)
- else:
- _status = randint(451,500)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_DUMP_ID,str(dump_id),True)
- else:
- _status = randint(401,450)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS,_status,True)
- save_product_tmp.update_row(self.ots_client)
- def check_new_brand(self,brand):
- return is_legal_brand(self.ots_client,brand)
- @staticmethod
- def get_bid_filemd5s(docid,ots_client):
- bool_query = BoolQuery(must_queries=[
- TermQuery("docids",docid)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
- SearchQuery(bool_query,limit=10),
- columns_to_get=ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- list_bid_filemd5s = []
- list_attachments = []
- set_docids = set([docid])
- set_md5s = set()
- for _d in list_data:
- try:
- docids = _d.get("docids","")
- for _id in docids.split(","):
- set_docids.add(int(_id))
- except Exception as e:
- pass
- list_docids = list(set_docids)
- for _docid in list_docids:
- _d = {document_partitionkey:_docid%500+1,
- document_docid:_docid}
- _doc = Document(_d)
- _doc.fix_columns(ots_client,[document_attachment_path],True)
- page_attachments = _doc.getProperties().get(document_attachment_path)
- if page_attachments is not None and page_attachments!="":
- attachments = json.loads(page_attachments)
- for _a in attachments:
- _filemd5 = _a.get(document_attachment_path_filemd5)
- if _filemd5 in set_md5s or _filemd5 is None:
- continue
- set_md5s.add(_filemd5)
- _da = {attachment_filemd5:_filemd5}
- _attach = attachment(_da)
- if _attach.fix_columns(ots_client,[attachment_classification,attachment_filetype],True):
- _da[attachment_classification] = _attach.getProperties().get(attachment_classification)
- _da[attachment_filetype] = _attach.getProperties().get(attachment_filetype)
- list_attachments.append(_da)
- if _attach.getProperties().get(attachment_classification,"")=="招标文件":
- list_bid_filemd5s.append(_filemd5)
- return list_attachments,",".join(list(set(list_bid_filemd5s)))
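- # Returns (list_attachments, bid_filemd5s): attachment metadata for every
- # document merged into the same project(s) as docid, plus a comma-joined,
- # de-duplicated string of the filemd5s whose classification is "招标文件".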
- def get_value_count(self,name,brand,specs,unit_price,quantity):
- value_count = 0
- if name is not None and len(name)>0:
- value_count += 1
- if brand is not None and len(brand)>0:
- value_count += 1
- if specs is not None and len(specs)>0:
- value_count += 1
- if isinstance(unit_price,(float,int)) and unit_price>0:
- value_count += 1
- if isinstance(quantity,(float,int)) and quantity>0:
- value_count += 1
- return value_count
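- # e.g. get_value_count("投影仪","爱普生","",None,2) == 3: this completeness
- # score counts the non-empty/positive fields and is used below to keep the
- # richer of two duplicate records.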
- def dumplicate_search_product(self,document_product):
- docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
- name = str(document_product.getProperties().get(DOCUMENT_PRODUCT_NAME,""))
- brand = str(document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND,""))
- specs = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS,""))
- unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
- quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY,"")
- page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
- tenderee = str(document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE,""))
- supplier = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER,""))
- base_value_count = self.get_value_count(name,brand,specs,unit_price,quantity)
- list_dump_id = []
- page_time_before = page_time
- page_time_after = page_time
- try:
- page_time_before = timeAdd(page_time,-30,format="%Y-%m-%d",)
- page_time_after = timeAdd(page_time,30)
- except Exception as e:
- pass
- to_save = 1
- if len(name)>0 and len(brand)>0 and len(specs)>0 and isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- RangeQuery("page_time",page_time_before,page_time_after,True,True),
- TermQuery("brand",brand),
- TermQuery("specs",specs),
- TermQuery("unit_price",unit_price),
- TermQuery("quantity",quantity)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,limit=1),
- columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- if len(list_data)>0:
- return list_data[0].get(DOCUMENT_PRODUCT_ID),0
- bool_query = BoolQuery(must_queries=[
- TermQuery(project_docids,str(docid)),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(bool_query,limit=10),
- ColumnsToGet([project_docids],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- set_docid = set()
- for _data in list_data:
- _docids = _data.get(project_docids,"")
- for d_id in _docids.split(","):
- d_id = d_id.strip()
- if d_id!="":
- set_docid.add(int(d_id))
- if docid in set_docid:
- set_docid.remove(docid)
- should_q = [TermQuery(DOCUMENT_PRODUCT_DOCID,did) for did in set_docid]
- if len(should_q)>0:
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- BoolQuery(should_queries=should_q),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,limit=50),
- columns_to_get=ColumnsToGet(["docid",'name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- dict_docid_name = {}
- match_ids = []
- for _data in list_data:
- docid1 = _data.get(DOCUMENT_PRODUCT_DOCID)
- name1 = _data.get(DOCUMENT_PRODUCT_NAME)
- brand1 = _data.get(DOCUMENT_PRODUCT_BRAND)
- specs1 = _data.get(DOCUMENT_PRODUCT_SPECS)
- unit_price1 = _data.get(DOCUMENT_PRODUCT_UNIT_PRICE)
- quantity1 = _data.get(DOCUMENT_PRODUCT_QUANTITY)
- id = _data.get(DOCUMENT_PRODUCT_ID)
- value_count1 = self.get_value_count(name1,brand1,specs1,unit_price1,quantity1)
- if name1==name:
- match_ids.append({DOCUMENT_PRODUCT_ID:id,"value_count":value_count1})
- if docid1 not in dict_docid_name:
- dict_docid_name[docid1] = []
- dict_docid_name[docid1].append(name)
- is_all_one = True
- for k,v in dict_docid_name.items():
- if len(v)!=1:
- is_all_one = False
- if is_all_one:
- match_ids.sort(key=lambda x:x.get("value_count",0),reverse=True)
- if len(match_ids)>0:
- _id = match_ids[0].get(DOCUMENT_PRODUCT_ID)
- value_count1 = match_ids[0]["value_count"]
- if base_value_count<value_count1:
- to_save = 0
- for _match in match_ids:
- list_dump_id.append(_match.get(DOCUMENT_PRODUCT_ID))
- if len(name)>0 and len(brand)>0 and len(supplier)>0 and len(tenderee)>0:
- # log("docid %s name %s page_time_before %s page_time_after %s brand %s supplier %s tenderee %s"%(str(docid),name,page_time_before,page_time_after,brand,supplier,tenderee))
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- RangeQuery("page_time",page_time_before,page_time_after,True,True),
- TermQuery(DOCUMENT_PRODUCT_BRAND,brand),
- TermQuery(DOCUMENT_PRODUCT_TENDEREE,tenderee),
- TermQuery(DOCUMENT_PRODUCT_SUPPLIER,supplier),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,limit=50),
- columns_to_get=ColumnsToGet(['name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- for _d in list_data:
- s_id = _d.get(DOCUMENT_PRODUCT_ID)
- s_name = _d.get(DOCUMENT_PRODUCT_NAME,"")
- s_brand = _d.get(DOCUMENT_PRODUCT_BRAND,"")
- s_specs = _d.get(DOCUMENT_PRODUCT_SPECS,"")
- s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
- s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY,"")
- check_flag = True
- value_count1 = self.get_value_count(s_name,s_brand,s_specs,s_unit_price,s_quantity)
- if len(specs)>0 and len(s_specs)>0 and specs!=s_specs:
- check_flag = False
- elif isinstance(unit_price,(float,int)) and isinstance(s_unit_price,(float,int)) and unit_price!=s_unit_price:
- check_flag = False
- elif isinstance(quantity,(float,int)) and isinstance(s_quantity,(float,int)) and quantity!=s_quantity:
- check_flag = False
- if check_flag:
- if base_value_count<value_count1:
- to_save = 0
- list_dump_id.append(s_id)
- return list_dump_id,to_save
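- # Note: this returns either (product_id, 0) on an exact match of
- # name/brand/specs/unit_price/quantity within the page_time window, or
- # (list_of_dump_ids, to_save); dumplicate() below handles both the str and
- # list shapes of dump_id.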
- def dumplicate(self,document_product):
- '''
- De-duplicate the product data: procurement-result publications of the
- same product are de-duplicated, taking the related announcements into
- account.
- :return: (True, dump_id) if the record is not a duplicate, else (False, dump_id)
- '''
- dump_id,to_save = self.dumplicate_search_product(document_product)
- if dump_id is not None:
- document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,str(dump_id),True)
- if to_save==1:
- if dump_id is not None:
- if isinstance(dump_id,str):
- _d = {DOCUMENT_PRODUCT_ID:dump_id,
- DOCUMENT_PRODUCT_STATUS:randint(401,450),
- DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
- _dp = Document_product(_d)
- _dp.update_row(self.ots_client)
- elif isinstance(dump_id,list):
- for d_id in dump_id:
- _d = {DOCUMENT_PRODUCT_ID:d_id,
- DOCUMENT_PRODUCT_STATUS:randint(401,450),
- DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
- _dp = Document_product(_d)
- _dp.update_row(self.ots_client)
- return True,dump_id
- else:
- return False,dump_id
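- # Usage sketch (as in standardize above):
- #   _flag,dump_id = self.dumplicate(_product)
- # Keep and save the row when _flag is True; otherwise mark it as a
- # duplicate of dump_id.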
- def start_processing(self):
- scheduler = BlockingScheduler()
- scheduler.add_job(self.producer,"cron",second="*/20")
- scheduler.add_job(self.comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_interface_comsumer,"cron",second="*/20")
- scheduler.start()
- def test(self):
- from BaseDataMaintenance.common.sentencesUtil import cosine_similarity
- import torch
- output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
- id = '56bdad168c71a1fc4d57cd10bcd987f0'
- collection,_ = self.get_collection(SPECS_GRADE)
- vector = request_embedding("西门子MAGNETOMLumina")
- vector1 = request_embedding("西门子")
- print("cosine similarity",cosine_similarity(torch.from_numpy(np.array([vector])) ,torch.from_numpy(np.array([vector1]))))
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = search_embedding(Coll,embedding_index_name,[vector],self.search_params,output_fields,limit=60)
- for p in search_list:
- print(p)
- #
- # res = collection.query(
- # expr = "ots_id in ['%s']"%(id),
- # offset = 0,
- # limit = 10,
- # output_fields = output_fields,
- # consistency_level="Strong"
- # )
- # print(res)
- def start_process_product():
- pm = Product_Manager()
- pm.start_processing()
- def fix_product_data():
- '''
- # delete the document_product rows and reset status to 1 in document_product_temp where id == original_id
- :return:
- '''
- table_name = "document_product_temp"
- table_index = "document_product_temp_index"
- columns = [DOCUMENT_PRODUCT_TMP_NEW_ID,DOCUMENT_PRODUCT_TMP_STATUS]
- # table_name = Document_product_table_name
- # table_index = Document_product_table_name+"_index"
- # columns = [DOCUMENT_PRODUCT_ORIGINAL_ID]
- ots_client = getConnect_ots()
- bool_query = BoolQuery(should_queries=[
- # RangeQuery("status",501),
- # TermQuery("docid",246032980)
- RangeQuery("status",201,501),
- # RangeQuery("status",401,451)
- # WildcardQuery(DOCUMENT_PRODUCT_ORIGINAL_SPECS,"MFUSOne")
- # TermQuery(DOCUMENT_PRODUCT_SPECS,"MFUSOne")
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_rows = getRow_ots(rows)
- print(total_count)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_rows.extend(getRow_ots(rows))
- print("%d/%d"%(len(list_rows),total_count))
- # if len(list_rows)>10000:
- # break
- task_queue = Queue()
- for d in list_rows:
- task_queue.put(d)
- def fix_missing_data(item,result_queue):
- original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- print("original_id",original_id)
- _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
- dpt = Document_product_tmp(_d)
- dpt.fix_columns(ots_client,["name","brand","specs"],True)
- _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
- dp = Document_product(_d)
- # fix the project_code, original_name and bid_filemd5s
- docid = int(item.get(DOCUMENT_PRODUCT_DOCID))
- partitionkey = docid%500+1
- # project_name = item.get(DOCUMENT_PRODUCT_PROJECT_NAME,"")
- # if project_name=="":
- # #fix project_name
- # _doc = Document({"partitionkey":partitionkey,
- # "docid":docid})
- # _doc.fix_columns(ots_client,["doctitle"],True)
- # dp.setValue(DOCUMENT_PRODUCT_DOCTITLE,_doc.getProperties().get("doctitle"),True)
- list_attachments,bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
- if bid_filemd5s!="":
- dp.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
- dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,""),True)
- dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,""),True)
- dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,""),True)
- dp.update_row(ots_client)
- def deleteAndReprocess(item,result_queue):
- original_id = item.get(DOCUMENT_PRODUCT_TMP_ID)
- new_id = item.get(DOCUMENT_PRODUCT_TMP_NEW_ID)
- # original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- # new_id = item.get(DOCUMENT_PRODUCT_ID)
- print("original_id",original_id,"id",item.get(DOCUMENT_PRODUCT_ID))
- # delete data and rerun
- _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
- dpt = Document_product_tmp(_d)
- dpt.update_row(ots_client)
- if new_id is not None and new_id!="":
- _d = {DOCUMENT_PRODUCT_ID:new_id}
- dp = Document_product(_d)
- dp.delete_row(ots_client)
- def handle(item,result_queue):
- win_bid_price = item.get(DOCUMENT_PRODUCT_TMP_WIN_BID_PRICE,1)
- if win_bid_price==0:
- dpt = Document_product_tmp(item)
- dpt.setValue(DOCUMENT_PRODUCT_TMP_STATUS,1,True)
- dpt.update_row(ots_client)
- mt = MultiThreadHandler(task_queue,deleteAndReprocess,None,30,1)
- mt.run()
- def test_check_brand():
- import logging
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- from queue import Queue
- brand_path = "brand.txt"
- list_brand = []
- with open(brand_path,"r",encoding="utf8") as f:
- while 1:
- line = f.readline()
- if not line:
- break
- line = line.strip()
- if len(line)>0:
- brand = {"brand":line}
- list_brand.append(brand)
- # if len(list_brand)>100:
- # break
- task_queue = Queue()
- for _d in list_brand:
- task_queue.put(_d)
- pm = Product_Manager()
- def _handle(item,result_queue):
- brand = item.get("brand")
- new_brand = clean_product_brand(brand)
- _f = pm.check_new_brand(brand)
- item["f"] = _f
- item["new_brand"] = new_brand
- mt = MultiThreadHandler(task_queue,_handle,None,30,1)
- mt.run()
- list_legal_brand = []
- list_illegal_brand = []
- for _d in list_brand:
- f = _d.get("f")
- if f:
- brand = _d.get("new_brand")
- list_legal_brand.append(brand)
- else:
- brand = _d.get("brand")
- list_illegal_brand.append(brand)
- log("brand %s flag %s"%(brand,str(f)))
- with open("../../test/legal_brand.txt", "w", encoding="utf8") as f:
- for b in list_legal_brand:
- f.write(b+"\n")
- with open("../../test/illegal_brand.txt", "w", encoding="utf8") as f:
- for b in list_illegal_brand:
- f.write(b+"\n")
- def test_match():
- a = "迈瑞晟"
- # vector = request_embedding(get_milvus_standard_name(a))
- # vector = [get_embedding_request(b) for b in a]
- pm = Product_Manager()
- _GRADE = BRAND_GRADE
- Coll,_ = pm.get_collection(_GRADE)
- print(Coll.name)
- output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id","remove_words","level"]
- # start_time = time.time()
- _id = get_milvus_product_dict_id(a)
- print(Coll.query(expr=" ots_id in ['%s'] "%(_id),output_fields=output_fields))
- # print("cost",time.time()-start_time)
- # print(Coll.compact())
- # result = search_embedding(Coll,embedding_index_name,[vector],pm.search_params,output_fields,limit=20)
- #
- # final_list = []
- # for _search in result:
- # _d = {}
- # for k in output_fields:
- # _d[k] = _search.entity.get(k)
- # final_list.append(_d)
- # final_list = remove_repeat_item(final_list,k="ots_name")
- start_time = time.time()
- # final_list = get_embedding_search(Coll,embedding_index_name,a,_GRADE,vector,pm.search_params,output_fields,limit=5)
- final_list = get_intellect_search(Coll,embedding_index_name,a,_GRADE,pm.search_params,output_fields,limit=10)
- for _search in final_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- if check_brand(a,ots_name,remove_words):
- print("similar",a,ots_name)
- else:
- print("not similar",a,ots_name)
- print("cost",time.time()-start_time)
- print(final_list)
- def rebuild_milvus():
- pdm = Product_Dict_Manager()
- from multiprocessing import Queue as PQueue
- bool_query = BoolQuery(must_queries=[
- RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3)
- ])
- ots_client = getConnect_ots()
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
- ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- print("%d/%d"%(len(list_data),total_count))
- # if len(list_data)>1000:
- # break
- set_name_grade = set()
- task_queue = PQueue()
- for _data in list_data:
- name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
- grade = _data.get(DOCUMENT_PRODUCT_DICT_GRADE)
- _key = "%s--%d"%(name,grade)
- if _key not in set_name_grade:
- task_queue.put(_data)
- set_name_grade.add(_key)
- log("rebuild milvus %d counts"%(task_queue.qsize()))
- def insert_into_milvus(item,result_queue):
- name = item.get(DOCUMENT_PRODUCT_DICT_NAME,"")
- grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
- if grade==SPECS_GRADE:
- name = clean_product_specs(name)
- if len(name)<2:
- return
- if len(name)<2:
- return
- parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID,"")
- Coll,_ = pdm.get_collection(grade)
- standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
- log("insert name %s grade %d"%(name,grade))
- remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
- level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL)
- if level is None:
- if re.search("装置|设备",name) is not None:
- level = 2
- else:
- level = 1
- insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level)
- def start_thread():
- mt = MultiThreadHandler(task_queue,insert_into_milvus,None,5)
- mt.run()
- p_count = 5
- list_p = []
- for i in range(p_count):
- p = Process(target=start_thread)
- list_p.append(p)
- for p in list_p:
- p.start()
- for p in list_p:
- p.join()
- def move_document_product():
- bool_query = BoolQuery(must_queries=[
- ExistsQuery(DOCUMENT_PRODUCT_NAME)
- ])
- ots_client = getConnect_ots()
- Document_product_table_name = "document_product"
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data.extend(getRow_ots(rows))
- print("%d/%d"%(len(list_data),total_count))
- # if len(list_data)>=1000:
- # break
- task_queue = Queue()
- for _data in list_data:
- task_queue.put(_data)
- def _handle(item,result_queue):
- D1 = Document_product(item)
- D1.update_row(ots_client)
- D1.table_name = Document_product_table_name
- D1.delete_row(ots_client)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- current_path = os.path.dirname(__file__)
- def delete_brands():
- filename = os.path.join(current_path,"illegal_brand.txt")
- ots_client = getConnect_ots()
- list_brand = []
- with open(filename,"r",encoding="utf8") as f:
- while 1:
- brand = f.readline()
- if not brand:
- break
- brand = brand.strip()
- list_brand.append(brand)
- pm = Product_Manager()
- Coll,_ = pm.get_collection(BRAND_GRADE)
- print(Coll.name)
- Coll.compact()
- _count = 0
- task_queue = Queue()
- for brand in list_brand:
- _count += 1
- task_queue.put(brand)
- # if _count>=2:
- # break
- def _handle(brand,result_queue):
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,BRAND_GRADE),
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,brand)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- _id = get_milvus_product_dict_id(brand)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dpd = Document_product_dict(_d)
- dpd.delete_row(ots_client)
- # print(Coll.query(expr=" ots_id in ['%s']"%(_id),output_fields=["ots_id","ots_name"]))
- delete_counts = Coll.delete(expr=" ots_id in ['%s']"%(_id)).delete_count
- log("brand %s total_count %d md5:%s delete_counts:%d"%(brand,total_count,_id,delete_counts))
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- def delete_specs():
- filename = os.path.join(current_path,"illegal_specs.txt")
- ots_client = getConnect_ots()
- list_brand = []
- with open(filename,"r",encoding="utf8") as f:
- while 1:
- brand = f.readline()
- if not brand:
- break
- brand = brand.strip()
- list_brand.append(brand)
- pm = Product_Manager()
- Coll,_ = pm.get_collection(SPECS_GRADE)
- print(Coll.name)
- Coll.compact()
- _count = 0
- task_queue = Queue()
- for specs in list_brand:
- task_queue.put(specs)
- _count += 1
- # if _count>=2:
- # break
- def _handle(specs,result_queue):
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE),
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,specs)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- _id = get_milvus_product_dict_id(specs)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dpd = Document_product_dict(_d)
- dpd.delete_row(ots_client)
- # print(Coll.query(expr=" ots_id in ['%s']"%(_id),output_fields=["ots_id","ots_name"]))
- delete_counts = Coll.delete(expr=" ots_id in ['%s']"%(_id)).delete_count
- log("brand %s total_count %d md5:%s delete_counts:%d"%(specs,total_count,_id,delete_counts))
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- Coll.compact()
- def remove_redis_keys():
- db = redis.Redis(connection_pool=pool_product)
- db.flushdb()
- def update_document_product_dict():
- import pandas as pd
- filename = "update_product.csv"
- df = pd.read_csv(filename,encoding="gbk")
- ots_client = getConnect_ots()
- for name,grade,standard_alias,remove_words,level in zip(df["name"],df["grade"],df["standard_alias"],df["remove_words"],df["level"]):
- name = name.strip()
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,name),
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count==1:
- list_data = getRow_ots(rows)
- _data = list_data[0]
- dpd = Document_product_dict(_data)
- if str(level)=="nan":
- level = 1
- if re.search("器械|设备|其他",name) is not None and level==1:
- level = 2
- if str(remove_words)=="nan":
- remove_words = ""
- dpd.setValue(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,standard_alias,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,remove_words,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_LEVEL,level,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED+1,True)
- dpd.update_row(ots_client)
- print(dpd.getProperties())
- def test():
- # pm = Product_Manager()
- # pm.test()
- # fix_product_data()
- # test_check_brand()
- test_match()
- # rebuild_milvus()
- # move_document_product()
- # delete_brands()
- # delete_specs()
- # remove_redis_keys()
- # update_document_product_dict()
- def clean_product_dict_interface():
- ots_client = getConnect_ots()
- bool_query = BoolQuery(must_queries=[
- BoolQuery(should_queries=[
- TermQuery("action","insert"),
- TermQuery("action","base")
- ])
- ])
- task_queue = Queue()
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- print("%d/%d"%(task_queue.qsize(),total_count))
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- print("%d/%d"%(task_queue.qsize(),total_count))
- def _handle(item,result_queue):
- _dpd = Document_product_dict_interface(item)
- _dpd.delete_row(ots_client)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- def fix_attachment():
- ots_client = getConnect_ots()
- bool_query = BoolQuery(must_queries=[
- RangeQuery("docid",1)
- ])
- task_queue = Queue()
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- print("%d/%d"%(task_queue.qsize(),total_count))
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(["docid"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- print("%d/%d"%(task_queue.qsize(),total_count))
- def _handle(item,result_queue):
- _product = Document_product(item)
- docid = _product.getProperties().get("docid")
- list_attachments,bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
- if len(list_attachments)>0:
- _product.setValue(DOCUMENT_PRODUCT_ATTACHMENTS,json.dumps(list_attachments,ensure_ascii=False),True)
- _product.setValue(DOCUMENT_PRODUCT_HAS_ATTACHMENTS,1,True)
- if bid_filemd5s!="":
- _product.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
- _product.setValue(DOCUMENT_PRODUCT_HAS_BIDFILE,1,True)
- _product.update_row(ots_client)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- if __name__ == '__main__':
- # test()
- fix_attachment()
- # start_process_product()
- # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
- # print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
- # name = "一"
- # ots_name = "一氧化碳分析仪"
- # print(is_similar(name,ots_name),check_product(name,ots_name))
- # print(is_legal_specs('SCM-A/SB(0.18D)'))
- # clean_product_dict_interface()
|