123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- from BaseDataMaintenance.maintenance.product.product_setting import *
- import Levenshtein
- import re
# Helpers below include a check for whether an input string consists entirely of Chinese characters
- from BaseDataMaintenance.dataSource.source import getConnect_redis_product
- from BaseDataMaintenance.dataSource.pool import ConnectorPool
- from BaseDataMaintenance.common.Utils import log
- from BaseDataMaintenance.common.documentFingerprint import getMD5
- from BaseDataMaintenance.common.milvusUtil import search_embedding
# Module-wide pool of connectors created by getConnect_redis_product; the
# caching helpers in this file borrow a connector from it per call.
# NOTE(review): 10 and 30 are presumably pool-size bounds - confirm against ConnectorPool.
pool_product = ConnectorPool(10, 30, getConnect_redis_product)
def get_milvus_search(coll,index_name,name,vector,search_params,output_fields,limit=3):
    """Search a collection by vector, caching the result under a key derived from ``name``.

    The cache key is ``getMD5(str(name)) + "_milvus"``. On a cache hit the
    stored JSON is decoded and returned; otherwise ``coll.search`` is called,
    every hit is reduced to a dict of ``output_fields`` and the list is cached
    with an expiry of ``2*60`` (as passed to ``db.expire``).

    NOTE(review): the cache key depends on ``name`` only, so calls with the
    same name but different ``output_fields``/``limit``/``search_params``
    share one cache entry - confirm that is intended.

    :param coll: collection object exposing ``search(...)``
    :param index_name: second positional argument forwarded to ``coll.search``
    :param name: text used to build the cache key; None or "" short-circuits
    :param vector: query vector(s) forwarded to ``coll.search``
    :param search_params: search parameters forwarded to ``coll.search``
    :param output_fields: field names to read from each hit's ``entity``
    :param limit: maximum number of hits requested
    :return: list of dicts (one per hit), or None when ``name`` is empty
    :raises RuntimeError: when the cache lookup or the search fails
    """
    if name is None or name == "":
        return None

    db = pool_product.getConnector()
    try:
        cache_key = getMD5(str(name)) + "_milvus"
        cached = db.get(cache_key)
        if cached is not None:
            return json.loads(cached)

        result = coll.search(vector, index_name, search_params, top_k=limit,
                             output_fields=output_fields, limit=limit)
        final_list = []
        for hits in result:
            for hit in hits:
                final_list.append({k: hit.entity.get(k) for k in output_fields})

        db.set(cache_key, json.dumps(final_list))
        db.expire(cache_key, 2 * 60)
        return final_list
    except Exception as e:
        log("get_milvus_search error %s" % (str(e)))
        # Chain the cause so the original failure stays visible in tracebacks.
        raise RuntimeError("get milvus search error") from e
    finally:
        # Best effort: only a connector that passes the health check goes back
        # to the pool; a failure here must not mask the result or the error above.
        # NOTE(review): relies on check_health() returning a truthy value - confirm
        # against the connector type.
        try:
            if db.connection.check_health():
                pool_product.putConnector(db)
        except Exception as e:
            log("get_milvus_search return connector error %s" % (str(e)))
def get_embedding_request(sentence,retry_times=3):
    """Return the embedding of ``sentence``, using the Redis cache when possible.

    The cache key is ``getMD5(str(sentence)) + "_embedding"``. On a miss the
    embedding is fetched with ``request_embedding`` and, when one was
    obtained, stored as JSON (no expiry is set here).

    :param sentence: text to embed; None or "" short-circuits
    :param retry_times: forwarded to ``request_embedding``
    :return: the embedding, or None when ``sentence`` is empty or
        ``request_embedding`` returned None
    :raises RuntimeError: when the cache access or the request fails
    """
    if sentence is None or sentence == "":
        return None

    db = pool_product.getConnector()
    try:
        cache_key = getMD5(str(sentence)) + "_embedding"
        cached = db.get(cache_key)
        if cached is not None:
            return json.loads(cached)

        embedding = request_embedding(sentence, retry_times=retry_times)
        if embedding is not None:
            db.set(cache_key, json.dumps(embedding))
        return embedding
    except Exception as e:
        log("get_embedding_request error %s" % (str(e)))
        # Chain the cause so the original failure stays visible in tracebacks.
        raise RuntimeError("get embedding request error") from e
    finally:
        # Best effort: only a connector that passes the health check goes back
        # to the pool; a failure here must not mask the result or the error above.
        # NOTE(review): relies on check_health() returning a truthy value - confirm
        # against the connector type.
        try:
            if db.connection.check_health():
                pool_product.putConnector(db)
        except Exception as e:
            log("get_embedding_request return connector error %s" % (str(e)))
