| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- import json
- import re
- from decimal import Decimal
- from fuzzywuzzy import fuzz
def getDigitsDic(unit):
    """Look up the integer value of a single Chinese numeral character.

    Both the formal (financial) numerals 零壹贰叁肆伍陆柒捌玖 and the everyday
    numerals 〇一二三四五六七八九 are recognised.

    :param unit: one numeral character.
    :return: the digit 0-9 as an int, or None when ``unit`` is not a known
        numeral.
    """
    formal_numerals = "零壹贰叁肆伍陆柒捌玖"
    everyday_numerals = "〇一二三四五六七八九"
    digit_by_char = {}
    for numerals in (formal_numerals, everyday_numerals):
        # The position of a character inside its string is the digit it denotes.
        digit_by_char.update(zip(numerals, range(10)))
    return digit_by_char.get(unit)
# Multiplier for every supported magnitude / currency unit character.
# The fractional units are written as string literals so they are exact
# decimals instead of binary floats that have to be rounded afterwards.
_MULTIPLE_FACTOR = {
    "兆": Decimal(1000000000000),
    "亿": Decimal(100000000),
    "万": Decimal(10000),
    "仟": Decimal(1000),
    "千": Decimal(1000),
    "佰": Decimal(100),
    "百": Decimal(100),
    "拾": Decimal(10),
    "十": Decimal(10),
    "元": Decimal(1),
    "圆": Decimal(1),
    "角": Decimal("0.1"),
    "分": Decimal("0.01"),
}


def getMultipleFactor(unit):
    """Return the multiplier that a Chinese amount unit stands for.

    :param unit: one unit character such as 万, 仟, 元, 角 or 分.
    :return: the multiplier as a Decimal, or None when ``unit`` is not a
        known unit.
    """
    return _MULTIPLE_FACTOR.get(unit)
def getUnifyMoney(money):
    """Convert an amount written with digits and/or Chinese numerals to a Decimal.

    The text is first reduced to ASCII digits, the decimal point, formal
    Chinese numerals and unit characters.  It is then split at the last
    occurrence of the largest unit present; the part before the unit is
    multiplied by the unit's factor and the part after it is added, both
    being parsed recursively when they still contain units.

    :param money: the amount, e.g. "1亿3000万元" or "壹佰贰拾叁元肆角伍分".
        Anything that is not a string is converted with ``str()``.
    :return: the amount as a Decimal.  ``Decimal(0)`` is returned for None,
        '' and '-', when a numeric part multiplied by its unit would exceed
        10**12, and when any error occurs while parsing.
    """
    if money in [None, '', '-']:
        return Decimal(0)

    MAX_MONEY = 1000000000000
    chn_digits = "零壹贰叁肆伍陆柒捌玖"
    # Ordered from the largest unit to the smallest: the first unit found in
    # the text is the one the text is split on.
    # NOTE: "兆" is not in the whitelist below, so it is stripped before this
    # list is consulted and can never match; kept as in the original.
    factor_units = ["兆", "亿", "万", "仟", "千", "佰", "百", "拾", "十", "圆", "元", "角", "分"]
    plain_number = re.compile(r"^\d+(\.\d+)?$")

    # Keep only characters that can be part of an amount.  This also removes
    # thousands separators, whitespace and signs.
    money = re.sub("[^0-9.零壹贰叁肆伍陆柒捌玖拾佰仟萬億圆十百千万亿元角分]", "", str(money))
    # The whitelist lets the traditional forms through; map them onto the
    # simplified units that the factor table knows about.
    money = money.replace("萬", "万").replace("億", "亿")

    try:
        if plain_number.search(money) is not None:
            return Decimal(money)

        single_digit = re.search(r"^零?([%s])$" % chn_digits, money)
        if single_digit is not None:
            # Wrap in Decimal so every return path yields the same type.
            return Decimal(getDigitsDic(single_digit.group(1)))

        result = Decimal(0)
        for unit in factor_units:
            if unit not in money:
                continue
            # Split at the LAST occurrence of the unit.
            head, _, tail = money.rpartition(unit)
            factor = getMultipleFactor(unit)

            if plain_number.search(head) is not None:
                if MAX_MONEY / factor < Decimal(head):
                    return Decimal(0)
                result += Decimal(head) * factor
            elif len(head) == 1:
                # A lone character only counts when it is a formal numeral.
                if head in chn_digits:
                    result += Decimal(getDigitsDic(head)) * factor
            elif head != "":
                result += getUnifyMoney(head) * factor

            # Only a bare number may be handed to Decimal() directly.  A tail
            # that still carries a unit ("3000万元") must be parsed
            # recursively, otherwise Decimal() raises and the whole amount
            # collapses to 0.
            if plain_number.search(tail) is not None:
                result += Decimal(tail)
            elif len(tail) == 1:
                if tail in chn_digits:
                    result += Decimal(getDigitsDic(tail))
            else:
                result += getUnifyMoney(tail)
            break
        return result
    except Exception:
        # Deliberately best-effort: an unparsable amount counts as 0.
        return Decimal(0)
