# -*- coding: utf-8 -*-
"""
Created on Fri Jun 1 18:03:03 2018

@author: DONG
"""
import sys
import os

from flask import Flask, jsonify
from flask import abort
from flask import request

sys.path.append(os.path.dirname(__file__) + "/..")
os.environ["KERAS_BACKEND"] = "tensorflow"

app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False

import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

from text_classifier_pai.main import Text_Classifier
import numpy as np
import ctypes
import inspect
from threading import Thread
import traceback
import json
import time
import uuid
import re
from bs4 import BeautifulSoup

# Run on CPU only: "-1" hides every CUDA device from the process.
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
sys.path.append(os.path.abspath("."))

# The classifier is loaded once at import time and shared by all requests.
classifier = Text_Classifier()
# Custom JSONEncoder that can serialize numpy and bytes values.
class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return str(obj, encoding='utf-8')
        elif isinstance(obj, (np.float_, np.float16, np.float32,
                              np.float64)):
            return float(obj)
        return json.JSONEncoder.default(self, obj)
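
# Usage sketch for MyEncoder (values below are illustrative only):
# >>> json.dumps({"score": np.float32(0.5), "vec": np.array([1, 2, 3])}, cls=MyEncoder)
# '{"score": 0.5, "vec": [1, 2, 3]}'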
def _async_raise(tid, exctype):
    """Raises the exception in the thread with id `tid`; performs cleanup if needed."""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # More than one thread was affected: undo the call and report failure.
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)
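
# Note: stop_thread() asynchronously raises SystemExit inside the target thread.
# The exception is only delivered when that thread next executes Python bytecode,
# so a thread blocked inside a C extension (e.g. a TensorFlow op) will not stop
# until that call returns.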
def article_limit(soup, limit_words=30000):
    sub_space = re.compile(r"\s+")

    def soup_limit(_soup, _count, max_count=30000, max_gap=500):
        """
        :param _soup: soup
        :param _count: current character count
        :param max_count: hard limit on the character count
        :param max_gap: allowed overshoot once the limit is reached
        :return:
        """
        _gap = _count - max_count
        _is_skip = False
        next_soup = None
        # Descend past wrapper tags whose single child carries the same text.
        while len(_soup.find_all(recursive=False)) == 1 and \
                _soup.get_text(strip=True) == _soup.find_all(recursive=False)[0].get_text(strip=True):
            _soup = _soup.find_all(recursive=False)[0]
        if len(_soup.find_all(recursive=False)) == 0:
            # Leaf node: truncate its text to the remaining budget.
            _soup.string = str(_soup.get_text())[:max_count - _count]
            _count += len(re.sub(sub_space, "", _soup.string))
            _gap = _count - max_count
            next_soup = None
        else:
            for _soup_part in _soup.find_all(recursive=False):
                if not _is_skip:
                    _count += len(re.sub(sub_space, "", _soup_part.get_text()))
                    if _count >= max_count:
                        _gap = _count - max_count
                        if _gap <= max_gap:
                            # Overshoot within tolerance: keep this part whole.
                            _is_skip = True
                        else:
                            # Too large: defer this part to a recursive pass.
                            _is_skip = True
                            next_soup = _soup_part
                            _count -= len(re.sub(sub_space, "", _soup_part.get_text()))
                            continue
                else:
                    _soup_part.decompose()
        return _count, _gap, next_soup

    text_count = 0
    have_attachment = False
    attachment_part = None
    for child in soup.find_all(recursive=True):
        if child.name == 'div' and 'class' in child.attrs:
            if "richTextFetch" in child['class']:
                child.insert_before("##attachment##")
                attachment_part = child
                have_attachment = True
                break
    if not have_attachment:
        # No attachment.
        if len(re.sub(sub_space, "", soup.get_text())) > limit_words:
            text_count, gap, n_soup = soup_limit(soup, text_count, max_count=limit_words, max_gap=500)
            while n_soup:
                text_count, gap, n_soup = soup_limit(n_soup, text_count, max_count=limit_words, max_gap=500)
    else:
        # Attachment present: limit the main body and the attachment separately.
        _text = re.sub(sub_space, "", soup.get_text())
        _text_split = _text.split("##attachment##")
        if len(_text_split[0]) > limit_words:
            main_soup = attachment_part.parent
            main_text = main_soup.find_all(recursive=False)[0]
            text_count, gap, n_soup = soup_limit(main_text, text_count, max_count=limit_words, max_gap=500)
            while n_soup:
                text_count, gap, n_soup = soup_limit(n_soup, text_count, max_count=limit_words, max_gap=500)
        if len(_text_split[1]) > limit_words:
            # The attachment html is plain text with no child structure.
            if len(attachment_part.find_all(recursive=False)) == 0:
                attachment_part.string = str(attachment_part.get_text())[:limit_words]
            else:
                attachment_text_nums = 0
                attachment_skip = False
                for part in attachment_part.find_all(recursive=False):
                    if not attachment_skip:
                        last_attachment_text_nums = attachment_text_nums
                        attachment_text_nums = attachment_text_nums + len(re.sub(sub_space, "", part.get_text()))
                        if attachment_text_nums >= limit_words:
                            part.string = str(part.get_text())[:limit_words - last_attachment_text_nums]
                            attachment_skip = True
                    else:
                        part.decompose()
    return soup
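
# Usage sketch (names illustrative): truncate an over-long HTML body to roughly
# 1000 visible characters before classification. The "lxml" parser matches the
# one used in run_thread() below.
# html = "<html><body>" + "<p>paragraph</p>" * 10000 + "</body></html>"
# soup = article_limit(BeautifulSoup(html, "lxml"), limit_words=1000)
# content = str(soup)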
def run_thread(data, list_result):
    k = str(uuid.uuid4())
    cost_time = dict()
    if "doc_id" in data:
        _doc_id = data['doc_id']
    else:
        _doc_id = ""
    if "title" in data:
        _title = data["title"]
    else:
        _title = ""
    data_res = ""
    try:
        if "content" in data:
            content = data['content']
            # Very long documents are truncated before preprocessing.
            if len(content) > 50000:
                _soup = BeautifulSoup(content, "lxml")
                _soup = article_limit(_soup, 50000)
                content = str(_soup)
            start_time = time.time()
            process, ids = classifier.process([content])
            cost_time["preprocess"] = time.time() - start_time
            start_time = time.time()
            logits, ids = classifier.predict(process, ids)
            cost_time["predict"] = time.time() - start_time
            start_time = time.time()
            result = classifier.get_results(logits, ids)
            class_name = result[0][1]  # the predicted class name
            subclass, topclass = classifier.dic_label[class_name].split(',')  # look up the top-level and sub-level class names
            cost_time["result"] = time.time() - start_time
            data_res = {"class": topclass, "class_name": class_name, "subclass": subclass}
            data_res["success"] = True
            data_res["cost_time"] = cost_time
        else:
            data_res = {"success": False, "msg": "content not passed"}
    except Exception as e:
        traceback.print_exc()
        data_res = {"success": False, "msg": str(e)}
        logging.error('Exception:%s' % str(e))
    # Append the result so the caller can pick it up after join().
    logging.info("done for doc_id:%s with result:%s" % (_doc_id, str(data_res)))
    list_result.append(data_res)
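
# Usage sketch: run_thread() communicates through the shared list instead of a
# return value, so it can be driven by Thread(...) as in text_predict() below.
# results = []
# run_thread({"doc_id": "demo-1", "content": "<p>some text</p>"}, results)
# assert "success" in results[0]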
@app.route('/industry_extract', methods=['POST'])
def text_predict():
    try:
        data = request.json
        status_code = 200
        # join(None) waits indefinitely when the caller passes no timeout.
        _timeout = data.get("timeout")
        list_result = []
        t = Thread(target=run_thread, args=(data, list_result))
        start_time = time.time()
        t.start()
        t.join(_timeout)
        if t.is_alive():
            stop_thread(t)
            status_code = 302  # the worker thread was killed after the timeout
            data_res = {"success": False, "msg": "timeout"}
        else:
            data_res = list_result[0]
        _resp = json.dumps(data_res, cls=MyEncoder, ensure_ascii=False)
        # Return the computed status rather than a hard-coded code.
        return _resp, status_code
    except Exception as e:
        traceback.print_exc()
        data_res = {"success": False, "msg": "error:%s" % (str(e))}
        _resp = json.dumps(data_res)
        return _resp, 500
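
# Example request (sketch; 15000 is the default port configured below):
# curl -X POST http://127.0.0.1:15000/industry_extract \
#      -H "Content-Type: application/json" \
#      -d '{"doc_id": "demo-1", "content": "<p>some text</p>", "timeout": 10}'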
def getPort(argv):
    """Parses a "port=<n>" argument from argv; defaults to 15000."""
    port = 15000
    for item in argv:
        _l = str(item).split("port=")
        if len(_l) > 1:
            port = int(_l[-1])
            break
    return port
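
# Example: "python app.py port=8080" serves on 8080; without a "port=" argument
# the service falls back to 15000.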
if __name__ == '__main__':
    port = getPort(argv=sys.argv)
    logging.info("ContentExtractor running")
    app.run(host='0.0.0.0', port=port, threaded=True, debug=False)
    # app.run()