"""Normalize Chinese money strings and score how similar two product lists are."""
import json
import math
import re
from decimal import Decimal

try:
    from fuzzywuzzy import fuzz
except ImportError:
    # Only text_sim() needs fuzzywuzzy. Keeping the import optional lets the
    # money-normalization helpers be used (and tested) without it.
    fuzz = None


# Chinese numerals (financial and everyday forms) -> integer value.
_DIGITS = {
    "零": 0, "壹": 1, "贰": 2, "叁": 3, "肆": 4,
    "伍": 5, "陆": 6, "柒": 7, "捌": 8, "玖": 9,
    "〇": 0, "一": 1, "二": 2, "三": 3, "四": 4,
    "五": 5, "六": 6, "七": 7, "八": 8, "九": 9,
}

# Unit character -> multiplier.
_MULTIPLE_FACTORS = {
    "兆": Decimal(1000000000000),
    "亿": Decimal(100000000),
    "万": Decimal(10000),
    "仟": Decimal(1000),
    "千": Decimal(1000),
    "佰": Decimal(100),
    "百": Decimal(100),
    "拾": Decimal(10),
    "十": Decimal(10),
    "元": Decimal(1),
    "圆": Decimal(1),
    "角": Decimal("0.1"),
    "分": Decimal("0.01"),
}

# Amounts whose numeric head would push the value above this are rejected.
_MAX_MONEY = Decimal(1000000000000)

# Financial digits recognised when an amount is a single Chinese digit.
_CHN_DIGITS = "零壹贰叁肆伍陆柒捌玖"

# Units are tried from largest to smallest so that the amount is always split
# on its most significant unit first.  (2024-06-11 fix: with the small units
# listed late, '陆拾陆亿伍千柒佰零叁万肆千叁佰陆拾伍元' was parsed as 11607430365.)
_CHN_FACTOR_UNITS = (
    "兆", "亿", "万", "仟", "千", "佰", "百", "拾", "十", "圆", "元", "角", "分",
)

# Everything outside this whitelist is dropped before parsing.
_NON_MONEY_CHARS = re.compile(
    r"[^0-9.零壹贰叁肆伍陆柒捌玖拾佰仟萬億圆十百千万亿元角分]"
)
# The whitelist keeps the traditional forms; map them onto the units we parse.
_TRADITIONAL_UNITS = str.maketrans({"萬": "万", "億": "亿"})

_PLAIN_NUMBER = re.compile(r"^\d+(\.\d+)?$")
_SINGLE_DIGIT = re.compile(r"^零?(?P<BigMoney>[%s])$" % _CHN_DIGITS)

# Placeholder used by upstream data for undisclosed values.
_UNDISCLOSED = '未公开'
_MONEY_COLS = ('单价', '总价')

# Columns that make up a product's identity for exact matching.
_COMPLETE_MATCH_COLS = (
    "产品名称", "单价", "数量", "数量单位", "总价",
    "品牌", "规格型号", "品目编号", "品目名称",
)


def getDigitsDic(unit):
    """Return the integer value of a Chinese digit character, or None if unknown."""
    return _DIGITS.get(unit)


def getMultipleFactor(unit):
    """Return the Decimal multiplier of a Chinese unit character, or None if unknown."""
    return _MULTIPLE_FACTORS.get(unit)


def _parse_cleaned_money(text):
    """Recursively evaluate an already-sanitized amount string.

    The string is split at the last occurrence of its most significant unit;
    the part before the unit is multiplied by the unit's factor and the part
    after it is added on.

    NOTE(review): the reviewed copy of this file had lost the text between
    the overflow guard and the tail handling (angle-bracket stripping).  The
    head handling here is a reconstruction that reproduces the behavior
    described by the 2024-06-11 comment -- confirm against version control.
    """
    if not text:
        return Decimal(0)
    if _PLAIN_NUMBER.match(text):
        return Decimal(text)
    single = _SINGLE_DIGIT.match(text)
    if single:
        return Decimal(_DIGITS[single.group("BigMoney")])

    for unit in _CHN_FACTOR_UNITS:
        cut = text.rfind(unit)
        if cut == -1:
            continue
        head, tail = text[:cut], text[cut + 1:]
        factor = _MULTIPLE_FACTORS[unit]
        if _PLAIN_NUMBER.match(head):
            head_value = Decimal(head)
            # Reject amounts beyond the supported maximum.
            if _MAX_MONEY / factor < head_value:
                return Decimal(0)
        else:
            head_value = _parse_cleaned_money(head)
        # The tail is always parsed recursively: a tail such as '500元' must
        # not be handed to Decimal() directly (that used to zero the result).
        return head_value * factor + _parse_cleaned_money(tail)

    # No recognisable unit and not a plain number.
    return Decimal(0)


def getUnifyMoney(money):
    """Convert a money string (digits and/or Chinese numerals) to a Decimal.

    Args:
        money: The amount, e.g. '3.5万元', '1,234.50' or '壹万伍仟元'.
            Non-string values are converted with str().

    Returns:
        The amount as a Decimal.  Decimal(0) is returned for None, '', '-'
        and for anything that cannot be parsed (best effort, never raises).
    """
    if money is None or money in ('', '-'):
        return Decimal(0)
    # Separators such as commas are not in the whitelist, so they are dropped here.
    cleaned = _NON_MONEY_CHARS.sub("", str(money)).translate(_TRADITIONAL_UNITS)
    try:
        return _parse_cleaned_money(cleaned)
    except Exception:
        # Deliberate best effort: callers treat 0 as "no usable amount".
        return Decimal(0)


