- from BaseDataMaintenance.common.documentFingerprint import getMD5
- from BaseDataMaintenance.common.Utils import *
- from BaseDataMaintenance.common.milvusUtil import *
- from BaseDataMaintenance.common.multiThread import MultiThreadHandler
- from BaseDataMaintenance.maintenance.product.productUtils import *
- from BaseDataMaintenance.model.ots.document_product_tmp import *
- from BaseDataMaintenance.model.ots.document_product import *
- from BaseDataMaintenance.model.ots.document_product_dict import *
- from BaseDataMaintenance.model.ots.document_product_dict_interface import *
- from BaseDataMaintenance.model.ots.document import *
- from BaseDataMaintenance.model.ots.attachment import *
- from BaseDataMaintenance.model.ots.enterprise import *
- from BaseDataMaintenance.model.ots.project import *
- from tablestore import *
- from BaseDataMaintenance.dataSource.source import getConnect_ots
- from multiprocessing import Process,Queue
- from random import randint
- from BaseDataMaintenance.maintenance.product.product_dict import Product_Dict_Manager
- from apscheduler.schedulers.blocking import BlockingScheduler
- from BaseDataMaintenance.maintenance.product.make_brand_pattern import *
- from BaseDataMaintenance.maintenance.product.product_dict import *
- import logging
- root = logging.getLogger()
- root.setLevel(logging.INFO)
- from uuid import uuid4
- from multiprocessing import Queue as PQueue
- class Product_Manager(Product_Dict_Manager):
- def __init__(self):
- super(Product_Manager, self).__init__()
- self.process_queue = PQueue()
- self.ots_client = getConnect_ots()
- self.set_id = set()
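- # get_product_id builds a deterministic fingerprint: identical
- # (docid, name, brand, specs, unit_price, quantity) tuples always hash to the
- # same MD5, so re-processing the same extraction cannot create a second id.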
- def get_product_id(self,docid,name,brand,specs,unit_price,quantity):
- if name is None:
- name = ""
- if brand is None:
- brand = ""
- if specs is None:
- specs = ""
- if quantity is None:
- quantity = ""
- if unit_price is None or unit_price=="":
- unit_price = ""
- else:
- unit_price = "%.2f"%float(unit_price)
- product_id = getMD5(str(docid)+str(name)+str(brand)+str(specs)+str(unit_price)+str(quantity))
- return product_id
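- # producer refills the shared process_queue from the temp table (status range
- # 1-51), up to process_count rows per round; ids queued in the previous round
- # are remembered in self.set_id and skipped while still pending.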
- def producer(self,process_count=3000):
- q_size = self.process_queue.qsize()
- if q_size>process_count/6:
- return
- bool_query = BoolQuery(must_queries=[RangeQuery(DOCUMENT_PRODUCT_TMP_STATUS,1,51)])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- _count = len(list_data)
- log("producer %d/%d"%(q_size,total_count))
- list_id = []
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
- if _id in self.set_id:
- continue
- list_id.append(_id)
- self.process_queue.put(_d)
- while next_token:
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_tmp_table_name,Document_product_tmp_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- for _d in list_data:
- _id = _d.get(DOCUMENT_PRODUCT_TMP_ID)
- if _id in self.set_id:
- continue
- list_id.append(_id)
- self.process_queue.put(_d)
- _count += len(list_data)
- if _count>=process_count:
- break
- self.set_id = set(list_id)
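- # comsumer fans out to 6 processes x 6 threads, each draining the shared
- # process_queue through comsumer_handle.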
- def comsumer(self):
- def start_thread(thread_count):
- mt = MultiThreadHandler(self.process_queue,self.comsumer_handle,None,thread_count,1,False,True)
- mt.run()
- process_count = 6
- thread_count = 6
- list_process = []
- for _i in range(process_count):
- p = Process(target=start_thread,args=(thread_count,))
- list_process.append(p)
- for p in list_process:
- p.start()
- for p in list_process:
- p.join()
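- # Per-item worker: exceptions are printed and swallowed so one bad row does
- # not kill the consuming thread.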
- def comsumer_handle(self,item,result_queue):
- try:
- self.standardize(item)
- except Exception as e:
- traceback.print_exc()
- def standardize(self,tmp_dict,output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id","remove_words","level"]):
- '''
- Standardizes the product data.
- Standardization matches against the standard parameter table; the match is fuzzy rather than exact (the validation rule is still an open question).
- :return:
- only the standard product is saved
- a temp record is regarded as a standard product only if its name matches; conditional on that:
- if the brand is matched: standardize it when possible, otherwise add it as a new brand; if not matched, replace it with ""
- and the same applies to specs
- the name-brand and brand-specs connections are added automatically because of the 3-level tree structure
- '''
- # todo: 1. automatically add new data to the product parameter table? 1. add new connections between existing names. 2. add new specs
- # when matching specs, the differing substring must not contain digits, letters or Roman numerals, and differences in occurrence counts must not be ignored
- save_product_tmp = Document_product_tmp({DOCUMENT_PRODUCT_TMP_ID:tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID)})
- _status = 0
- document_product_tmp = Document_product_tmp(tmp_dict)
- tenderee = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_TENDEREE,"")
- name = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,"")
- brand = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,"")
- specs = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,"")
- parameters = document_product_tmp.getProperties().get(DOCUMENT_PRODUCT_TMP_PARAMETER,"")
- name = name.replace(tenderee,"")
- brand = brand.replace(tenderee,"")
- original_name = name
- original_brand = brand
- original_specs = specs
- list_candidates = [a for a in [name,brand,specs,parameters] if a!=""]
- list_candidate_brand_specs = [a for a in [brand,specs,parameters,name] if a!=""]
- if brand=="" and parameters!="":
- brand = parameters
- if specs=="" and parameters!="":
- specs = parameters
- new_name = ""
- new_brand = ""
- new_specs = ""
- name_ots_id = None
- brand_ots_id = None
- specs_ots_id = None
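- # Step 1: match the product name against the standard-name collection in
- # Milvus; a hit fixes name_ots_id/new_name, otherwise the other candidate
- # fields (brand, specs, parameters) are tried as the name in turn.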
- if name is not None and name!="":
- Coll,_ = self.get_collection(NAME_GRADE)
- search_list = get_intellect_search(Coll,embedding_index_name,name,NAME_GRADE,self.search_params,output_fields,limit=10)
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- if check_product(name,ots_name,remove_words):
- name_ots_id = get_document_product_dict_id(ots_parent_id,standard_name)
- original_name = name
- new_name = standard_name
- log("checking name %s succeed %s %s"%(name,ots_name,str(remove_words)))
- # #update alias of name
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag and _dpd.updateAlias(name):
- # _dpd.update_row(self.ots_client)
- break
- if name_ots_id is None:
- for name in list_candidates:
- Coll,_ = self.get_collection(NAME_GRADE)
- search_list = get_intellect_search(Coll,embedding_index_name,name,NAME_GRADE,self.search_params,output_fields,limit=10)
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- if check_product(name,ots_name,remove_words):
- log("checking name %s succeed %s %s"%(name,ots_name,str(remove_words)))
- name_ots_id = get_document_product_dict_id(ots_parent_id,standard_name)
- original_name = name
- new_name = standard_name
- # #update alias of name
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:name_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag and _dpd.updateAlias(name):
- # _dpd.update_row(self.ots_client)
- break
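- # Step 2: brand matching, only attempted once a standard name is found. A
- # matched brand is attached under the name node of the 3-level
- # name->brand->specs dictionary tree; a legal but unseen brand is submitted
- # to the interface table with action "insert".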
- if name_ots_id is not None:
- if brand is not None and brand!="":
- s_brand = brand
- l_brand = [brand]
- Coll,_ = self.get_collection(BRAND_GRADE)
- _find = False
- for brand in l_brand:
- if len(brand)>100:
- continue
- search_list = get_intellect_search(Coll,embedding_index_name,brand,BRAND_GRADE,self.search_params,output_fields,limit=10)
- # log("search brand %s"%(brand))
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- # log("check brand %s and %s"%(brand,ots_name))
- if check_brand(brand,ots_name,remove_words):
- # log("check brand similar succeed:%s and %s"%(brand,ots_name))
- if ots_name==new_name:
- continue
- original_brand = brand
- if original_brand==original_name:
- if len(new_name)+len(ots_name)>len(original_name):
- continue
- if original_brand.find(ots_name)>=1:
- continue
- if len(original_brand)<=3:
- continue
- new_brand = standard_name
- log("checking brand %s succeed %s"%(brand,new_brand))
- # check whether a brand whose parent_id is name_ots_id already exists; insert it if not, otherwise update the alias
- if name_ots_id is not None:
- brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
- _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_brand = Document_product_dict(_d_brand)
- # _dpd_brand.updateAlias(str(new_brand).lower())
- if not _dpd_brand.exists_row(self.ots_client):
- _dpd_brand.update_row(self.ots_client)
- else:
- pass
- # #update alias
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag:
- # if _dpd.updateAlias(brand):
- # _dpd.update_row(self.ots_client)
- _find = True
- break
- else:
- # log("check brand similar failed:%s and %s"%(brand,ots_name))
- # add new brand?
- pass
- if _find:
- break
- if not _find:
- for brand in l_brand:
- if len(brand)>100:
- continue
- c_brand = clean_product_brand(brand)
- if self.check_new_brand(c_brand):
- if c_brand=="":
- continue
- original_brand = brand
- if original_brand==original_name:
- if len(new_name)+len(c_brand)>len(original_name):
- continue
- if new_name==original_brand:
- continue
- if original_brand.find(c_brand)>=1:
- continue
- if len(original_brand)<=3:
- continue
- new_brand = c_brand
- log("adding new brand %s"%(str(new_brand)))
- _d_brand = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
- DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
- DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert",
- DOCUMENT_PRODUCT_ORIGINAL_BRAND:brand
- }
- dpdi = Document_product_dict_interface(_d_brand)
- dpdi.update_row(self.ots_client)
- break
- if brand_ots_id is None:
- _find = False
- Coll,_ = self.get_collection(BRAND_GRADE)
- for brand in list_candidates:
- if _find:
- break
- l_brand = [brand]
- for brand in l_brand:
- if len(brand)>100:
- continue
- if _find:
- break
- search_list = get_intellect_search(Coll,embedding_index_name,brand,BRAND_GRADE,self.search_params,output_fields,limit=10)
- # log("search brand %s"%(brand))
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- # log("check brand %s and %s"%(brand,ots_name))
- if check_brand(brand,ots_name,remove_words):
- # log("check brand similar succeed:%s and %s"%(brand,ots_name))
- if ots_name==new_name:
- continue
- original_brand = brand
- if original_brand==original_name:
- if len(new_name)+len(ots_name)>len(original_name):
- continue
- if original_brand.find(ots_name)>=1:
- continue
- if len(original_brand)<=3:
- continue
- new_brand = standard_name
- log("checking brand %s succeed %s"%(brand,new_brand))
- # check whether a brand whose parent_id is name_ots_id already exists; insert it if not, otherwise update the alias
- if name_ots_id is not None:
- brand_ots_id = get_document_product_dict_id(name_ots_id,new_brand)
- _d_brand = {DOCUMENT_PRODUCT_DICT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_brand,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_brand).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:BRAND_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:name_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_brand = Document_product_dict(_d_brand)
- # _dpd_brand.updateAlias(str(new_brand).lower())
- if not _dpd_brand.exists_row(self.ots_client):
- _dpd_brand.update_row(self.ots_client)
- else:
- pass
- # #update alias
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:brand_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag:
- # if _dpd.updateAlias(brand):
- # _dpd.update_row(self.ots_client)
- _find = True
- break
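- # Step 3: specs matching. The raw specs string is cleaned, split on Chinese
- # characters, embedded, and searched against the specs collection; a
- # candidate only counts if it passes both is_similar and check_specs.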
- if specs is not None and specs!="":
- debug("getting sepcs %s"%(specs))
- list_specs = []
- c_specs = clean_product_specs(specs)
- list_specs.append(c_specs)
- for s in re.split("[\u4e00-\u9fff]",specs):
- if s!="" and len(s)>4:
- list_specs.append(s)
- _index = 0
- break_flag = False
- list_similar_specs = []
- for c_specs in list_specs:
- if break_flag:
- break
- _index += 1
- specs_vector = get_embedding_request(c_specs)
- if specs_vector is not None:
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=20)
- for _search in search_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- debug("checking specs %s and %s"%(specs,ots_name))
- if is_similar(specs,ots_name):
- # log("specs is_similar")
- if check_specs(c_specs,ots_name):
- break_flag = True
- original_specs = c_specs
- if standard_name==new_name or standard_name==new_brand:
- continue
- new_specs = standard_name
- log("check_specs %s succeed %s"%(specs,new_specs))
- # to update the document_product_dict which is built for search
- if brand_ots_id is not None:
- # check whether a specs whose parent_id is brand_ots_id already exists; insert it if not, otherwise update the alias
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_specs).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_specs = Document_product_dict(_d_specs)
- # _dpd_specs.updateAlias(str(new_specs).lower())
- if not _dpd_specs.exists_row(self.ots_client):
- _dpd_specs.update_row(self.ots_client)
- # user interface to add
- else:
- pass
- # #update alias
- # _dpd = Document_product_dict({DOCUMENT_PRODUCT_DICT_ID:specs_ots_id})
- # _flag = _dpd.fix_columns(self.ots_client,[DOCUMENT_PRODUCT_DICT_ALIAS],True)
- # if _flag:
- # if _dpd.updateAlias(specs):
- # _dpd.update_row(self.ots_client)
- break_flag = True
- break
- else:
- list_similar_specs.append(specs)
- # add new specs?
- if new_specs is not None and new_specs!="":
- pass
- else:
- debug("specs not similar")
- for specs in list_similar_specs:
- if is_legal_specs(specs) and len(specs)<MAX_NAME_LENGTH and len(specs)>=5:
- debug("is_legal_specs")
- original_specs = specs
- new_specs = clean_product_specs(specs)
- if new_specs==new_name or new_specs==new_brand:
- new_specs = ""
- continue
- # insert into document_product_dict a new record
- # to update the document_product_dict which is built for search
- # add new specs
- if brand_ots_id is not None and name_ots_id is not None:
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- # _d = {DOCUMENT_PRODUCT_DICT_ID:_md5,
- # DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- # DOCUMENT_PRODUCT_DICT_ALIAS:"%s&&%s"%(specs,new_specs),
- # DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- # DOCUMENT_PRODUCT_DICT_STATUS:1,
- # DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- # DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- # DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- # }
- # _dpd = Document_product_dict(_d)
- # _dpd.update_row(self.ots_client)
- log("adding new specs %s"%(new_specs))
- # user interface to add
- _d = {DOCUMENT_PRODUCT_DICT_INTERFACE_ID:uuid4().hex,
- DOCUMENT_PRODUCT_DICT_INTERFACE_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_INTERFACE_ALIAS:"%s"%(new_specs.lower()),
- DOCUMENT_PRODUCT_DICT_INTERFACE_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_INTERFACE_STATUS:1,
- DOCUMENT_PRODUCT_DICT_INTERFACE_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_INTERFACE_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_INTERFACE_ACTION:"insert"
- }
- _dpdi = Document_product_dict_interface(_d)
- _dpdi.update_row(self.ots_client)
- break
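- # Fallback specs pass: when nothing matched above, retry the embedding
- # search with brand, specs, parameters and name as candidate strings.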
- if specs_ots_id is None:
- _find = False
- for specs in list_candidate_brand_specs:
- if _find:
- break
- debug("getting sepcs %s"%(specs))
- list_specs = []
- c_specs = clean_product_specs(specs)
- list_specs.append(c_specs)
- for s in re.split("[\u4e00-\u9fff]",specs):
- if s!="" and len(s)>4:
- list_specs.append(s)
- similar_flag = None
- _index = 0
- for c_specs in list_specs:
- if _find:
- break
- _index += 1
- specs_vector = get_embedding_request(c_specs)
- if specs_vector is not None:
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = get_embedding_search(Coll,embedding_index_name,c_specs,SPECS_GRADE,[specs_vector],self.search_params,output_fields,limit=10)
- for _search in search_list:
- if _find:
- break
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- debug("checking specs %s and %s"%(specs,ots_name))
- if is_similar(c_specs,ots_name):
- # log("specs is_similar")
- if check_specs(c_specs,ots_name):
- break_flag = True
- original_specs = c_specs
- new_specs = standard_name
- if new_specs==new_name or new_specs==new_brand:
- new_specs = ""
- continue
- if brand_ots_id is not None:
- # check whether a specs whose parent_id is brand_ots_id already exists; insert it if not, otherwise update the alias
- specs_ots_id = get_document_product_dict_id(brand_ots_id,new_specs)
- _d_specs = {DOCUMENT_PRODUCT_DICT_ID:specs_ots_id,
- DOCUMENT_PRODUCT_DICT_NAME:new_specs,
- DOCUMENT_PRODUCT_DICT_ALIAS:"%s"%(str(new_specs).lower()),
- DOCUMENT_PRODUCT_DICT_GRADE:SPECS_GRADE,
- DOCUMENT_PRODUCT_DICT_STATUS:1,
- DOCUMENT_PRODUCT_DICT_PARENT_ID:brand_ots_id,
- DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED:IS_SYNCHONIZED,
- DOCUMENT_PRODUCT_DICT_CREATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- DOCUMENT_PRODUCT_DICT_UPDATE_TIME:getCurrent_date(format="%Y-%m-%d %H:%M:%S"),
- }
- _dpd_specs = Document_product_dict(_d_specs)
- # _dpd_specs.updateAlias(str(new_specs).lower())
- if not _dpd_specs.exists_row(self.ots_client):
- _dpd_specs.update_row(self.ots_client)
- _find = True
- break
- # judge if the product matches the standard product
- if name_ots_id is not None:
- is_legal_data = True
- # standardize the product and save it to the document_product table
- _product = Document_product(tmp_dict)
- docid = _product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
- unit_price = _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)
- quantity = _product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY)
- unit_price = clean_product_unit_price(unit_price)
- quantity = clean_product_quantity(quantity)
- total_price = _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- win_bid_price = _product.getProperties().get(DOCUMENT_PRODUCT_WIN_BID_PRICE)
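- # Price reconciliation: given any two of unit_price/quantity/total_price the
- # third is derived; an inconsistent triple, a quantity over 10000 on its own,
- # a total price above 10x the win bid price, or a unit price above 1e8 marks
- # the row as illegal data.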
- if isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
- if unit_price>0:
- new_quantity = total_price/unit_price
- if new_quantity!=quantity:
- # if new_quantity==total_price//unit_price:
- # quantity = int(new_quantity)
- # _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- # else:
- # is_legal_data = False
- is_legal_data = False
- elif quantity>0:
- unit_price = total_price/quantity
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- elif isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
- total_price = float("%.2f"%(unit_price*quantity))
- _product.setValue(DOCUMENT_PRODUCT_TOTAL_PRICE,total_price,True)
- elif isinstance(unit_price,(float,int)) and isinstance(total_price,(float,int)):
- if unit_price>0:
- quantity = int(total_price//unit_price)
- _product.setValue(DOCUMENT_PRODUCT_QUANTITY,quantity,True)
- elif isinstance(quantity,(float,int)) and isinstance(total_price,(float,int)):
- if quantity>0:
- unit_price = float("%.2f"%(total_price/quantity))
- _product.setValue(DOCUMENT_PRODUCT_UNIT_PRICE,unit_price,True)
- elif isinstance(quantity,(float,int)) and quantity>10000:
- is_legal_data = False
- if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE),(float,int)) and isinstance(win_bid_price,(float,int)):
- if _product.getProperties().get(DOCUMENT_PRODUCT_TOTAL_PRICE)>win_bid_price*10 and win_bid_price>0:
- is_legal_data = False
- if isinstance(_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE),(float,int)) and _product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE)>100000000:
- is_legal_data = False
- new_id = self.get_product_id(docid,new_name,new_brand,new_specs,unit_price,quantity)
- _product.setValue(DOCUMENT_PRODUCT_ID,new_id,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_ID,tmp_dict.get(DOCUMENT_PRODUCT_TMP_ID),True)
- if name_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_NAME_ID,name_ots_id,True)
- if brand_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_BRAND_ID,brand_ots_id,True)
- if specs_ots_id is not None:
- _product.setValue(DOCUMENT_PRODUCT_DICT_SPECS_ID,specs_ots_id,True)
- _product.setValue(DOCUMENT_PRODUCT_NAME,new_name,True)
- _product.setValue(DOCUMENT_PRODUCT_BRAND,new_brand,True)
- _product.setValue(DOCUMENT_PRODUCT_SPECS,new_specs,True)
- _product.setValue(DOCUMENT_PRODUCT_STATUS,randint(201,300),True)
- _product.setValue(DOCUMENT_PRODUCT_BRANDSPECS,"%s&&%s"%(new_brand,new_specs),True)
- _product.setValue(DOCUMENT_PRODUCT_FULL_NAME,"%s&&%s&&%s"%(new_name,new_brand,new_specs),True)
- _product.setValue(DOCUMENT_PRODUCT_CREATE_TIME,getCurrent_date(format="%Y-%m-%d %H:%M:%S"),True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,original_name,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,original_brand,True)
- _product.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,original_specs,True)
- bid_filemd5s = self.get_bid_filemd5s(docid,self.ots_client)
- if bid_filemd5s is not None:
- _product.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
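- # Status written back to the temp row: 201-300 saved as a new standard
- # product, 401-450 no standard name matched, 451-500 dropped as a duplicate
- # of dump_id, 501-550 rejected for illegal price/quantity data.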
- if not is_legal_data:
- _status = randint(501,550)
- else:
- _flag,dump_id = self.dumplicate(_product)
- if _flag:
- _status = randint(201,300)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_NEW_ID,new_id,True)
- _product.update_row(self.ots_client)
- else:
- _status = randint(451,500)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_DUMP_ID,str(dump_id),True)
- else:
- _status = randint(401,450)
- save_product_tmp.setValue(DOCUMENT_PRODUCT_TMP_STATUS,_status,True)
- save_product_tmp.update_row(self.ots_client)
- def check_new_brand(self,brand):
- return is_legal_brand(self.ots_client,brand)
- @staticmethod
- def get_bid_filemd5s(docid,ots_client):
- bool_query = BoolQuery(must_queries=[
- TermQuery("docids",docid)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search("project2","project2_index",
- SearchQuery(bool_query,limit=10),
- columns_to_get=ColumnsToGet(["docids"],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- list_bid_filemd5s = []
- set_docids = set([docid])
- set_md5s = set()
- for _d in list_data:
- try:
- docids = _d.get("docids","")
- for _id in docids.split(","):
- set_docids.add(int(_id))
- except Exception as e:
- pass
- list_docids = list(set_docids)
- for _docid in list_docids:
- _d = {document_partitionkey:_docid%500+1,
- document_docid:_docid}
- _doc = Document(_d)
- _doc.fix_columns(ots_client,[document_attachment_path],True)
- page_attachments = _doc.getProperties().get(document_attachment_path)
- if page_attachments is not None and page_attachments!="":
- attachments = json.loads(page_attachments)
- for _a in attachments:
- _filemd5 = _a.get(document_attachment_path_filemd5)
- if _filemd5 in set_md5s or _filemd5 is None:
- continue
- set_md5s.add(_filemd5)
- _da = {attachment_filemd5:_filemd5}
- _attach = attachment(_da)
- _attach.fix_columns(ots_client,[attachment_classification],True)
- if _attach.getProperties().get(attachment_classification,"")=="招标文件":
- list_bid_filemd5s.append(_filemd5)
- if len(list_bid_filemd5s)==0:
- return None
- return ",".join(list(set(list_bid_filemd5s)))
- def get_value_count(self,name,brand,specs,unit_price,quantity):
- value_count = 0
- if name is not None and len(name)>0:
- value_count += 1
- if brand is not None and len(brand)>0:
- value_count += 1
- if specs is not None and len(specs)>0:
- value_count += 1
- if isinstance(unit_price,(float,int)) and unit_price>0:
- value_count += 1
- if isinstance(quantity,(float,int)) and quantity>0:
- value_count += 1
- return value_count
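- # Duplicate search in up to three passes: an exact name/brand/specs/price/
- # quantity match within +-30 days of page_time, a same-project pass over the
- # docids joined through project2, and a tenderee+supplier pass; to_save flips
- # to 0 whenever an existing record has more filled fields than this one.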
- def dumplicate_search_product(self,document_product):
- docid = document_product.getProperties().get(DOCUMENT_PRODUCT_DOCID)
- name = str(document_product.getProperties().get(DOCUMENT_PRODUCT_NAME,""))
- brand = str(document_product.getProperties().get(DOCUMENT_PRODUCT_BRAND,""))
- specs = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SPECS,""))
- unit_price = document_product.getProperties().get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
- quantity = document_product.getProperties().get(DOCUMENT_PRODUCT_QUANTITY,"")
- page_time = document_product.getProperties().get(DOCUMENT_PRODUCT_PAGE_TIME)
- tenderee = str(document_product.getProperties().get(DOCUMENT_PRODUCT_TENDEREE,""))
- supplier = str(document_product.getProperties().get(DOCUMENT_PRODUCT_SUPPLIER,""))
- base_value_count = self.get_value_count(name,brand,specs,unit_price,quantity)
- list_dump_id = []
- page_time_before = page_time
- page_time_after = page_time
- try:
- page_time_before = timeAdd(page_time,-30,format="%Y-%m-%d")
- page_time_after = timeAdd(page_time,30)
- except Exception as e:
- pass
- to_save = 1
- if len(name)>0 and len(brand)>0 and len(specs)>0 and isinstance(unit_price,(float,int)) and isinstance(quantity,(float,int)):
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- RangeQuery("page_time",page_time_before,page_time_after,True,True),
- TermQuery("brand",brand),
- TermQuery("specs",specs),
- TermQuery("unit_price",unit_price),
- TermQuery("quantity",quantity)
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,limit=1),
- columns_to_get=ColumnsToGet(["name",'brand','specs'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- if len(list_data)>0:
- return list_data[0].get(DOCUMENT_PRODUCT_ID),0
- bool_query = BoolQuery(must_queries=[
- TermQuery(project_docids,str(docid)),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search("project2","project2_index",
- SearchQuery(bool_query,limit=10),
- ColumnsToGet([project_docids],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- set_docid = set()
- for _data in list_data:
- _docids = _data.get(project_docids,"")
- for d_id in _docids.split(","):
- d_id = d_id.strip()
- if d_id!="":
- set_docid.add(int(d_id))
- if docid in set_docid:
- set_docid.remove(docid)
- should_q = [TermQuery(DOCUMENT_PRODUCT_DOCID,did) for did in set_docid]
- if len(should_q)>0:
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- BoolQuery(should_queries=should_q),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,limit=50),
- columns_to_get=ColumnsToGet(["docid",'name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- dict_docid_name = {}
- match_ids = []
- for _data in list_data:
- docid1 = _data.get(DOCUMENT_PRODUCT_DOCID)
- name1 = _data.get(DOCUMENT_PRODUCT_NAME)
- brand1 = _data.get(DOCUMENT_PRODUCT_BRAND)
- specs1 = _data.get(DOCUMENT_PRODUCT_SPECS)
- unit_price1 = _data.get(DOCUMENT_PRODUCT_UNIT_PRICE)
- quantity1 = _data.get(DOCUMENT_PRODUCT_QUANTITY)
- id = _data.get(DOCUMENT_PRODUCT_ID)
- value_count1 = self.get_value_count(name1,brand1,specs1,unit_price1,quantity1)
- if name1==name:
- match_ids.append({DOCUMENT_PRODUCT_ID:id,"value_count":value_count1})
- if docid1 not in dict_docid_name:
- dict_docid_name[docid1] = []
- dict_docid_name[docid1].append(name)
- is_all_one = True
- for k,v in dict_docid_name.items():
- if len(v)!=1:
- is_all_one = False
- if is_all_one:
- match_ids.sort(key=lambda x:x.get("value_count",0),reverse=True)
- if len(match_ids)>0:
- _id = match_ids[0].get(DOCUMENT_PRODUCT_ID)
- value_count1 = match_ids[0]["value_count"]
- if base_value_count<value_count1:
- to_save = 0
- for _match in match_ids:
- list_dump_id.append(_match.get(DOCUMENT_PRODUCT_ID))
- if len(name)>0 and len(brand)>0 and len(supplier)>0 and len(tenderee)>0:
- # log("docid %s name %s page_time_before %s page_time_after %s brand %s supplier %s tenderee %s"%(str(docid),name,page_time_before,page_time_after,brand,supplier,tenderee))
- bool_query = BoolQuery(must_queries=[TermQuery("name",name),
- RangeQuery("page_time",page_time_before,page_time_after,True,True),
- TermQuery(DOCUMENT_PRODUCT_BRAND,brand),
- TermQuery(DOCUMENT_PRODUCT_TENDEREE,tenderee),
- TermQuery(DOCUMENT_PRODUCT_SUPPLIER,supplier),
- ])
- rows,next_token,total_count,is_all_succeed = self.ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,limit=50),
- columns_to_get=ColumnsToGet(['name','brand','specs','unit_price','quantity'],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- for _d in list_data:
- s_id = _d.get(DOCUMENT_PRODUCT_ID)
- s_name = _d.get(DOCUMENT_PRODUCT_NAME,"")
- s_brand = _d.get(DOCUMENT_PRODUCT_BRAND,"")
- s_specs = _d.get(DOCUMENT_PRODUCT_SPECS,"")
- s_unit_price = _d.get(DOCUMENT_PRODUCT_UNIT_PRICE,"")
- s_quantity = _d.get(DOCUMENT_PRODUCT_QUANTITY,"")
- check_flag = True
- value_count1 = self.get_value_count(s_name,s_brand,s_specs,s_unit_price,s_quantity)
- if len(specs)>0 and len(s_specs)>0 and specs!=s_specs:
- check_flag = False
- elif isinstance(unit_price,(float,int)) and isinstance(s_unit_price,(float,int)) and unit_price!=s_unit_price:
- check_flag = False
- elif isinstance(quantity,(float,int)) and isinstance(s_quantity,(float,int)) and quantity!=s_quantity:
- check_flag = False
- if check_flag:
- if base_value_count<value_count1:
- to_save = 0
- list_dump_id.append(s_id)
- return list_dump_id,to_save
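- # When the new record wins (to_save==1), the superseded rows are marked with
- # status 401-450 and their dump_id is pointed back at the new product id.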
- def dumplicate(self,document_product):
- '''
- Deduplicates the product data:
- procurement-result publications of the same product are deduplicated, combined with the announcements.
- :return: True if not repeated else False
- '''
- dump_id,to_save = self.dumplicate_search_product(document_product)
- if dump_id is not None:
- document_product.setValue(DOCUMENT_PRODUCT_DUMP_ID,str(dump_id),True)
- if to_save==1:
- if dump_id is not None:
- if isinstance(dump_id,str):
- _d = {DOCUMENT_PRODUCT_ID:dump_id,
- DOCUMENT_PRODUCT_STATUS:randint(401,450),
- DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
- _dp = Document_product(_d)
- _dp.update_row(self.ots_client)
- elif isinstance(dump_id,list):
- for d_id in dump_id:
- _d = {DOCUMENT_PRODUCT_ID:d_id,
- DOCUMENT_PRODUCT_STATUS:randint(401,450),
- DOCUMENT_PRODUCT_DUMP_ID:document_product.getProperties().get(DOCUMENT_PRODUCT_ID)}
- _dp = Document_product(_d)
- _dp.update_row(self.ots_client)
- return True,dump_id
- else:
- return False,dump_id
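- # Schedule: producer every 20s, comsumer and embedding_comsumer every
- # minute, embedding_interface_comsumer every 20s.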
- def start_processing(self):
- scheduler = BlockingScheduler()
- scheduler.add_job(self.producer,"cron",second="*/20")
- scheduler.add_job(self.comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_comsumer,"cron",minute="*/1")
- scheduler.add_job(self.embedding_interface_comsumer,"cron",second="*/20")
- scheduler.start()
- def test(self):
- from BaseDataMaintenance.common.sentencesUtil import cosine_similarity
- import torch
- output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id"]
- id = '56bdad168c71a1fc4d57cd10bcd987f0'
- collection,_ = self.get_collection(SPECS_GRADE)
- vector = request_embedding("西门子MAGNETOMLumina")
- vector1 = request_embedding("西门子")
- print("cosine similarity",cosine_similarity(torch.from_numpy(np.array([vector])) ,torch.from_numpy(np.array([vector1]))))
- Coll,_ = self.get_collection(SPECS_GRADE)
- search_list = search_embedding(Coll,embedding_index_name,[vector],self.search_params,output_fields,limit=60)
- for p in search_list:
- print(p)
- #
- # res = collection.query(
- # expr = "ots_id in ['%s']"%(id),
- # offset = 0,
- # limit = 10,
- # output_fields = output_fields,
- # consistency_level="Strong"
- # )
- # print(res)
- def start_process_product():
- pm = Product_Manager()
- pm.start_processing()
- def fix_product_data():
- '''
- # delete the document_product record and reset the status to 1 in document_product_temp where id equals the original id
- :return:
- '''
- table_name = "document_product_temp"
- table_index = "document_product_temp_index"
- columns = [DOCUMENT_PRODUCT_TMP_NEW_ID,DOCUMENT_PRODUCT_TMP_STATUS]
- # table_name = Document_product_table_name
- # table_index = Document_product_table_name+"_index"
- # columns = [DOCUMENT_PRODUCT_ORIGINAL_ID]
- ots_client = getConnect_ots()
- bool_query = BoolQuery(should_queries=[
- # RangeQuery("status",501),
- # TermQuery("docid",246032980)
- RangeQuery("status",201,501),
- # RangeQuery("status",401,451)
- # WildcardQuery(DOCUMENT_PRODUCT_ORIGINAL_SPECS,"MFUSOne")
- # TermQuery(DOCUMENT_PRODUCT_SPECS,"MFUSOne")
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_rows = getRow_ots(rows)
- print(total_count)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(table_name,table_index,
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- columns_to_get=ColumnsToGet(columns,return_type=ColumnReturnType.SPECIFIED))
- list_rows.extend(getRow_ots(rows))
- print("%d/%d"%(len(list_rows),total_count))
- # if len(list_rows)>10000:
- # break
- task_queue = Queue()
- for d in list_rows:
- task_queue.put(d)
- def fix_missing_data(item,result_queue):
- original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- print("original_id",original_id)
- _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
- dpt = Document_product_tmp(_d)
- dpt.fix_columns(ots_client,["name","brand","specs"],True)
- _d = {DOCUMENT_PRODUCT_ID:item.get(DOCUMENT_PRODUCT_ID)}
- dp = Document_product(_d)
- # fix the project_code, original_name and bid_filemd5s
- docid = int(item.get(DOCUMENT_PRODUCT_DOCID))
- partitionkey = docid%500+1
- # project_name = item.get(DOCUMENT_PRODUCT_PROJECT_NAME,"")
- # if project_name=="":
- # #fix project_name
- # _doc = Document({"partitionkey":partitionkey,
- # "docid":docid})
- # _doc.fix_columns(ots_client,["doctitle"],True)
- # dp.setValue(DOCUMENT_PRODUCT_DOCTITLE,_doc.getProperties().get("doctitle"),True)
- bid_filemd5s = Product_Manager.get_bid_filemd5s(docid,ots_client)
- if bid_filemd5s is not None:
- dp.setValue(DOCUMENT_PRODUCT_BID_FILEMD5S,bid_filemd5s,True)
- dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_NAME,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_NAME,""),True)
- dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_BRAND,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_BRAND,""),True)
- dp.setValue(DOCUMENT_PRODUCT_ORIGINAL_SPECS,dpt.getProperties().get(DOCUMENT_PRODUCT_TMP_SPECS,""),True)
- dp.update_row(ots_client)
- def deleteAndReprocess(item,result_queue):
- original_id = item.get(DOCUMENT_PRODUCT_TMP_ID)
- new_id = item.get(DOCUMENT_PRODUCT_TMP_NEW_ID)
- # original_id = item.get(DOCUMENT_PRODUCT_ORIGINAL_ID)
- # new_id = item.get(DOCUMENT_PRODUCT_ID)
- print("original_id",original_id,"id",item.get(DOCUMENT_PRODUCT_ID))
- # delete data and rerun
- _d = {DOCUMENT_PRODUCT_TMP_ID:original_id,DOCUMENT_PRODUCT_TMP_STATUS:1}
- dpt = Document_product_tmp(_d)
- dpt.update_row(ots_client)
- if new_id is not None and new_id!="":
- _d = {DOCUMENT_PRODUCT_ID:new_id}
- dp = Document_product(_d)
- dp.delete_row(ots_client)
- def handle(item,result_queue):
- win_bid_price = item.get(DOCUMENT_PRODUCT_TMP_WIN_BID_PRICE,1)
- if win_bid_price==0:
- dpt = Document_product_tmp(item)
- dpt.setValue(DOCUMENT_PRODUCT_TMP_STATUS,1,True)
- dpt.update_row(ots_client)
- mt = MultiThreadHandler(task_queue,deleteAndReprocess,None,30,1)
- mt.run()
- def test_check_brand():
- import logging
- root = logging.getLogger()
- root.setLevel(logging.DEBUG)
- from queue import Queue
- brand_path = "brand.txt"
- list_brand = []
- with open(brand_path,"r",encoding="utf8") as f:
- while 1:
- line = f.readline()
- if not line:
- break
- line = line.strip()
- if len(line)>0:
- brand = {"brand":line}
- list_brand.append(brand)
- # if len(list_brand)>100:
- # break
- task_queue = Queue()
- for _d in list_brand:
- task_queue.put(_d)
- pm = Product_Manager()
- def _handle(item,result_queue):
- brand = item.get("brand")
- new_brand = clean_product_brand(brand)
- _f = pm.check_new_brand(brand)
- item["f"] = _f
- item["new_brand"] = new_brand
- mt = MultiThreadHandler(task_queue,_handle,None,30,1)
- mt.run()
- list_legal_brand = []
- list_illegal_brand = []
- for _d in list_brand:
- f = _d.get("f")
- log("brand %s flag %s"%(brand,str(f)))
- if f:
- brand = _d.get("new_brand")
- list_legal_brand.append(brand)
- else:
- brand = _d.get("brand")
- list_illegal_brand.append(brand)
- with open("../../test/legal_brand.txt", "w", encoding="utf8") as f:
- for b in list_legal_brand:
- f.write(b+"\n")
- with open("../../test/illegal_brand.txt", "w", encoding="utf8") as f:
- for b in list_illegal_brand:
- f.write(b+"\n")
- def test_match():
- a = "迈瑞晟"
- # vector = request_embedding(get_milvus_standard_name(a))
- # vector = [get_embedding_request(b) for b in a]
- pm = Product_Manager()
- _GRADE = BRAND_GRADE
- Coll,_ = pm.get_collection(_GRADE)
- print(Coll.name)
- output_fields = ['ots_id','ots_name',"ots_parent_id","standard_name","standard_name_id","remove_words","level"]
- # start_time = time.time()
- _id = get_milvus_product_dict_id(a)
- print(Coll.query(expr=" ots_id in ['%s'] "%(_id),output_fields=output_fields))
- # print("cost",time.time()-start_time)
- # print(Coll.compact())
- # result = search_embedding(Coll,embedding_index_name,[vector],pm.search_params,output_fields,limit=20)
- #
- # final_list = []
- # for _search in result:
- # _d = {}
- # for k in output_fields:
- # _d[k] = _search.entity.get(k)
- # final_list.append(_d)
- # final_list = remove_repeat_item(final_list,k="ots_name")
- start_time = time.time()
- # final_list = get_embedding_search(Coll,embedding_index_name,a,_GRADE,vector,pm.search_params,output_fields,limit=5)
- final_list = get_intellect_search(Coll,embedding_index_name,a,_GRADE,pm.search_params,output_fields,limit=10)
- for _search in final_list:
- ots_id = _search.get("standard_name_id")
- ots_name = _search.get("ots_name")
- standard_name = _search.get("standard_name")
- ots_parent_id = _search.get("ots_parent_id")
- remove_words = _search.get("remove_words")
- if check_brand(a,ots_name,remove_words):
- print("similar",a,ots_name)
- else:
- print("not similar",a,ots_name)
- print("cost",time.time()-start_time)
- print(final_list)
- def rebuild_milvus():
- pdm = Product_Dict_Manager()
- from multiprocessing import Queue as PQueue
- bool_query = BoolQuery(must_queries=[
- RangeQuery(DOCUMENT_PRODUCT_DICT_GRADE,3)
- ])
- ots_client = getConnect_ots()
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
- ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet([DOCUMENT_PRODUCT_DICT_GRADE,DOCUMENT_PRODUCT_DICT_NAME,DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS],return_type=ColumnReturnType.SPECIFIED))
- list_data.extend(getRow_ots(rows))
- print("%d/%d"%(len(list_data),total_count))
- # if len(list_data)>1000:
- # break
- set_name_grade = set()
- task_queue = PQueue()
- for _data in list_data:
- name = _data.get(DOCUMENT_PRODUCT_DICT_NAME)
- grade = _data.get(DOCUMENT_PRODUCT_DICT_GRADE)
- _key = "%s--%d"%(name,grade)
- if _key not in set_name_grade:
- task_queue.put(_data)
- set_name_grade.add(_key)
- log("rebuild milvus %d counts"%(task_queue.qsize()))
- def insert_into_milvus(item,result_queue):
- name = item.get(DOCUMENT_PRODUCT_DICT_NAME,"")
- grade = item.get(DOCUMENT_PRODUCT_DICT_GRADE)
- if grade==SPECS_GRADE:
- name = clean_product_specs(name)
- if len(name)<2:
- return
- if len(name)<2:
- return
- parent_id = item.get(DOCUMENT_PRODUCT_DICT_PARENT_ID,"")
- Coll,_ = pdm.get_collection(grade)
- standard_alias = item.get(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,"")
- log("insert name %s grade %d"%(name,grade))
- remove_words = item.get(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,"")
- level = item.get(DOCUMENT_PRODUCT_DICT_LEVEL)
- if level is None:
- if re.search("装置|设备",name) is not None:
- level = 2
- else:
- level = 1
- insert_new_record_to_milvus(Coll,name,grade,parent_id,standard_alias,remove_words,level)
- def start_thread():
- mt = MultiThreadHandler(task_queue,insert_into_milvus,None,5)
- mt.run()
- p_count = 5
- list_p = []
- for i in range(p_count):
- p = Process(target=start_thread)
- list_p.append(p)
- for p in list_p:
- p.start()
- for p in list_p:
- p.join()
- def move_document_product():
- bool_query = BoolQuery(must_queries=[
- ExistsQuery(DOCUMENT_PRODUCT_NAME)
- ])
- ots_client = getConnect_ots()
- Document_product_table_name = "document_product"
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("name")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data = getRow_ots(rows)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_table_name,Document_product_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.ALL))
- list_data.extend(getRow_ots(rows))
- print("%d/%d"%(len(list_data),total_count))
- # if len(list_data)>=1000:
- # break
- task_queue = Queue()
- for _data in list_data:
- task_queue.put(_data)
- def _handle(item,result_queue):
- D1 = Document_product(item)
- D1.update_row(ots_client)
- D1.table_name = Document_product_table_name
- D1.delete_row(ots_client)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- current_path = os.path.dirname(__file__)
- def delete_brands():
- filename = os.path.join(current_path,"illegal_brand.txt")
- ots_client = getConnect_ots()
- list_brand = []
- with open(filename,"r",encoding="utf8") as f:
- while 1:
- brand = f.readline()
- if not brand:
- break
- brand = brand.strip()
- list_brand.append(brand)
- pm = Product_Manager()
- Coll,_ = pm.get_collection(BRAND_GRADE)
- print(Coll.name)
- Coll.compact()
- _count = 0
- task_queue = Queue()
- for brand in list_brand:
- _count += 1
- task_queue.put(brand)
- # if _count>=2:
- # break
- def _handle(brand,result_queue):
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,BRAND_GRADE),
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,brand)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- _id = get_milvus_product_dict_id(brand)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dpd = Document_product_dict(_d)
- dpd.delete_row(ots_client)
- # print(Coll.query(expr=" ots_id in ['%s']"%(_id),output_fields=["ots_id","ots_name"]))
- delete_counts = Coll.delete(expr=" ots_id in ['%s']"%(_id)).delete_count
- log("brand %s total_count %d md5:%s delete_counts:%d"%(brand,total_count,_id,delete_counts))
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- def delete_specs():
- filename = os.path.join(current_path,"illegal_specs.txt")
- ots_client = getConnect_ots()
- list_brand = []
- with open(filename,"r",encoding="utf8") as f:
- while 1:
- brand = f.readline()
- if not brand:
- break
- brand = brand.strip()
- list_brand.append(brand)
- pm = Product_Manager()
- Coll,_ = pm.get_collection(SPECS_GRADE)
- print(Coll.name)
- Coll.compact()
- _count = 0
- task_queue = Queue()
- for specs in list_brand:
- task_queue.put(specs)
- _count += 1
- # if _count>=2:
- # break
- def _handle(specs,result_queue):
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,SPECS_GRADE),
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,specs)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- _id = get_milvus_product_dict_id(specs)
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,limit=100,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data.extend(getRow_ots(rows))
- for _d in list_data:
- dpd = Document_product_dict(_d)
- dpd.delete_row(ots_client)
- # print(Coll.query(expr=" ots_id in ['%s']"%(_id),output_fields=["ots_id","ots_name"]))
- delete_counts = Coll.delete(expr=" ots_id in ['%s']"%(_id)).delete_count
- log("brand %s total_count %d md5:%s delete_counts:%d"%(specs,total_count,_id,delete_counts))
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- Coll.compact()
- def remove_redis_keys():
- db = redis.Redis(connection_pool=pool_product)
- db.flushdb()
- def update_document_product_dict():
- import pandas as pd
- filename = "update_product.csv"
- df = pd.read_csv(filename,encoding="gbk")
- ots_client = getConnect_ots()
- for name,grade,standard_alias,remove_words,level in zip(df["name"],df["grade"],df["standard_alias"],df["remove_words"],df["level"]):
- name = name.strip()
- bool_query = BoolQuery(must_queries=[
- TermQuery(DOCUMENT_PRODUCT_DICT_NAME,name),
- TermQuery(DOCUMENT_PRODUCT_DICT_GRADE,grade)
- ])
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_table_name,Document_product_dict_table_name+"_index",
- SearchQuery(bool_query,get_total_count=True),
- ColumnsToGet(return_type=ColumnReturnType.NONE))
- if total_count==1:
- list_data = getRow_ots(rows)
- _data = list_data[0]
- dpd = Document_product_dict(_data)
- # keep the level from the csv, defaulting to 1 when missing; otherwise the level==1 check below can never vary
- if str(level)=="nan":
- level = 1
- if re.search("器械|设备|其他",name) is not None and level==1:
- level = 2
- if str(remove_words)=="nan":
- remove_words = ""
- dpd.setValue(DOCUMENT_PRODUCT_DICT_STANDARD_ALIAS,standard_alias,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_REMOVE_WORDS,remove_words,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_LEVEL,level,True)
- dpd.setValue(DOCUMENT_PRODUCT_DICT_IS_SYNCHONIZED,IS_SYNCHONIZED+1,True)
- dpd.update_row(ots_client)
- print(dpd.getProperties())
- def test():
- # pm = Product_Manager()
- # pm.test()
- # fix_product_data()
- # test_check_brand()
- test_match()
- # rebuild_milvus()
- # move_document_product()
- # delete_brands()
- # delete_specs()
- # remove_redis_keys()
- # update_document_product_dict()
- def clean_product_dict_interface():
- ots_client = getConnect_ots()
- bool_query = BoolQuery(must_queries=[
- BoolQuery(should_queries=[
- TermQuery("action","insert"),
- TermQuery("action","base")
- ])
- ])
- task_queue = Queue()
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
- SearchQuery(bool_query,sort=Sort(sorters=[FieldSort("status")]),get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- print("%d/%d"%(task_queue.qsize(),total_count))
- while next_token:
- rows,next_token,total_count,is_all_succeed = ots_client.search(Document_product_dict_interface_table_name,Document_product_dict_interface_table_name+"_index",
- SearchQuery(bool_query,next_token=next_token,get_total_count=True,limit=100),
- columns_to_get=ColumnsToGet(return_type=ColumnReturnType.NONE))
- list_data = getRow_ots(rows)
- for _data in list_data:
- task_queue.put(_data)
- print("%d/%d"%(task_queue.qsize(),total_count))
- def _handle(item,result_queue):
- _dpd = Document_product_dict_interface(item)
- _dpd.delete_row(ots_client)
- mt = MultiThreadHandler(task_queue,_handle,None,30)
- mt.run()
- if __name__ == '__main__':
- test()
- # start_process_product()
- # print(getMD5('11936c56f2dd1426764e317ca2e8e1a7'+'&&鱼跃'))
- # print(Product_Manager.get_bid_filemd5s(155415770,getConnect_ots()))
- # name = "一"
- # ots_name = "一氧化碳分析仪"
- # print(is_similar(name,ots_name),check_product(name,ots_name))
- # print(is_legal_specs('SCM-A/SB(0.18D)'))
- # clean_product_dict_interface()