# encoding=utf-8
import re
import pickle
import gensim
import numpy as np
import pandas as pd
from pyhanlp import *
import keras.backend as K
from keras.preprocessing.sequence import pad_sequences

def load(path):
    '''
    Load a pickled object from the given path.
    '''
    with open(path, 'rb') as f:
        return pickle.load(f)

def get_remove_word():
    '''
    Load stop words and other unimportant words to be removed.
    '''
    stopwords_path = 'pickle_1/bidi_classify_stop_words.csv'  # stop-word file
    df_stopwords = pd.read_csv(stopwords_path)
    remove_word = df_stopwords['stopword'].values.tolist()
    return remove_word

def get_embedding():
    '''
    Load the vocabulary, the fitted Keras tokenizer and the word2vec model,
    and build the embedding matrix.
    '''
    word_index = load('pickle_1/word_index_955871.pk')  # vocabulary file, word -> id
    tokenizer = load('pickle_1/tokenizer_955871.pk')  # fitted Keras tokenizer
    w2v_model_path = 'model/thr_100_model.vector'  # word2vec vectors (binary format)
    w2v_model = gensim.models.KeyedVectors.load_word2vec_format(w2v_model_path, binary=True)
    embedding_matrix = np.random.random((len(word_index) + 1, 100))
    count_not_in_model = 0
    count_in_model = 0
    for word, i in word_index.items():
        if word in w2v_model:
            count_in_model += 1
            embedding_matrix[i] = np.asarray(w2v_model[word], dtype='float32')
        else:
            count_not_in_model += 1
    return word_index, tokenizer, embedding_matrix

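# Usage sketch (not part of the original file): the embedding matrix built above is
# typically plugged into a frozen Keras Embedding layer; the 100-dimensional vectors and
# the sequence length of 100 match the values used elsewhere in this module.
#
#     from keras.layers import Embedding
#     word_index, tokenizer, embedding_matrix = get_embedding()
#     embedding_layer = Embedding(input_dim=embedding_matrix.shape[0],
#                                 output_dim=embedding_matrix.shape[1],
#                                 weights=[embedding_matrix],
#                                 input_length=100,
#                                 trainable=False)
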
def get_label():
    '''
    Load the label dictionary. Returns label_mapping, e.g.
    {0: '安防系统', 1: '安全保护服务', 2: '安全保护设备', ...}, and labels10,
    the Chinese names of all classes.
    '''
    label_mapping = load('pickle_1/label_mapping_f.pk')
    labels10 = list(label_mapping.values())
    return label_mapping, labels10

def get_dic():
    '''
    Load the category dictionary, presumably mapping sub-classes to parent classes, e.g.
    '豆类、油料和薯类种植': '农业,农、林、牧、渔业', '蔬菜、食用菌及园艺作物种植': '农业,农、林、牧、渔业'.
    '''
    dic_label_path = 'pickle_1/class_subclass_dic211.pk'
    dic_label = load(dic_label_path)
    return dic_label

def model_in(r1, label_mapping, id):
    '''
    Map each article's prediction to its Chinese class name.
    @Argus: r1: np.array of prediction scores; label_mapping: class dictionary, e.g. {0: '安防系统', ...};
            id: list of article ids
    @Return: list of [id, Chinese class name] pairs
    '''
    all_end = r1
    aa2 = []
    for i in range(all_end.shape[0]):
        c1 = label_mapping[np.argmax(all_end[i])]
        aa2.append(c1)
    union = []
    for x in range(len(id)):
        union.append([id[x], aa2[x]])
    return union

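# Example (not part of the original file; illustrative values only): for two articles whose
# prediction rows peak at classes 1 and 0, model_in pairs each id with the mapped class name.
#
#     scores = np.array([[0.1, 0.8, 0.1], [0.7, 0.2, 0.1]])
#     mapping = {0: '安防系统', 1: '安全保护服务', 2: '安全保护设备'}
#     model_in(scores, mapping, ['doc1', 'doc2'])
#     # -> [['doc1', '安全保护服务'], ['doc2', '安防系统']]
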
def convertJlistToPlist(jList):
    '''
    Convert a Java List (as returned by pyhanlp) into a Python list of strings.
    '''
    ret = []
    if jList is None:
        return ret
    for i in range(jList.size()):
        ret.append(str(jList.get(i)))
    return ret

def clean_RmWord(text, remove_word):
    '''
    Remove useless words (those listed in remove_word) from a tokenized text.
    '''
    text_copy = text.copy()
    for i in text:
        if i in remove_word:
            text_copy.remove(i)
    text_copy = " ".join(text_copy)
    return text_copy

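# Example (not part of the original file): tokens listed in remove_word are dropped and the
# remaining tokens are re-joined into a space-separated string.
#
#     clean_RmWord(['项目', '的', '建设'], ['的'])
#     # -> '项目 建设'
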
def handle_doc1(article_set10_1, remove_word):
    '''
    Segment each sentence with HanLP and drop single-character, duplicate and irrelevant words.
    @Argus: article_set10_1: Series of strings to process
    @Return: processed Series (each element is a list of tokens)
    '''
    HanLP.Config = JClass('com.hankcs.hanlp.HanLP$Config')
    HanLP.Config.ShowTermNature = False
    article_set10_seg_1 = article_set10_1.map(lambda x: convertJlistToPlist(HanLP.segment(x)))
    article_set10_seg_1 = article_set10_seg_1.map(lambda x: ' '.join(word for word in x if len(word) > 1))  # drop single-character words
    article_set10_seg_rm = article_set10_seg_1.map(lambda x: clean_RmWord(x.split(), remove_word))  # drop useless / irrelevant words
    article_set10_seg_rm = article_set10_seg_rm.map(lambda x: x.split())
    return article_set10_seg_rm

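# Usage sketch (not part of the original file; requires a working pyhanlp / Java setup):
#
#     docs = pd.Series(['对招标项目的数据进行清洗和分词'])
#     handle_doc1(docs, remove_word=['进行'])
#     # -> Series whose single element is a list of multi-character tokens,
#     #    with '进行' and all single-character tokens removed
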
def cleanSeg(text):
    '''
    Remove noise characters (English letters, dates, digits, punctuation).
    '''
    text = re.sub('[a-zA-Z]', '', text)
    text = text.replace('\n', ' ')
    text = re.sub(r"-", " ", text)
    text = re.sub(r"\d+/\d/\d+", "", text)
    text = re.sub(r"[0-2]?[0-9]:[0-6][0-9]", "", text)
    text = re.sub(r"[\w]+@[\.\w]+", "", text)
    # strip URL-like fragments (the original pattern used JavaScript-style /.../i delimiters)
    text = re.sub(r"[a-zA-Z]*[:/]*[A-Za-z0-9\-_]+\.+[A-Za-z0-9./%&=?\-_]+", "", text)
    pure_text = ''
    for letter in text:
        if letter.isalpha() or letter == ' ':
            pure_text += letter
    text = ' '.join(word for word in pure_text.split() if len(word) > 1)  # drop single-character tokens
    text = text.replace(' ', '')
    return text

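# Example (not part of the original file; illustrative): ASCII letters, digits, dates and
# punctuation are stripped and single-character tokens dropped, leaving the remaining
# Chinese text concatenated without spaces.
#
#     cleanSeg('2019-05-20 10:30 项目abc招标公告,联系 a@b.com')
#     # -> '项目招标公告联系'
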
def fetch_sub_data_1(data, num):
    '''
    Return the first num characters of the text.
    '''
    return data[:num]

def data_set(text):
    '''
    De-duplicate words while preserving their original order.
    '''
    l2 = []
    for i in text:
        if i not in l2:
            l2.append(i)
    return l2

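# Example (not part of the original file):
#
#     data_set(['招标', '项目', '招标', '公告'])
#     # -> ['招标', '项目', '公告']
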
def clean_word(article_set10, remove_word):
    """
    Clean the data: strip symbols, letters and digits, truncate each article to a fixed length,
    segment the sentences, and drop single-character, duplicate, irrelevant and stop words.
    :param article_set10: raw data, list
    :param remove_word: stop-word list, list
    :return: Series
    """
    article_set10_1 = pd.Series(article_set10)
    article_set10_1 = article_set10_1.map(lambda x: cleanSeg(x))  # remove noise characters (letters, dates, digits, punctuation)
    article_set10_1 = article_set10_1.map(lambda x: fetch_sub_data_1(x, 500))  # keep only the first 500 characters
    article_set10_seg_rm = handle_doc1(article_set10_1, remove_word)  # segment and drop single-character / irrelevant words
    x_train_df_10 = article_set10_seg_rm.copy()
    x_train_df_10 = x_train_df_10.map(lambda x: data_set(x))  # de-duplicate while preserving order
    return x_train_df_10

def clean_word_with_tokenizer(article_set10, remove_word, tokenizer):
    """
    Clean the data (strip symbols, letters, digits and stop words), segment it, and convert
    it into padded id sequences with the fitted tokenizer.
    :param article_set10: raw data, list of (id, text) pairs
    :param remove_word: stop-word list, list
    :param tokenizer: fitted Keras tokenizer
    :return: padded sequences and the list of article ids
    """
    id = [i[0] for i in article_set10]
    article_set10 = [i[1] for i in article_set10]
    article_set10_1 = pd.Series(article_set10)
    article_set10_1 = article_set10_1.map(lambda x: cleanSeg(x))
    article_set10_1 = article_set10_1.map(lambda x: fetch_sub_data_1(x, 500))
    article_set10_seg_rm = handle_doc1(article_set10_1, remove_word)
    x_train_df_10 = article_set10_seg_rm.copy()
    sequences = tokenizer.texts_to_sequences(x_train_df_10)
    padded_sequences = pad_sequences(sequences, maxlen=100, padding='post', truncating='post', value=0.0)
    # left_word = [x[:-1] for x in padded_sequences]
    # right_word = [x[1:] for x in padded_sequences]
    # left_pad = pad_sequences(left_word, maxlen=100, value=0.0)
    # right_pad = pad_sequences(right_word, maxlen=100, padding='post', truncating='post', value=0.0)
    return padded_sequences, id

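# Usage sketch (not part of the original file; assumes the pickled tokenizer and stop-word
# list shipped with the project are available):
#
#     remove_word = get_remove_word()
#     word_index, tokenizer, embedding_matrix = get_embedding()
#     data = [('doc1', '某某项目招标公告……'), ('doc2', '某某设备采购公告……')]
#     padded, ids = clean_word_with_tokenizer(data, remove_word, tokenizer)
#     # padded.shape == (2, 100); ids == ['doc1', 'doc2']
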
def recall(y_true, y_pred):
    '''
    Compute recall.
    @Argus:
        y_true: ground-truth labels
        y_pred: labels predicted by the model
    @Return
        recall
    '''
    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))  # true positives
    c3 = K.sum(K.round(K.clip(y_true, 0, 1)))  # actual positives
    recall = c1 / (c3 + K.epsilon())  # K.epsilon() avoids division by zero on symbolic tensors
    return recall

def f1_score(y_true, y_pred):
    '''
    Compute the F1 score.
    @Argus:
        y_true: ground-truth labels
        y_pred: labels predicted by the model
    @Return
        F1 score
    '''
    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))  # true positives
    c2 = K.sum(K.round(K.clip(y_pred, 0, 1)))  # predicted positives
    c3 = K.sum(K.round(K.clip(y_true, 0, 1)))  # actual positives
    precision = c1 / (c2 + K.epsilon())
    recall = c1 / (c3 + K.epsilon())
    f1_score = 2 * (precision * recall) / (precision + recall + K.epsilon())
    return f1_score

def precision(y_true, y_pred):
    '''
    Compute precision.
    @Argus:
        y_true: ground-truth labels
        y_pred: labels predicted by the model
    @Return
        precision
    '''
    c1 = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))  # true positives
    c2 = K.sum(K.round(K.clip(y_pred, 0, 1)))  # predicted positives
    precision = c1 / (c2 + K.epsilon())  # K.epsilon() avoids division by zero on symbolic tensors
    return precision

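# Usage sketch (not part of the original file; `model` is a hypothetical compiled Keras
# classifier): precision, recall and f1_score are meant to be passed as custom metrics.
#
#     model.compile(optimizer='adam',
#                   loss='categorical_crossentropy',
#                   metrics=['accuracy', precision, recall, f1_score])
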
if __name__ == '__main__':
    dic_label = get_dic()
    print(dic_label)