def text_sim(s1, s2):
    """Return the fuzzy similarity of two values as a float in [0, 1].

    Both values empty counts as a perfect match (1.0); exactly one empty
    counts as no match (0.0).  Otherwise fuzzywuzzy's token_sort_ratio is used.

    Raises:
        ImportError: if two non-empty strings must be compared and
            fuzzywuzzy is not installed.
    """
    left = str(s1 or "").strip()
    right = str(s2 or "").strip()
    if not left and not right:
        return 1.0
    if not left or not right:
        return 0.0
    if fuzz is None:
        raise ImportError("fuzzywuzzy is required to compare non-empty strings")
    return fuzz.token_sort_ratio(left, right) / 100


def _is_blank(value):
    """Return True for None and for empty / whitespace-only strings."""
    return value is None or (isinstance(value, str) and not value.strip())


def num_sim(n1, n2):
    """Return the relative closeness of two numbers as a float in [0, 1].

    Two missing values count as a match (1.0), mirroring text_sim().  Values
    that cannot be converted to a finite float score 0.0.
    """
    if _is_blank(n1) and _is_blank(n2):
        return 1.0
    try:
        a = float(n1)
        b = float(n2)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    if not (math.isfinite(a) and math.isfinite(b)):
        return 0.0
    # Scale by magnitude (floored at 0.01 to avoid dividing by zero) and clamp,
    # so negative or mixed-sign inputs cannot produce a negative score.
    scale = max(abs(a), abs(b), 0.01)
    return max(0.0, 1.0 - abs(a - b) / scale)


# (field, comparator) pairs; every field carries the same weight of 1/8.
_SIMILARITY_FIELDS = (
    ("产品名称", text_sim),
    ("规格型号", text_sim),
    ("品牌", text_sim),
    ("单价", num_sim),
    ("数量", num_sim),
    ("总价", num_sim),
    ("品目编号", text_sim),
    ("品目名称", text_sim),
)
_FIELD_WEIGHT = 0.125


def product_similarity(p1, p2):
    """Return the equally weighted similarity of two product dicts, in [0, 1]."""
    score = 0.0
    for field, compare in _SIMILARITY_FIELDS:
        score += compare(p1.get(field), p2.get(field)) * _FIELD_WEIGHT
    return score


def calculate_matching_ratio(list_a, list_b, threshold=0.6):
    """Greedily pair products of list_a with products of list_b.

    Each item of list_a takes the still-unused item of list_b with the highest
    product_similarity(); the pair counts as a match when that score reaches
    `threshold`.

    Returns:
        matches / max(len(list_a), len(list_b)); 1.0 when both lists are
        empty; 0 when either list is None.
    """
    if list_a is None or list_b is None:
        return 0
    total = max(len(list_a), len(list_b))
    if total == 0:
        return 1.0

    used_b = [False] * len(list_b)
    match_count = 0
    for a in list_a:
        best_score = 0
        best_idx = -1
        for i, b in enumerate(list_b):
            if used_b[i]:
                continue
            score = product_similarity(a, b)
            if score > best_score:
                best_score = score
                best_idx = i
        # Only an accepted match consumes its candidate; a rejected pairing
        # must leave the candidate available for later items.
        if best_idx != -1 and best_score >= threshold:
            match_count += 1
            used_b[best_idx] = True
    return match_count / total


def _row_key(product):
    """Serialize the identity columns of a product into one comparable string."""
    parts = []
    for col in _COMPLETE_MATCH_COLS:
        value = product.get(col, '')
        if isinstance(value, Decimal):
            # Equal amounts must serialize identically (35000.0 vs 35000).
            value = format(value.normalize(), "f")
        parts.append(str(value))
    return '@'.join(parts)


def calculate_complete_match_ratio(list1, list2):
    """Return the share of distinct products that appear identically in both lists.

    Returns:
        |common| / |union| over the distinct serialized products, rounded to
        two decimals; 1.0 when both lists are empty; 0 when either is None.
    """
    if list1 is None or list2 is None:
        return 0
    keys1 = {_row_key(d) for d in list1}
    keys2 = {_row_key(d) for d in list2}
    all_cnt = len(keys1 | keys2)
    if all_cnt == 0:
        return 1.0
    return round(len(keys1 & keys2) / all_cnt, 2)


def calculate_cnt_ratio(list1, list2):
    """Return 1 when both lists have the same length (None counts as empty), else 0."""
    len1 = 0 if list1 is None else len(list1)
    len2 = 0 if list2 is None else len(list2)
    return 1 if len1 == len2 else 0


def _normalize_products(products):
    """Return a deep copy with placeholders blanked and money columns as Decimal."""
    # The JSON round trip copies the data, so the caller's input is not mutated.
    as_text = json.dumps(products, ensure_ascii=False).replace(_UNDISCLOSED, '')
    cleaned = json.loads(as_text)
    if cleaned:
        for item in cleaned:
            for col in _MONEY_COLS:
                item[col] = getUnifyMoney(item.get(col))
    return cleaned


def compare_products(product_list1, product_list2):
    """Compare two product lists.

    Args:
        product_list1: JSON-serializable list of product dicts, or None.
        product_list2: JSON-serializable list of product dicts, or None.

    Returns:
        A tuple (weight_score, complete_score, cnt_score): the fuzzy matching
        ratio, the exact-match ratio and the 0/1 same-length flag.
    """
    products1 = _normalize_products(product_list1)
    products2 = _normalize_products(product_list2)
    weight_score = calculate_matching_ratio(products1, products2)
    complete_score = calculate_complete_match_ratio(products1, products2)
    cnt_score = calculate_cnt_ratio(products1, products2)
    return weight_score, complete_score, cnt_score