# encoding=utf-8
import re
import pickle
import gensim
import numpy as np
import pandas as pd
from pyhanlp import *
import keras.backend as K
from keras.preprocessing.sequence import pad_sequences


def load(path):
    '''
    Load a pickled (.pk) file.
    '''
    with open(path, 'rb') as f:
        return pickle.load(f)


def get_remove_word():
    '''
    Load stopwords and other unimportant words.
    '''
    stopwords_path = 'pickle_1/bidi_classify_stop_words.csv'  # stopword file
    df_stopwords = pd.read_csv(stopwords_path)
    remove_word = df_stopwords['stopword'].values.tolist()
    return remove_word


def get_embedding():
    '''
    Load saved artifacts; return the vocabulary, the fitted keras Tokenizer
    object, and the word-embedding matrix.
    '''
    word_index = load('pickle_1/word_index_955871.pk')  # vocabulary file, word -> id
    tokenizer = load('pickle_1/tokenizer_955871.pk')    # fitted keras Tokenizer object
    w2v_model_path = 'model/thr_100_model.vector'       # word2vec vectors (dim 100)
    w2v_model = gensim.models.KeyedVectors.load_word2vec_format(w2v_model_path, binary=True)
    # Rows for words missing from the word2vec model keep their random initial values.
    embedding_matrix = np.random.random((len(word_index) + 1, 100))
    count_not_in_model = 0
    count_in_model = 0
    for word, i in word_index.items():
        if word in w2v_model:
            count_in_model += 1
            embedding_matrix[i] = np.asarray(w2v_model[word], dtype='float32')
        else:
            count_not_in_model += 1
    return word_index, tokenizer, embedding_matrix


def get_label():
    '''
    Load the label dictionary. Returns label_mapping, e.g.
    {0: '安防系统', 1: '安全保护服务', 2: '安全保护设备', ...}, and labels10,
    the Chinese names of all classes.
    '''
    label_mapping = load('pickle_1/label_mapping_f.pk')
    labels10 = list(label_mapping.values())
    return label_mapping, labels10


def get_dic():
    '''
    Load the category dictionary, presumably mapping sub-classes to
    top-level classes, e.g.
    '豆类、油料和薯类种植': '农业,农、林、牧、渔业',
    '蔬菜、食用菌及园艺作物种植': '农业,农、林、牧、渔业'.
    '''
    dic_label_path = 'pickle_1/class_subclass_dic211.pk'
    dic_label = load(dic_label_path)
    return dic_label


def model_in(r1, label_mapping, ids):
    '''
    Map each article's prediction to its Chinese class name.
    @Argus: r1: np.array of predictions; label_mapping: class dictionary,
            e.g. {0: '安防系统', ...}; ids: article ids
    @Return: list of [id, Chinese class name] pairs
    '''
    all_end = r1
    aa2 = []
    for i in range(all_end.shape[0]):
        c1 = label_mapping[np.argmax(all_end[i])]
        aa2.append(c1)
    union = []
    for x in range(len(ids)):
        union.append([ids[x], aa2[x]])
    return union


def convertJlistToPlist(jList):
    '''
    Convert a Java List (returned by HanLP via JPype) to a Python list of strings.
    '''
    ret = []
    if jList is None:
        return ret
    for i in range(jList.size()):
        ret.append(str(jList.get(i)))
    return ret


def clean_RmWord(text, remove_word):
    '''
    Remove useless words from a token list.
    '''
    text_copy = text.copy()
    for i in text:
        if i in remove_word:
            text_copy.remove(i)
    text_copy = " ".join(text_copy)
    return text_copy


def handle_doc1(article_set10_1, remove_word):
    '''
    Segment sentences and drop single characters, duplicates and irrelevant words.
    @Argus: article_set10_1: Series of strings to process
    @Return: processed Series
    '''
    HanLP.Config = JClass('com.hankcs.hanlp.HanLP$Config')
    HanLP.Config.ShowTermNature = False  # segment without part-of-speech tags
    article_set10_seg_1 = article_set10_1.map(lambda x: convertJlistToPlist(HanLP.segment(x)))
    article_set10_seg_1 = article_set10_seg_1.map(
        lambda x: ' '.join(word for word in x if len(word) > 1))  # drop single characters
    article_set10_seg_rm = article_set10_seg_1.map(
        lambda x: clean_RmWord(x.split(), remove_word))  # drop useless words
    article_set10_seg_rm = article_set10_seg_rm.map(lambda x: x.split())
    return article_set10_seg_rm


def cleanSeg(text):
    '''
    Strip noise characters (English letters, dates, times, numbers, punctuation).
    '''
    text = text.replace('\n', ' ')
    # Emails and URL-like fragments must be removed before ASCII letters are
    # stripped, or these patterns can never match. The URL pattern below
    # replaces the original JavaScript-style /.../i literal, which is not a
    # valid Python regex.
    text = re.sub(r"[\w]+@[\.\w]+", "", text)  # emails
    text = re.sub(r"(?:https?://)?[A-Za-z0-9\-_]+(?:\.[A-Za-z0-9/%&=?\-_]+)+", "", text)  # URL-like fragments
    text = re.sub('[a-zA-Z]', '', text)  # ASCII letters
    text = re.sub(r"-", " ", text)
    text = re.sub(r"\d+/\d/\d+", "", text)              # dates like 2019/1/1
    text = re.sub(r"[0-2]?[0-9]:[0-6][0-9]", "", text)  # times like 12:30
    # Keep only alphabetic characters (i.e. CJK, after the substitutions above)
    # and spaces, then drop single characters and all remaining spaces.
    pure_text = ''
    for letter in text:
        if letter.isalpha() or letter == ' ':
            pure_text += letter
    text = ' '.join(word for word in pure_text.split() if len(word) > 1)
    text = text.replace(' ', '')
    return text
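# Illustrative behaviour of cleanSeg (hypothetical input, not from the original
# module; the exact output depends on the regexes above):
#
#   cleanSeg('ABC公司 2019/1/1 12:30 foo@bar.com 安防系统招标')
#   # -> '公司安防系统招标'  (letters, date, time and email removed;
#   #     single characters and spaces dropped)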
def fetch_sub_data_1(data, num):
    '''
    Take the first N characters of the text.
    '''
    return data[:num]


def data_set(text):
    '''
    De-duplicate words while preserving their order.
    '''
    l2 = []
    for i in text:
        if i not in l2:
            l2.append(i)
    return l2


def clean_word(article_set10, remove_word):
    """
    Clean the data: strip symbols, letters and digits, truncate articles to a
    uniform length, segment sentences, and drop single characters, duplicates,
    irrelevant words and stopwords.
    :param article_set10: raw data, list
    :param remove_word: stopword list, list
    :return: Series
    """
    article_set10_1 = pd.Series(article_set10)
    article_set10_1 = article_set10_1.map(lambda x: cleanSeg(x))  # strip noise characters (letters, dates, digits, punctuation)
    article_set10_1 = article_set10_1.map(lambda x: fetch_sub_data_1(x, 500))  # keep the first N characters
    article_set10_seg_rm = handle_doc1(article_set10_1, remove_word)  # segment; drop single/duplicate/irrelevant words
    x_train_df_10 = article_set10_seg_rm.copy()
    x_train_df_10 = x_train_df_10.map(lambda x: data_set(x))  # order-preserving de-duplication
    return x_train_df_10


def clean_word_with_tokenizer(article_set10, remove_word, tokenizer):
    """
    Clean the data (strip symbols, letters, digits and stopwords), segment,
    and convert to padded id sequences.
    :param article_set10: raw data, list of (id, text) pairs
    :param remove_word: stopword list, list
    :return: padded sequences and the article ids
    """
    ids = [i[0] for i in article_set10]
    article_set10 = [i[1] for i in article_set10]
    article_set10_1 = pd.Series(article_set10)
    article_set10_1 = article_set10_1.map(lambda x: cleanSeg(x))
    article_set10_1 = article_set10_1.map(lambda x: fetch_sub_data_1(x, 500))
    article_set10_seg_rm = handle_doc1(article_set10_1, remove_word)
    x_train_df_10 = article_set10_seg_rm.copy()
    sequences = tokenizer.texts_to_sequences(x_train_df_10)
    padded_sequences = pad_sequences(sequences, maxlen=100, padding='post',
                                     truncating='post', value=0.0)
    # Alternative left/right context padding (unused):
    # left_word = [x[:-1] for x in padded_sequences]
    # right_word = [x[1:] for x in padded_sequences]
    # left_pad = pad_sequences(left_word, maxlen=100, value=0.0)
    # right_pad = pad_sequences(right_word, maxlen=100, padding='post', truncating='post', value=0.0)
    return padded_sequences, ids


def recall(y_true, y_pred):
    '''
    Compute recall.
    @Argus:
        y_true: ground-truth labels
        y_pred: labels predicted by the model
    @Return recall
    '''
    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))  # true positives
    c3 = K.sum(K.round(K.clip(y_true, 0, 1)))           # actual positives
    # K.epsilon() guards against division by zero; a Python-level
    # `if c3 == 0` check does not work on symbolic tensors.
    recall = c1 / (c3 + K.epsilon())
    return recall


def f1_score(y_true, y_pred):
    '''
    Compute the F1 score.
    @Argus:
        y_true: ground-truth labels
        y_pred: labels predicted by the model
    @Return F1 score
    '''
    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))  # true positives
    c2 = K.sum(K.round(K.clip(y_pred, 0, 1)))           # predicted positives
    c3 = K.sum(K.round(K.clip(y_true, 0, 1)))           # actual positives
    precision = c1 / (c2 + K.epsilon())
    recall = c1 / (c3 + K.epsilon())
    f1_score = 2 * (precision * recall) / (precision + recall + K.epsilon())
    return f1_score


def precision(y_true, y_pred):
    '''
    Compute precision.
    @Argus:
        y_true: ground-truth labels
        y_pred: labels predicted by the model
    @Return precision
    '''
    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))  # true positives
    c2 = K.sum(K.round(K.clip(y_pred, 0, 1)))           # predicted positives
    precision = c1 / (c2 + K.epsilon())
    return precision


if __name__ == '__main__':
    dic_label = get_dic()
    print(dic_label)
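# ---------------------------------------------------------------------------
# End-to-end usage sketch (not part of the original module). `model` stands in
# for a trained Keras classifier and the sample article is hypothetical; the
# rest follows the helpers above.
#
#   remove_word = get_remove_word()                        # stopword list
#   word_index, tokenizer, embedding_matrix = get_embedding()
#   label_mapping, labels10 = get_label()                  # id -> class name
#
#   articles = [(1, '某公司安防系统集成项目招标公告……')]   # (id, text) pairs
#   padded, ids = clean_word_with_tokenizer(articles, remove_word, tokenizer)
#   predictions = model.predict(padded)                    # shape: (n, n_classes)
#   print(model_in(predictions, label_mapping, ids))       # [[id, class name], ...]
# ---------------------------------------------------------------------------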