#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import os
import json

from BiddingKG.dl.foolnltk.selffool.predictor import Predictor
from BiddingKG.dl.common.Utils import *
from zipfile import ZipFile

OOV_STR = ""

data_path = os.path.dirname(__file__) + "/../data"


def _load_map_file(path, char_map_name, id_map_name):
    # Read the character/tag vocabularies from all_map.json inside the map archive.
    with ZipFile(path) as myzip:
        with myzip.open('all_map.json') as myfile:
            content = myfile.readline()
            content = content.decode()
            data = json.loads(content)
            return data.get(char_map_name), data.get(id_map_name)


class LexicalAnalyzer(object):

    def __init__(self):
        self.initialized = False
        self.map = None
        self.seg_model = None
        self.pos_model = None
        self.ner_model = None
        self.data_path = data_path
        self.map_file_path = os.path.join(self.data_path, "map.zip")

    def _load_model(self, model_name, word_map_name, tag_name, url, authorization):
        # Build a Predictor from the frozen graph file and its vocabulary maps.
        model_path = os.path.join(self.data_path, model_name)
        char_to_id, id_to_seg = _load_map_file(self.map_file_path, word_map_name, tag_name)
        return Predictor(model_path, char_to_id, id_to_seg, url, authorization)

    def _load_seg_model(self, url=None, authorization=None):
        self.seg_model = self._load_model("seg.pb", "char_map", "seg_map", url, authorization)

    def _load_pos_model(self, url=None, authorization=None):
        self.pos_model = self._load_model("pos.pb", "word_map", "pos_map", url, authorization)

    def _load_ner_model(self, url=None, authorization=None):
        self.ner_model = self._load_model("ner.pb", "char_map", "ner_map", url, authorization)

    def pos(self, seg_words_list):
        # Part-of-speech tagging over already-segmented word lists.
        if not self.pos_model:
            self._load_pos_model()
        pos_labels = self.pos_model.predict(seg_words_list)
        return pos_labels

    def ner_labels(self, text_list, tokens):
        # Project character-level NER labels onto the given token segmentation:
        # each token takes the label of its last character.
        if not self.ner_model:
            self._load_ner_model()
        assert len(text_list) == len(tokens)
        ner_labels = self.ner_model.predict(text_list)
        out = []
        for index in range(len(ner_labels)):
            ner = ner_labels[index]
            token = tokens[index]
            out_item = []
            i = -1
            for item in token:
                i += len(item)
                if ner[i] == "O":
                    out_item.append("O")
                else:
                    out_item.append(ner[i].split("_")[1])
            out.append(out_item)
        return out

    def ner(self, text_list):
        # Decode BMES-style character labels ("B_xx"/"M_xx"/"E_xx"/"S_xx"/"O")
        # into (start, end, type, text) entity tuples for each sentence.
        if not self.ner_model:
            self._load_ner_model()
        ner_labels = self.ner_model.predict(text_list)
        # print(ner_labels)
        all_entitys = []
        for ti, text in enumerate(text_list):
            ens = []
            entity = ""
            i = 0
            ner_label = ner_labels[ti]
            chars = list(text)
            for label, word in zip(ner_label, chars):
                i += 1
                if label == "O":
                    continue
                lt = label.split("_")[1]
                lb = label.split("_")[0]
                if lb == "S":
                    ens.append((i - 1, i, lt, word))
                elif lb == "B":
                    entity = ""
                    entity += word
                elif lb == "M":
                    entity += word
                elif lb == "E":
                    entity += word
                    ens.append((i - len(entity), i, lt, entity))
                    entity = ""
            if entity:
                # Flush an entity that was still open at the end of the sentence.
                ens.append((i - len(entity), i, lt, entity))
            all_entitys.append(ens)
        return all_entitys

    def cut(self, text_list):
        # Word segmentation: decode character-level B/M/E/S labels into words.
        if not self.seg_model:
            self._load_seg_model(url=selffool_seg_url, authorization=selffool_seg_authorization)
        all_labels = self.seg_model.predict(text_list)
        sent_words = []
        for ti, text in enumerate(text_list):
            words = []
            N = len(text)
            seg_labels = all_labels[ti]
            tmp_word = ""
            for i in range(N):
                label = seg_labels[i]
                w = text[i]
                if label == "B":
                    tmp_word += w
                elif label == "M":
                    tmp_word += w
                elif label == "E":
                    tmp_word += w
                    words.append(tmp_word)
                    tmp_word = ""
                else:
                    # "S" (or any other label): the character is a word on its own.
                    tmp_word = ""
                    words.append(w)
            if tmp_word:
                words.append(tmp_word)
            sent_words.append(words)
        return sent_words

    def analysis(self, text_list):
        # Full pipeline: segmentation, POS tagging on the segments, and NER on
        # the raw text; returns per-sentence (word, pos) pairs plus entities.
        words = self.cut(text_list)
        pos_labels = self.pos(words)
        ners = self.ner(text_list)
        word_inf = [list(zip(ws, ps)) for ws, ps in zip(words, pos_labels)]
        return word_inf, ners
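

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module. It assumes the
    # frozen graphs (seg.pb / pos.pb / ner.pb) and map.zip are present under
    # data_path, and that selffool_seg_url / selffool_seg_authorization are
    # exported by BiddingKG.dl.common.Utils. The sample sentence is hypothetical.
    analyzer = LexicalAnalyzer()
    sample_texts = ["招标人委托代理机构组织本项目招标"]
    word_pos, entities = analyzer.analysis(sample_texts)
    # word_pos: per-sentence list of (word, pos_tag) pairs
    # entities: per-sentence list of (start, end, type, text) tuples
    print(word_pos)
    print(entities)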