def text_sim(s1, s2):
    """Score how similar two text values are, between 0.0 and 1.0.

    Falsy values (None, '', 0, ...) are treated as empty text and all values
    are stripped of surrounding whitespace before comparison.

    :return: 1.0 when both texts are empty, 0.0 when exactly one is empty,
        otherwise ``fuzz.token_sort_ratio`` scaled down from 0-100 to 0-1.
    """
    left, right = (str(value or "").strip() for value in (s1, s2))
    if not (left or right):
        return 1.0
    if not (left and right):
        return 0.0
    return fuzz.token_sort_ratio(left, right) / 100
def num_sim(n1, n2):
    """Score how close two numeric values are, between 0.0 and 1.0.

    The score is ``1 - |a - b| / max(|a|, |b|, 0.01)``, clamped at 0.  The
    0.01 floor keeps the denominator away from zero when both values are
    (almost) zero.

    :param n1: first value; anything ``float()`` accepts.
    :param n2: second value; anything ``float()`` accepts.
    :return: 1.0 for equal values, decreasing towards 0.0 as they diverge.
        0.0 when either value cannot be converted to a float or the result
        is not a number (NaN / infinite inputs).
    """
    try:
        a = float(n1)
        b = float(n2)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    # Magnitudes are used for the scale so that negative inputs cannot shrink
    # the denominator to the 0.01 floor and produce huge negative scores.
    scale = max(abs(a), abs(b), 0.01)
    score = 1.0 - abs(a - b) / scale
    if score != score:
        # NaN is the only value that differs from itself.
        return 0.0
    # Values of opposite sign differ by more than the scale; floor at 0.
    return max(0.0, score)
def product_similarity(p1, p2):
    """Compute the weighted similarity of two product records.

    Eight fields are compared, each contributing an equal weight of 0.125:
    name, specification/model, brand, item code and item name are compared
    as text with ``text_sim``; unit price, quantity and total price are
    compared as numbers with ``num_sim``.

    :param p1: first product, a mapping keyed by the Chinese field names.
    :param p2: second product, same layout as ``p1``.
    :return: the sum of the weighted field similarities.
    """
    field_weight = 0.125
    # (field name, comparison function), in the order the scores are summed.
    scored_fields = (
        ("产品名称", text_sim),
        ("规格型号", text_sim),
        ("品牌", text_sim),
        ("单价", num_sim),
        ("数量", num_sim),
        ("总价", num_sim),
        ("品目编号", text_sim),
        ("品目名称", text_sim),
    )
    total = 0.0
    for field, similarity in scored_fields:
        total += similarity(p1.get(field), p2.get(field)) * field_weight
    return total