# Punctuation ignored by judge_pur_chinese. Kept as a plain character set so
# that no regex escaping is involved: the previous regex character class was
# closed early by an unescaped "]" and never stripped any of these characters.
_PUR_CHINESE_IGNORED = frozenset(
    "·’!\"#$%&'()*+,-./:;<=>?@¥★、….>【】[]《》“”‘’^_`{|}~"
)


def judge_pur_chinese(keyword):
    """
    Return True when ``keyword`` contains only Chinese characters.

    Punctuation listed in ``_PUR_CHINESE_IGNORED`` is skipped; every other
    character must lie in the range u'\u4e00' -- u'\u9fff'. A string that is
    empty (or empty once punctuation is skipped) yields True.

    @param keyword: string to inspect
    @return: True if no non-Chinese, non-punctuation character is present
    """
    for ch in keyword:
        if ch in _PUR_CHINESE_IGNORED:
            continue
        if not (u'\u4e00' <= ch <= u'\u9fff'):
            return False
    return True
def get_chinese_string(string):
    """Return the runs of Chinese characters in ``string`` that are at least 2 long.

    The input is split on every character outside u'\u4e00'-u'\u9fff'; the
    surviving segments are returned in their original order.
    """
    segments = re.split("[^\u4e00-\u9fff]", string)
    return [segment for segment in segments if len(segment) >= 2]
def is_area_brand(brand,area_set):
    """Classify whether ``brand`` looks like an area name rather than a brand.

    The characters 省/市/区/县/等 are removed first. The remaining text is split
    after every prefix of at least two characters:

    * 2 - the prefix is in ``area_set`` and the rest is empty or also in
      ``area_set`` (the brand consists of area names only);
    * 1 - the prefix is in ``area_set`` and the rest is 2 to 5 characters long;
    * 0 - otherwise, or when the cleaned brand is longer than 12 characters.

    :param brand: brand text to inspect
    :param area_set: container of known area names (membership is tested with ``in``)
    :return: 0, 1 or 2 as described above
    """
    brand = re.sub("[省市区县等]","",brand)
    if len(brand) > 12:
        return 0

    # The upper bound is len(brand)+1 so the final split leaves an empty
    # remainder; without it a brand that is exactly one area name was never
    # recognised even though the empty-remainder case is handled below.
    for split_at in range(2, len(brand) + 1):
        prefix = brand[:split_at]
        if prefix not in area_set:
            continue
        remainder = brand[split_at:]
        if remainder == "" or remainder in area_set:
            return 2
        if 2 <= len(remainder) <= 5:
            return 1
    return 0
def jaccard_score(source,target):
    """Return the share of distinct characters common to both strings.

    The number of shared distinct characters is divided by the size of the
    smaller character set, i.e. the larger of the two possible ratios.
    Returns 0 when either input has no characters.
    """
    source_chars = set(source)
    target_chars = set(target)
    if not source_chars or not target_chars:
        return 0
    shared = len(source_chars & target_chars)
    # Dividing by the smaller set size yields the larger of the two ratios.
    return shared / min(len(source_chars), len(target_chars))
- from fuzzywuzzy import fuzz
def is_similar(source,target,_radio=None):
    """Decide whether two strings are similar enough to be treated as the same.

    Both inputs are converted to lower-case strings. The similarity threshold
    is 90, relaxed to 87 when the shorter string has at least 3 characters and
    to 85 when it has at least 5; ``_radio`` overrides it when given.

    Checks, in order: trivial length guards, ``fuzz.ratio``, Jaro, Jaro-Winkler
    (fixed threshold 90), then containment rules for longer or all-Chinese
    strings.

    :param source: first value (converted with ``str``)
    :param target: second value (converted with ``str``)
    :param _radio: optional explicit threshold on a 0-100 scale
    :return: True when any check succeeds, otherwise False
    """
    src = str(source).lower()
    tgt = str(target).lower()
    src_len, tgt_len = len(src), len(tgt)
    longest = max(src_len, tgt_len)
    shortest = min(src_len, tgt_len)

    if _radio is not None:
        threshold = _radio
    elif shortest >= 5:
        threshold = 85
    elif shortest >= 3:
        threshold = 87
    else:
        threshold = 90

    # Length guards: one side empty, very short exact match, single character.
    if shortest == 0 and longest > 0:
        return False
    if longest <= 2 and src == tgt:
        return True
    if shortest < 2:
        return False

    # Similarity metrics.
    if fuzz.ratio(src, tgt) >= threshold:
        return True
    if Levenshtein.jaro(src, tgt) * 100 >= threshold:
        return True
    if Levenshtein.jaro_winkler(src, tgt) * 100 >= 90:
        return True

    src_is_longest = src_len == longest
    tgt_is_longest = tgt_len == longest

    if shortest >= 5:
        # Long enough strings: plain containment, or identical character sets
        # when both sides are all Chinese.
        if src_is_longest and tgt in src:
            return True
        if tgt_is_longest and src in tgt:
            return True
        if jaccard_score(src, tgt) == 1 and judge_pur_chinese(src) and judge_pur_chinese(tgt):
            return True

    # Containment is also accepted when the contained side is all Chinese.
    if src_is_longest and judge_pur_chinese(tgt) and tgt in src:
        return True
    if tgt_is_longest and judge_pur_chinese(src) and src in tgt:
        return True
    return False
