#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Rule-based extraction of Chinese administrative areas (province / city /
district) from free text, driven by lookup tables stored in
``district_tuple.pkl``.

@author: bidikeji
@time: 2024/5/9 14:40
"""
import re
import os
import pickle

# A matched name ending in one of these characters is treated as a full
# administrative name rather than an abbreviation.
_ADMIN_SUFFIX_RE = re.compile(r'[省市区县旗盟]$')
# Text directly following an abbreviated name that marks it as part of a
# village / town / street / road / community / hotel name.
_STREET_TAIL_RE = re.compile(
    r'([东南西北中一二三四五六七八九十大小]?(村|镇|街|路|道|社区)|酒店|宾馆)')
# Text directly following a name (branch office, section, metro, station,
# project) that makes the match count twice.
_BRANCH_TAIL_RE = re.compile(
    r'([分支](公司|局|行|校|院|干?线)|\w{,3}段|地铁|(火车|高铁)?站|\w{,3}项目)')
# District names kept when neither a province nor a city was matched.
_FULL_DISTRICT_RE = re.compile(r'[市县旗区]$')
# Phrases blanked out before matching so that the place names embedded in
# them are not picked up.
_NOISE_RE = re.compile(r'复合肥|海南岛|兴业银行|双河口|阳光|杭州湾')
# For these city names the district is reported in the city slot.
_MUNICIPALITIES = ('北京', '天津', '上海', '重庆')
# Minimum accumulated score the best province needs to be reported.
_MIN_PROVINCE_SCORE = 0.01


class DistrictExtractor():
    """Extract the most likely province, city and district from text.

    Attributes (all unpacked from ``district_tuple.pkl``):
        p_pro, p_city, p_dis: patterns handed to ``re.finditer`` to find
            province, city and district names.
        idx_dic: maps an area id to a dict read with the keys '返回名称'
            (name to return), '大区' (region), '省' (province id),
            '市' (city id) and '权重' (weight).
        full_dic, short_dic: dicts with the keys 'province', 'city' and
            'district', keyed by full names and abbreviations respectively.
            A province name maps to a single id; a city or district name
            maps to a collection of ids.
    """

    def __init__(self):
        # SECURITY: pickle.load can execute arbitrary code. The file is read
        # from this module's own directory; it must never be replaced by
        # data from an untrusted source.
        with open(os.path.join(os.path.dirname(__file__), 'district_tuple.pkl'), 'rb') as f:
            district_tuple = pickle.load(f)
        (self.p_pro, self.p_city, self.p_dis,
         self.idx_dic, self.full_dic, self.short_dic) = district_tuple

    @staticmethod
    def _find_areas(pattern, text):
        """Return ``(name, start, end)`` for every accepted match of *pattern*.

        :param pattern: alternation of area names, e.g. 广东省|广西省|...
        :param text: text to search
        :return: list of ``(name, start, end)`` tuples; a match directly
            followed by a branch / section / station / project word appears
            twice, so it is counted twice when scores are summed.
        """
        found = []
        for match in re.finditer(pattern, text):
            name = match.group(0)
            tail = text[match.end():]
            # An abbreviation followed by a road/village/hotel word is part
            # of a street-level name, not an administrative area.
            if _ADMIN_SUFFIX_RE.search(name) is None and _STREET_TAIL_RE.match(tail):
                continue
            # 2024-03-14: '站前' inside a project title such as
            # '中铁二局新建沪苏湖铁路工程站前VI标项目' was wrongly resolved to
            # Liaoning / Yingkou / Zhanqian.
            if name == '站前':
                continue
            hit = (name, match.start(), match.end())
            found.append(hit)
            if _BRANCH_TAIL_RE.match(tail):
                found.append(hit)
        return found

    @staticmethod
    def _to_scores(hits, max_len):
        """Convert ``(name, start, end)`` hits into ``(name, score)`` pairs.

        The score is ``(match length + end offset) / max_len / 2``, so longer
        names and names occurring later in the text score higher.
        """
        return [(name, (end - start + end) / max_len / 2)
                for name, start, end in hits]

    @staticmethod
    def _propagate(parent_scores, parent_idx, gain):
        """Add *gain* to a parent area; a parent seen first this way gets half."""
        if parent_idx in parent_scores:
            parent_scores[parent_idx] += gain
        else:
            parent_scores[parent_idx] = gain * 0.5

    def _score_text(self, text, text_weight=1):
        """Score every province, city and district id mentioned in *text*.

        :param text: text to analyse
        :param text_weight: factor applied to every resulting score
        :return: ``(pro_ids, city_ids, dis_ids)`` dicts mapping id -> score
        """
        idx_dic, full_dic, short_dic = self.idx_dic, self.full_dic, self.short_dic

        text = _NOISE_RE.sub(' ', text)
        # Fix for doc 426624023: '珠海城市' was read as '海城市'.
        text = text.replace('珠海城市', '珠海')
        # Fix for doc 423589589: '怒江州' was resolved to Guangxi / Chongzuo /
        # Jiangzhou.
        text = text.replace('怒江州', '怒江傈僳族自治州')

        province_hits = self._find_areas(self.p_pro, text)
        city_hits = self._find_areas(self.p_city, text)
        district_hits = self._find_areas(self.p_dis, text)
        if not province_hits and not city_hits:
            # 2024-04-28: with nothing but district matches, keep only full
            # district names; e.g. '凌云工业股份有限公司' used to be resolved
            # to Guangxi / Baise / Lingyun.
            district_hits = [hit for hit in district_hits
                             if _FULL_DISTRICT_RE.search(hit[0])]

        max_len = len(text)
        province_scores = self._to_scores(province_hits, max_len)
        city_scores = self._to_scores(city_hits, max_len)
        district_scores = self._to_scores(district_hits, max_len)

        pro_ids = dict()
        city_ids = dict()
        dis_ids = dict()

        for name, score in province_scores:
            # Invariant: p_pro only matches names present in the tables.
            assert (name in full_dic['province'] or name in short_dic['province'])
            if name in full_dic['province']:
                idx = full_dic['province'][name]
                bonus = 2
            else:
                idx = short_dic['province'][name]
                bonus = 1
            pro_ids[idx] = pro_ids.get(idx, 0) + (score + bonus)

        for name, score in city_scores:
            if name in full_dic['city']:
                candidates = full_dic['city'][name]
                # A name shared by several areas is ambiguous: damp it.
                w = 0.1 if len(candidates) > 1 else 1
                for idx in candidates:
                    gain = (score + 2) * w
                    city_ids[idx] = city_ids.get(idx, 0) + gain
                    self._propagate(pro_ids, idx_dic[idx]['省'], gain)
            elif name in short_dic['city']:
                candidates = short_dic['city'][name]
                w = 0.1 if len(candidates) > 1 else 1
                for idx in candidates:
                    if idx not in city_ids:
                        city_ids[idx] = 0
                    weight = idx_dic[idx]['权重']
                    gain = (score + 1) * w * weight
                    city_ids[idx] += gain
                    self._propagate(pro_ids, idx_dic[idx]['省'], gain)

        for name, score in district_scores:
            if name in full_dic['district']:
                candidates = full_dic['district'][name]
                w = 0.1 if len(candidates) > 1 else 1
                for idx in candidates:
                    gain = (score + 1) * w
                    dis_ids[idx] = dis_ids.get(idx, 0) + gain
                    self._propagate(pro_ids, idx_dic[idx]['省'], gain)
                    self._propagate(city_ids, idx_dic[idx]['市'], gain)
            elif name in short_dic['district']:
                candidates = short_dic['district'][name]
                w = 0.1 if len(candidates) > 1 else 1
                for idx in candidates:
                    if idx not in dis_ids:
                        dis_ids[idx] = 0
                    weight = idx_dic[idx]['权重']
                    # NOTE(review): the district's own score is not scaled by
                    # the weight, only the amounts passed up to its province
                    # and city are - kept as in the original; confirm intent.
                    own_gain = (score + 0) * w
                    dis_ids[idx] += own_gain
                    parent_gain = own_gain * weight
                    self._propagate(pro_ids, idx_dic[idx]['省'], parent_gain)
                    self._propagate(city_ids, idx_dic[idx]['市'], parent_gain)

        pro_ids = {k: v * text_weight for k, v in pro_ids.items()}
        city_ids = {k: v * text_weight for k, v in city_ids.items()}
        dis_ids = {k: v * text_weight for k, v in dis_ids.items()}
        return pro_ids, city_ids, dis_ids

    def _pick_final_address(self, pro_ids, city_ids, dis_ids):
        """Choose the final region, province, city and district names.

        The best-scoring province is taken first, then the best-scoring city
        belonging to that province, then the best-scoring district belonging
        to that city.

        :param pro_ids: province id -> score
        :param city_ids: city id -> score
        :param dis_ids: district id -> score
        :return: ``(big_area, pred_pro, pred_city, pred_dis)``; an element is
            ``""`` when nothing was selected for it
        """
        idx_dic = self.idx_dic
        big_area = ""
        pred_pro = ""
        pred_city = ""
        pred_dis = ""
        final_pro = ""
        final_city = ""

        def by_score_desc(scores):
            return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)

        if pro_ids:
            final_pro, score = by_score_desc(pro_ids)[0]
            if score >= _MIN_PROVINCE_SCORE:
                pred_pro = idx_dic[final_pro]['返回名称']
                big_area = idx_dic[final_pro]['大区']

        if pred_pro != "" and city_ids:
            for idx, _ in by_score_desc(city_ids):
                if idx_dic[idx]['省'] == final_pro:
                    final_city = idx
                    pred_city = idx_dic[final_city]['返回名称']
                    break

        if final_city != "" and dis_ids:
            for idx, _ in by_score_desc(dis_ids):
                if idx_dic[idx]['市'] == final_city:
                    pred_dis = idx_dic[idx]['返回名称']
                    # Stop at the best-scoring district of the chosen city;
                    # without this the lowest-scoring match would win.
                    break

        # NOTE(review): this swap runs even when no district was matched,
        # which leaves the city empty for a municipality - confirm that this
        # is the intended layout.
        if pred_city in _MUNICIPALITIES:
            pred_city = pred_dis
            pred_dis = ""
        return big_area, pred_pro, pred_city, pred_dis

    def get_area(self, text, web_name, in_content=False):
        """Extract the area mentioned in *text*, helped by the site name.

        :param text: text to analyse; ``None`` is treated as empty
        :param web_name: name of the source site; only its first three
            characters are scored, with weight 0.2 (2024-04-22: a longer
            prefix made '中金岭南阳光采购平台', doc 459056219, match '阳光');
            ``None`` is treated as empty
        :param in_content: value reported as ``is_in_text`` in the result
        :return: ``{'district': {'area', 'province', 'city', 'district',
            'is_in_text'}}``; 'area' and 'province' default to '全国',
            'city' and 'district' to '未知'
        """
        text = text or ""
        web_name = web_name or ""

        area_dic = {'area': '全国', 'province': '全国', 'city': '未知',
                    'district': '未知', "is_in_text": False}

        text_scores = self._score_text(text)
        web_scores = self._score_text(web_name[:3], text_weight=0.2)
        for total, extra in zip(text_scores, web_scores):
            for idx, value in extra.items():
                total[idx] = total.get(idx, 0) + value

        big_area, pred_pro, pred_city, pred_dis = self._pick_final_address(*text_scores)
        if big_area != "":
            area_dic['area'] = big_area
        if pred_pro != "":
            area_dic['province'] = pred_pro
        if pred_city != "":
            area_dic['city'] = pred_city
        if pred_dis != "":
            area_dic['district'] = pred_dis
        if in_content:
            area_dic['is_in_text'] = True
        return {'district': area_dic}

    def predict(self, text):
        """Run :meth:`get_area` on *text* with an empty site name."""
        return self.get_area(text=text, web_name='')


if __name__ == '__main__':
    dist = DistrictExtractor()
    text = '浙江省杭州市余杭区崇贤街道运河路5-4号13幢312室'
    print(dist.predict(text=text))