def calculate_matching_ratio(list_a, list_b, threshold=0.6):
    """Return the share of products that can be paired up across two lists.

    Each product of ``list_a`` is greedily paired with the not-yet-used
    product of ``list_b`` that has the highest ``product_similarity``.  The
    pair counts as a match when that score reaches ``threshold``; a product
    of ``list_b`` can be part of at most one match.

    :param list_a: first list of product mappings, or None.
    :param list_b: second list of product mappings, or None.
    :param threshold: minimum similarity for a pair to count as a match.
    :return: 0 when either list is None, 1.0 when both lists are empty,
        otherwise ``matches / max(len(list_a), len(list_b))``.
    """
    if list_a is None or list_b is None:
        return 0
    total = max(len(list_a), len(list_b))
    if total == 0:
        return 1.0

    taken = [False] * len(list_b)
    match_count = 0
    for a in list_a:
        best_score = 0
        best_idx = -1
        for i, b in enumerate(list_b):
            if taken[i]:
                continue
            score = product_similarity(a, b)
            if score > best_score:
                best_score = score
                best_idx = i
        # A match needs an actual partner: without the index check a
        # threshold <= 0 would count matches even when list_b is exhausted.
        # A candidate is only consumed when it really is matched.
        if best_idx != -1 and best_score >= threshold:
            match_count += 1
            taken[best_idx] = True
    return match_count / total
def calculate_complete_match_ratio(list1, list2):
    """Return the share of products that are exactly identical in both lists.

    Two products are identical when all nine compared fields are equal after
    conversion with ``str()``; a missing field counts as ''.  The result is
    the number of distinct products present in both lists divided by the
    number of distinct products present in either list.

    :param list1: first list of product mappings, or None.
    :param list2: second list of product mappings, or None.
    :return: 0 when either list is None, 1.0 when both lists are empty,
        otherwise the ratio rounded to two decimals.
    """
    if list1 is None or list2 is None:
        return 0
    cols = ("产品名称", "单价", "数量", "数量单位", "总价",
            "品牌", "规格型号", "品目编号", "品目名称")

    def fingerprint(product):
        # A tuple keeps the fields separate, so values that contain the
        # former '@' join character cannot make different products collide.
        return tuple(str(product.get(col, '')) for col in cols)

    rows1 = {fingerprint(d) for d in list1}
    rows2 = {fingerprint(d) for d in list2}
    all_rows = rows1 | rows2
    if not all_rows:
        # Two empty lists are identical; this also avoids dividing by zero.
        return 1.0
    # Counting distinct rows on both sides keeps the ratio within [0, 1]
    # even when a list contains duplicates.
    return round(len(rows1 & rows2) / len(all_rows), 2)
def calculate_cnt_ratio(list1, list2):
    """Tell whether two lists contain the same number of items.

    None is treated as an empty list.

    :return: 1 when the lengths are equal, otherwise 0.
    """
    size1 = 0 if list1 is None else len(list1)
    size2 = 0 if list2 is None else len(list2)
    return 1 if size1 == size2 else 0
def compare_products(product_list1, product_list2):
    """Compare two product lists and return three agreement scores.

    Both inputs are first copied through a JSON round trip in which every
    occurrence of the placeholder text '未公开' is removed, then the
    '单价' and '总价' fields of every product are converted with
    ``getUnifyMoney``.  The caller's objects are not modified because all
    work happens on the round-tripped copies.

    :param product_list1: first list of product mappings (JSON-serialisable).
    :param product_list2: second list of product mappings (JSON-serialisable).
    :return: a tuple ``(weight_score, complete_score, cnt_score)`` holding the
        results of ``calculate_matching_ratio``,
        ``calculate_complete_match_ratio`` and ``calculate_cnt_ratio``.
    """
    def without_placeholder(products):
        # Serialise, drop the placeholder everywhere in the text, parse back.
        serialized = json.dumps(products, ensure_ascii=False)
        return json.loads(serialized.replace('未公开', ''))

    def unify_money_columns(products):
        # Falsy inputs (None, empty list) have nothing to convert.
        for product in products or ():
            for column in ('单价', '总价'):
                product[column] = getUnifyMoney(product.get(column))

    cleaned1 = without_placeholder(product_list1)
    cleaned2 = without_placeholder(product_list2)
    unify_money_columns(cleaned1)
    unify_money_columns(cleaned2)

    return (
        calculate_matching_ratio(cleaned1, cleaned2),
        calculate_complete_match_ratio(cleaned1, cleaned2),
        calculate_cnt_ratio(cleaned1, cleaned2),
    )
|