def is_contain(source,target,min_len=2):
    """Return True when the shorter string occurs inside the longer one.

    The shorter string must have at least ``min_len`` characters. When both
    have the same length, ``target`` is treated as the shorter one.
    """
    if len(source) >= len(target):
        shorter, longer = target, source
    else:
        shorter, longer = source, target
    return len(shorter) >= min_len and shorter in longer
def check_product(source,target):
    """Return True when one product name contains the other (at least 3 characters)."""
    return bool(is_contain(source, target, min_len=3))
def check_brand(source,target):
    """Return True when one brand contains the other, ignoring case.

    Both values are converted to lower-case strings and compared with
    ``is_contain`` using its default minimum length.

    :param source: first brand (converted with ``str``)
    :param target: second brand (converted with ``str``)
    :return: True or False. The negative case previously fell off the end of
        the function and returned None; it now returns False like
        ``check_product``.
    """
    source = str(source).lower()
    target = str(target).lower()
    return bool(is_contain(source, target))
_SPECS_LATIN = 'abcdefghijklmnopqrstuvwxyz'
_SPECS_NUMERIC = '0123456789.'
_SPECS_ROMAN = 'IⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ'

# Characters that are significant when comparing specs. Callers lower-case
# their input before consulting this set, and lower-casing turns the Roman
# numerals into their small forms, so those forms are included as well;
# without them Roman numerals were silently dropped from every comparison.
SPECS_CHECK_SET = (
    set(_SPECS_LATIN)
    | set(_SPECS_NUMERIC)
    | set(_SPECS_ROMAN)
    | set(_SPECS_ROMAN.lower())
)

# Matches any single character that is not in SPECS_CHECK_SET. The members are
# sorted and escaped so the character class is deterministic and stays valid
# if a regex-special character is ever added to the set.
NOT_SPECS_PATTERN = re.compile("[^%s]" % re.escape("".join(sorted(SPECS_CHECK_SET))))


def _count_specs_chars(text):
    # Count how often each SPECS_CHECK_SET character occurs in text.
    counts = {}
    for ch in text:
        if ch in SPECS_CHECK_SET:
            counts[ch] = counts.get(ch, 0) + 1
    return counts


def has_same_specs_count(source, target):
    """Check that every specs character of ``source`` occurs equally often in ``target``.

    Both values are lower-cased first and only characters in
    ``SPECS_CHECK_SET`` are counted; order is ignored.

    NOTE(review): the check is one-directional - ``target`` may contain extra
    specs characters that ``source`` lacks and still pass. Confirm whether a
    symmetric comparison was intended.

    :param source: first specs value (converted with ``str``)
    :param target: second specs value (converted with ``str``)
    :return: True when all counted characters of ``source`` match, else False
    """
    source_counts = _count_specs_chars(str(source).lower())
    target_counts = _count_specs_chars(str(target).lower())
    # A character missing from target yields None, which never equals a count.
    return all(target_counts.get(ch) == n for ch, n in source_counts.items())
# Matches any single character that is not allowed in a specs string, i.e.
# anything other than ASCII letters, digits and the characters - / ( ) .
SPECS_PATTERN = re.compile("[^A-Za-z0-9-\\/()().]")


def is_legal_specs(specs):
    """Return True when ``specs`` is non-empty and has no character matched by SPECS_PATTERN.

    None and "" are rejected; any other value is converted with ``str`` and
    lower-cased before being checked.
    """
    if specs is None or specs == "":
        return False
    return SPECS_PATTERN.search(str(specs).lower()) is None
def check_specs(source,target):
    '''
    Decide whether two specs values describe the same thing.

    Both values are lower-cased and stripped of every character matched by
    NOT_SPECS_PATTERN. They are the same when the stripped strings are equal
    and non-empty, or when has_same_specs_count accepts them and the
    right-to-left comparison gets past half of the shorter string.

    :param source: first specs value (converted with str)
    :param target: second specs value (converted with str)
    :return: True when the specs are considered the same, else False
    '''
    src = NOT_SPECS_PATTERN.sub('', str(source).lower())
    tgt = NOT_SPECS_PATTERN.sub('', str(target).lower())

    if src and src == tgt:
        return True
    if not has_same_specs_count(src, tgt):
        return False

    # Walk both strings from the end. `reached` is the 1-based position (from
    # the right) of the last character pair that was examined: the first
    # mismatch, or the length of the shorter string when none was found.
    shorter = min(len(src), len(tgt))
    reached = 0
    for offset in range(1, shorter + 1):
        reached = offset
        if src[-offset] != tgt[-offset]:
            break
    return reached > shorter // 2
