# -*- coding: utf-8 -*-
"""
Created on Fri Jun 1 18:03:03 2018

@author: DONG
"""
import sys
import os

sys.path.append(os.path.dirname(__file__) + "/..")
os.environ["KERAS_BACKEND"] = "tensorflow"
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"  # force CPU-only inference

import logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

import ctypes
import inspect
import json
import re
import time
import traceback
from threading import Thread

import numpy as np
from bs4 import BeautifulSoup
from flask import Flask, request

from text_classifier_pai.main import Text_Classifier

sys.path.append(os.path.abspath("."))

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

classifier = Text_Classifier()


# Custom JSON encoder so numpy arrays, numpy floats, and bytes serialize cleanly.
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
            return float(obj)
        return json.JSONEncoder.default(self, obj)


def _async_raise(tid, exctype):
    """Raises the exception in the thread with id `tid`, performing cleanup if needed."""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread was affected: undo the damage.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)


def article_limit(soup, limit_words=30000):
    """Truncate the visible text of an HTML document to about `limit_words` characters."""
    sub_space = re.compile(r"\s+")

    def soup_limit(_soup, _count, max_count=30000, max_gap=500):
        """
        :param _soup: soup node to truncate
        :param _count: characters counted so far
        :param max_count: hard character limit
        :param max_gap: tolerated overshoot beyond the limit
        :return: (updated count, overshoot, node that still needs truncating or None)
        """
        _gap = _count - max_count
        _is_skip = False
        next_soup = None
        # Descend through wrapper nodes whose single child carries the same text.
        while len(_soup.find_all(recursive=False)) == 1 and \
                _soup.get_text(strip=True) == _soup.find_all(recursive=False)[0].get_text(strip=True):
            _soup = _soup.find_all(recursive=False)[0]
        if len(_soup.find_all(recursive=False)) == 0:
            # Leaf node: cut its text directly.
            _soup.string = str(_soup.get_text())[:max_count - _count]
            _count += len(re.sub(sub_space, "", _soup.string))
            _gap = _count - max_count
            next_soup = None
        else:
            for _soup_part in _soup.find_all(recursive=False):
                if not _is_skip:
                    _count += len(re.sub(sub_space, "", _soup_part.get_text()))
                    if _count >= max_count:
                        _gap = _count - max_count
                        if _gap <= max_gap:
                            # Overshoot is within tolerance: keep this part, drop the rest.
                            _is_skip = True
                        else:
                            # Overshoot too large: hand this part back for recursive truncation.
                            _is_skip = True
                            next_soup = _soup_part
                            _count -= len(re.sub(sub_space, "", _soup_part.get_text()))
                            continue
                else:
                    _soup_part.decompose()
        return _count, _gap, next_soup

    text_count = 0
    have_attachment = False
    attachment_part = None
    for child in soup.find_all(recursive=True):
        if child.name == 'div' and 'class' in child.attrs:
            if "richTextFetch" in child['class']:
                child.insert_before("##attachment##")
                attachment_part = child
                have_attachment = True
                break

    if not have_attachment:
        # No attachment: truncate the whole document.
        if len(re.sub(sub_space, "", soup.get_text())) > limit_words:
            text_count, gap, n_soup = soup_limit(soup, text_count,
                                                 max_count=limit_words, max_gap=500)
            while n_soup:
                text_count, gap, n_soup = soup_limit(n_soup, text_count,
                                                     max_count=limit_words, max_gap=500)
    else:
        # Has attachment: truncate main text and attachment separately.
        _text = re.sub(sub_space, "", soup.get_text())
        _text_split = _text.split("##attachment##")
        if len(_text_split[0]) > limit_words:
            main_soup = attachment_part.parent
            main_text = main_soup.find_all(recursive=False)[0]
            text_count, gap, n_soup = soup_limit(main_text, text_count,
                                                 max_count=limit_words, max_gap=500)
            while n_soup:
                text_count, gap, n_soup = soup_limit(n_soup, text_count,
                                                     max_count=limit_words, max_gap=500)
        if len(_text_split[1]) > limit_words:
            if len(attachment_part.find_all(recursive=False)) == 0:
                # Attachment HTML is plain text with no child structure.
                attachment_part.string = str(attachment_part.get_text())[:limit_words]
            else:
                attachment_text_nums = 0
                attachment_skip = False
                for part in attachment_part.find_all(recursive=False):
                    if not attachment_skip:
                        last_attachment_text_nums = attachment_text_nums
                        attachment_text_nums += len(re.sub(sub_space, "", part.get_text()))
                        if attachment_text_nums >= limit_words:
                            part.string = str(part.get_text())[:limit_words - last_attachment_text_nums]
                            attachment_skip = True
                    else:
                        part.decompose()
    return soup
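
# A minimal sketch of how article_limit is intended to be used (the HTML
# string below is hypothetical, chosen only to illustrate the call; real
# inputs come from the request payload):
#
#   html = "<div><p>" + "a" * 40000 + "</p><p>" + "b" * 40000 + "</p></div>"
#   soup = BeautifulSoup(html, "lxml")
#   soup = article_limit(soup, limit_words=30000)
#   # The visible text is now cut to roughly limit_words characters,
#   # allowing an overshoot of up to max_gap (500) characters.
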
def run_thread(data, list_result):
    cost_time = dict()
    if "doc_id" in data:
        _doc_id = data['doc_id']
    else:
        _doc_id = ""
    if "title" in data:
        _title = data["title"]
    else:
        _title = ""
    data_res = ""
    try:
        if "content" in data:
            content = data['content']
            # Cap very long documents before preprocessing.
            if len(content) > 50000:
                _soup = BeautifulSoup(content, "lxml")
                _soup = article_limit(_soup, 50000)
                content = str(_soup)

            start_time = time.time()
            process, ids = classifier.process([content])
            cost_time["preprocess"] = time.time() - start_time

            start_time = time.time()
            logits, ids = classifier.predict(process, ids)
            cost_time["predict"] = time.time() - start_time

            start_time = time.time()
            result = classifier.get_results(logits, ids)
            class_name = result[0][1]  # predicted class name
            # Look up the top-level and sub-level class names for this label.
            subclass, topclass = classifier.dic_label[class_name].split(',')
            cost_time["result"] = time.time() - start_time

            data_res = {"class": topclass,
                        "class_name": class_name,
                        "subclass": subclass}
            data_res["success"] = True
            data_res["cost_time"] = cost_time
        else:
            data_res = {"success": False, "msg": "content not passed"}
    except Exception as e:
        traceback.print_exc()
        data_res = {"success": False, "msg": str(e)}
        logging.error('Exception:%s' % str(e))
    logging.info("done for doc_id:%s with result:%s" % (_doc_id, str(data_res)))
    list_result.append(data_res)


@app.route('/industry_extract', methods=['POST'])
def text_predict():
    try:
        data = request.json
        status_code = 201  # returned on success; 302 signals a timeout kill
        _timeout = data.get("timeout")  # seconds; None waits indefinitely
        list_result = []
        t = Thread(target=run_thread, args=(data, list_result))
        t.start()
        t.join(_timeout)
        if t.is_alive():
            # The worker exceeded the timeout and is killed.
            stop_thread(t)
            status_code = 302
            data_res = {"success": False, "msg": "timeout"}
        else:
            data_res = list_result[0]
        _resp = json.dumps(data_res, cls=MyEncoder, ensure_ascii=False)
        return _resp, status_code
    except Exception as e:
        traceback.print_exc()
        data_res = {"success": False, "msg": "error:%s" % (str(e))}
        _resp = json.dumps(data_res)
        return _resp, 500
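
# A hedged client-side sketch (hypothetical host/port; assumes the service
# was started with the defaults below):
#
#   import requests
#   resp = requests.post("http://127.0.0.1:15000/industry_extract",
#                        json={"doc_id": "42",
#                              "title": "example",
#                              "content": "<p>some article text...</p>",
#                              "timeout": 10})
#   print(resp.status_code, resp.json())
#   # => {"class": ..., "class_name": ..., "subclass": ...,
#   #     "success": true, "cost_time": {...}} on success
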
{"success":False,"msg":"error:%s"%(str(e))} _resp = json.dumps(data_res) return _resp,500 def getPort(argv): port = 15000 for item in argv: _l = str(item).split("port=") if len(_l)>1: port = int(_l[-1]) break return port if __name__ == '__main__': port = getPort(argv=sys.argv) app.run(host='0.0.0.0', port=port, threaded=True, debug=False) ("ContentExtractor running") # app.run()