- import json
- import requests
# One requests session created at import time and reused by request_embedding.
session = requests.Session()
def request_embedding(sentence,retry_times=3,timeout=None):
    """Request the embedding vector of ``sentence`` from ``embedding_url``.

    The sentence is posted as JSON (``{"sentence": sentence}``) through the
    shared ``session``. An attempt succeeds when the response status is 200
    and the decoded JSON body has a truthy ``"success"``; its ``"vector"``
    value is then returned.

    :param sentence: text to embed
    :param retry_times: maximum number of attempts
    :param timeout: optional timeout forwarded to ``session.post``; the
        default None keeps the previous behaviour of waiting indefinitely
    :return: the ``"vector"`` value of the first successful response, or None
        when no attempt succeeded
    :raises requests.RequestException: when the last attempt fails at the
        transport level
    """
    for attempt in range(retry_times):
        try:
            resp = session.post(embedding_url, json={"sentence": sentence}, timeout=timeout)
        except requests.RequestException:
            # Transport errors used to escape on the first attempt, so the
            # retry loop never retried them; give up only on the last one.
            if attempt == retry_times - 1:
                raise
            continue
        if resp.status_code == 200:
            _d = json.loads(resp.content.decode("utf-8"))
            if _d.get("success"):
                return _d.get("vector")
    return None
def clean_product_name(product_name):
    '''
    Clean a product name before insert.

    Currently a pass-through: the value is returned unchanged.

    :param product_name: product name to clean
    :return: the same value that was passed in
    '''
    cleaned_name = product_name
    return cleaned_name
def clean_product_brand(product_brand):
    '''
    Clean a brand value before insert.

    When the text contains a labelled brand ("品牌" followed by a colon or
    semicolon), the 2 to 8 characters up to the next delimiter or keyword
    (规格/型号/生产厂家/厂家) are taken as the brand. Filler words and
    separator characters are then removed.

    :param product_brand: raw brand text
    :return: the cleaned brand text
    '''
    _search = re.search("品牌[::;;](?P<brand>.{2,8}?)([.。、;::]|规格|型号|生产厂家|厂家)",product_brand)
    if _search is not None:
        product_brand = _search.groupdict().get("brand")
    # The multi-character alternatives must precede the character class:
    # with the class first, the "/" of "/无" was consumed on its own and the
    # "无" was left behind in the result.
    brand = re.sub("一批|/无|品牌|[/\\,,、.|等]",'',product_brand)
    return brand
def clean_product_specs(product_specs):
    '''
    Clean a specs value before insert.

    Every character matched by SPECS_PATTERN is removed. When nothing is left
    after that, the original value is returned unchanged.

    :param product_specs: raw specs text
    :return: the stripped specs, or the original text if stripping empties it
    '''
    stripped = SPECS_PATTERN.sub('', product_specs)
    return stripped if stripped else product_specs
def clean_product_unit_price(product_unit_price):
    '''
    Clean a unit price before insert.

    :param product_unit_price: raw unit price
    :return: the price as a float, or "" when the value is None, empty or
        cannot be converted with float()
    '''
    if product_unit_price is None:
        return ""
    try:
        # The comparison stays inside the try block so that a value whose
        # comparison raises is also mapped to "".
        if product_unit_price == "":
            return ""
        return float(product_unit_price)
    except Exception:
        return ""
def clean_product_quantity(product_quantity):
    '''
    Clean a quantity before insert.

    :param product_quantity: raw quantity
    :return: the quantity as an int, or "" when the value is None, empty or
        cannot be converted with int()
    '''
    if product_quantity is None:
        return ""
    try:
        # The comparison stays inside the try block so that a value whose
        # comparison raises is also mapped to "".
        if product_quantity == "":
            return ""
        return int(product_quantity)
    except Exception:
        return ""
if __name__ == '__main__':
    # Ad-hoc manual checks, run only when the module is executed as a script.
    import Levenshtein

    sample_name = '128排RevolutionCTES彩色多普勒超声诊断仪VolusonE10'
    print(is_similar(sample_name, 'VolusonE10'))
    print(re.split("[^\u4e00-\u9fff]", sample_name))
    print(Levenshtein.ratio('助听器','助行器'))
    sample_brand = "无锡贝尔森品牌"
    print(clean_product_brand(sample_brand))
|