#-*- coding: utf-8 -*- import copy import difflib import sys import os sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") from format_convert.convert_doc import doc2text from format_convert.convert_docx import docx2text from format_convert.convert_image import picture2text from format_convert.convert_pdf import pdf2text, PDFConvert from format_convert.convert_rar import rar2text from format_convert.convert_swf import swf2text from format_convert.convert_txt import txt2text from format_convert.convert_xls import xls2text from format_convert.convert_xlsx import xlsx2text from format_convert.convert_zip import zip2text import codecs import gc import hashlib import io import json import multiprocessing import sys import subprocess import PyPDF2 import lxml import pdfminer from PIL import Image from format_convert import get_memory_info from ocr import ocr_interface from ocr.ocr_interface import ocr, OcrModels from otr import otr_interface from otr.otr_interface import otr, OtrModels import re import shutil import signal import sys import base64 import time import traceback import uuid from os.path import basename import cv2 import fitz import pandas import docx import zipfile import mimetypes import filetype # import pdfplumber import psutil import requests import rarfile from PyPDF2 import PdfFileReader, PdfFileWriter import xml.dom.minidom import subprocess import logging from pdfminer.pdfparser import PDFParser from pdfminer.pdfdocument import PDFDocument from pdfminer.pdfpage import PDFPage from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter from pdfminer.converter import PDFPageAggregator from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar import logging import chardet from bs4 import BeautifulSoup from format_convert.libreoffice_interface import office_convert from format_convert.swf.export import SVGExporter logging.getLogger("pdfminer").setLevel(logging.WARNING) from format_convert.table_correct import * from format_convert.swf.movie import SWF import logging # import timeout_decorator from format_convert import timeout_decorator logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') # txt doc docx xls xlsx pdf zip rar swf jpg jpeg png # def judge_error_code(_list, code=[-1, -2, -3, -4, -5, -7]): # for c in code: # if _list == [c]: # return True # return False # # # def set_timeout(signum, frame): # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # print("=======================set_timeout") # # raise TimeoutError # # # def log_traceback(func_name): # logging.info(func_name) # etype, value, tb = sys.exc_info() # for line in traceback.TracebackException( # type(value), value, tb, limit=None).format(chain=True): # logging.info(line) # # # def judge_format(path): # guess1 = mimetypes.guess_type(path) # _type = None # if guess1[0]: # _type = guess1[0] # else: # guess2 = filetype.guess(path) # if guess2: # _type = guess2.mime # # if _type == "application/pdf": # return "pdf" # if _type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": # return "docx" # if _type == "application/x-zip-compressed" or _type == "application/zip": # return "zip" # if _type == "application/x-rar-compressed" or _type == "application/rar": # return "rar" # if _type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": # return "xlsx" # if _type == "application/msword": # return "doc" # if _type == "image/png": # return "png" # if _type == "image/jpeg": # return "jpg" # # # 猜不到,返回None # return None # # # @get_memory_info.memory_decorator # def txt2text(path): # logging.info("into txt2text") # try: # # 判断字符编码 # with open(path, "rb") as ff: # data = ff.read() # encode = chardet.detect(data).get("encoding") # print("txt2text judge code is", encode) # # try: # if encode is None: # logging.info("txt2text cannot judge file code!") # return [-3] # with open(path, "r", encoding=encode) as ff: # txt_text = ff.read() # return [txt_text] # except: # logging.info("txt2text cannot open file with code " + encode) # return [-3] # except Exception as e: # print("txt2text", traceback.print_exc()) # logging.info("txt2text error!") # return [-1] # # # @get_memory_info.memory_decorator # def doc2text(path, unique_type_dir): # logging.info("into doc2text") # try: # # 调用office格式转换 # file_path = from_office_interface(path, unique_type_dir, 'docx') # # if file_path == [-3]: # # return [-3] # if judge_error_code(file_path): # return file_path # # text = docx2text(file_path, unique_type_dir) # return text # except Exception as e: # logging.info("doc2text error!") # print("doc2text", traceback.print_exc()) # # log_traceback("doc2text") # return [-1] # # # @get_memory_info.memory_decorator # def read_xml_order(path, save_path): # logging.info("into read_xml_order") # try: # try: # f = zipfile.ZipFile(path) # for file in f.namelist(): # if "word/document.xml" == str(file): # f.extract(file, save_path) # f.close() # except Exception as e: # # print("docx format error!", e) # logging.info("docx format error!") # return [-3] # # # DOMTree = xml.dom.minidom.parse(save_path + "word/document.xml") # # collection = DOMTree.documentElement # # try: # collection = xml_analyze(save_path + "word/document.xml") # except TimeoutError: # logging.info("read_xml_order timeout") # return [-4] # # body = collection.getElementsByTagName("w:body")[0] # order_list = [] # for line in body.childNodes: # # print(str(line)) # if "w:p" in str(line): # text = line.getElementsByTagName("w:t") # picture = line.getElementsByTagName("wp:docPr") # if text: # order_list.append("w:t") # if picture: # order_list.append("wp:docPr") # # for line1 in line.childNodes: # if "w:r" in str(line1): # # print("read_xml_order", "w:r") # picture1 = line1.getElementsByTagName("w:pict") # if picture1: # order_list.append("wp:docPr") # # if "w:tbl" in str(line): # order_list.append("w:tbl") # read_xml_table(path, save_path) # return order_list # except Exception as e: # logging.info("read_xml_order error!") # print("read_xml_order", traceback.print_exc()) # # log_traceback("read_xml_order") # return [-1] # # # @get_memory_info.memory_decorator # def read_xml_table(path, save_path): # logging.info("into read_xml_table") # try: # # print("into read_xml_table") # try: # f = zipfile.ZipFile(path) # for file in f.namelist(): # if "word/document.xml" == str(file): # f.extract(file, save_path) # f.close() # except Exception as e: # # print("docx format error!", e) # logging.info("docx format error!") # return [-3] # # # DOMTree = xml.dom.minidom.parse(save_path + "word/document.xml") # # collection = DOMTree.documentElement # # try: # collection = xml_analyze(save_path + "word/document.xml") # except TimeoutError: # logging.info("read_xml_table timeout") # return [-4] # # body = collection.getElementsByTagName("w:body")[0] # table_text_list = [] # # print("body.childNodes", body.childNodes) # for line in body.childNodes: # if "w:tbl" in str(line): # # print("str(line)", str(line)) # table_text = '' + "\n" # tr_list = line.getElementsByTagName("w:tr") # # print("line.childNodes", line.childNodes) # tr_index = 0 # tr_text_list = [] # tr_text_list_colspan = [] # for tr in tr_list: # table_text = table_text + "" + "\n" # tc_list = tr.getElementsByTagName("w:tc") # tc_index = 0 # tc_text_list = [] # for tc in tc_list: # tc_text = "" # # # 获取一格占多少列 # col_span = tc.getElementsByTagName("w:gridSpan") # if col_span: # col_span = int(col_span[0].getAttribute("w:val")) # else: # col_span = 1 # # # 获取是否是合并单元格的下一个空单元格 # is_merge = tc.getElementsByTagName("w:vMerge") # if is_merge: # is_merge = is_merge[0].getAttribute("w:val") # if is_merge == "continue": # col_span_index = 0 # real_tc_index = 0 # # # if get_platform() == "Windows": # # print("read_xml_table tr_text_list", tr_text_list) # # print("read_xml_table tr_index", tr_index) # # if 0 <= tr_index - 1 < len(tr_text_list): # for tc_colspan in tr_text_list[tr_index - 1]: # if col_span_index < tc_index: # col_span_index += tc_colspan[1] # real_tc_index += 1 # # # print("tr_index-1, real_tc_index", tr_index-1, real_tc_index) # # print(tr_text_list[tr_index-1]) # if real_tc_index < len(tr_text_list[tr_index - 1]): # tc_text = tr_text_list[tr_index - 1][real_tc_index][0] # # table_text = table_text + "" + "\n" # tc_index += 1 # tc_text_list.append([tc_text, col_span]) # table_text += "" + "\n" # tr_index += 1 # tr_text_list.append(tc_text_list) # table_text += "
" + "\n" # p_list = tc.getElementsByTagName("w:p") # # for p in p_list: # t = p.getElementsByTagName("w:t") # if t: # for tt in t: # # print("tt", tt.childNodes) # if len(tt.childNodes) > 0: # tc_text += tt.childNodes[0].nodeValue # tc_text += "\n" # # table_text = table_text + tc_text + "
" + "\n" # table_text_list.append(table_text) # return table_text_list # # except Exception as e: # logging.info("read_xml_table error") # print("read_xml_table", traceback.print_exc()) # # log_traceback("read_xml_table") # return [-1] # # # @get_memory_info.memory_decorator # @timeout_decorator.timeout(300, timeout_exception=TimeoutError) # def xml_analyze(path): # # 解析xml # DOMTree = xml.dom.minidom.parse(path) # collection = DOMTree.documentElement # return collection # # # def read_docx_table(document): # table_text_list = [] # for table in document.tables: # table_text = "\n" # print("==================") # for row in table.rows: # table_text += "\n" # for cell in row.cells: # table_text += "\n" # table_text += "\n" # table_text += "
" + cell.text + "
\n" # print(table_text) # table_text_list.append(table_text) # return table_text_list # # # @get_memory_info.memory_decorator # def docx2text(path, unique_type_dir): # logging.info("into docx2text") # try: # try: # doc = docx.Document(path) # except Exception as e: # print("docx format error!", e) # print(traceback.print_exc()) # logging.info("docx format error!") # return [-3] # # # 遍历段落 # # print("docx2text extract paragraph") # paragraph_text_list = [] # for paragraph in doc.paragraphs: # if paragraph.text != "": # paragraph_text_list.append("
" + paragraph.text + "
" + "\n") # # print("paragraph_text", paragraph.text) # # # 遍历表 # try: # table_text_list = read_xml_table(path, unique_type_dir) # except TimeoutError: # return [-4] # # if judge_error_code(table_text_list): # return table_text_list # # # 顺序遍历图片 # # print("docx2text extract image") # image_text_list = [] # temp_image_path = unique_type_dir + "temp_image.png" # pattern = re.compile('rId\d+') # for graph in doc.paragraphs: # for run in graph.runs: # if run.text == '': # try: # if not pattern.search(run.element.xml): # continue # content_id = pattern.search(run.element.xml).group(0) # content_type = doc.part.related_parts[content_id].content_type # except Exception as e: # print("docx no image!", e) # continue # if not content_type.startswith('image'): # continue # # # 写入临时文件 # img_data = doc.part.related_parts[content_id].blob # with open(temp_image_path, 'wb') as f: # f.write(img_data) # # # if get_platform() == "Windows": # # print("img_data", img_data) # # if img_data is None: # continue # # # 识别图片文字 # image_text = picture2text(temp_image_path) # if image_text == [-2]: # return [-2] # if image_text == [-1]: # return [-1] # if image_text == [-3]: # continue # # image_text = image_text[0] # image_text_list.append(add_div(image_text)) # # # 解析document.xml,获取文字顺序 # # print("docx2text extract order") # order_list = read_xml_order(path, unique_type_dir) # if order_list == [-2]: # return [-2] # if order_list == [-1]: # return [-1] # # text = "" # print("len(order_list)", len(order_list)) # print("len(paragraph_text_list)", len(paragraph_text_list)) # print("len(image_text_list)", len(image_text_list)) # print("len(table_text_list)", len(table_text_list)) # # # log("docx2text output in order") # for tag in order_list: # if tag == "w:t": # if len(paragraph_text_list) > 0: # text += paragraph_text_list.pop(0) # if tag == "wp:docPr": # if len(image_text_list) > 0: # text += image_text_list.pop(0) # if tag == "w:tbl": # if len(table_text_list) > 0: # text += table_text_list.pop(0) # return [text] # except Exception as e: # # print("docx2text", e, global_type) # logging.info("docx2text error!") # print("docx2text", traceback.print_exc()) # # log_traceback("docx2text") # return [-1] # # # def add_div(text): # if text == "" or text is None: # return text # # if get_platform() == "Windows": # print("add_div", text) # if re.findall("
", text): # return text # # text = "
" + text + "\n" # text = re.sub("\n", "
\n
", text) # # text += "
" # if text[-5:] == "
": # print("add_div has cut", text[-30:]) # text = text[:-5] # return text # # # @get_memory_info.memory_decorator # def pdf2Image(path, save_dir): # logging.info("into pdf2Image") # try: # try: # doc = fitz.open(path) # except Exception as e: # logging.info("pdf format error!") # # print("pdf format error!", e) # return [-3] # # # output_image_list = [] # output_image_dict = {} # page_count = doc.page_count # for page_no in range(page_count): # # 限制pdf页数,只取前10页后10页 # if page_count > 20: # if 10 <= page_no < page_count-10: # # logging.info("pdf2Image: pdf pages count " + str(doc.page_count) # # + ", only get 70 pages") # continue # # try: # page = doc.loadPage(page_no) # output = save_dir + "_page" + str(page_no) + ".png" # rotate = int(0) # # 每个尺寸的缩放系数为1.3,这将为我们生成分辨率提高2.6的图像。 # # 此处若是不做设置,默认图片大小为:792X612, dpi=96 # # (1.33333333 --> 1056x816) (2 --> 1584x1224) # # (1.183, 2.28 --> 1920x1080) # zoom_x = 3. # zoom_y = 3. # # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) # pix = page.getPixmap(matrix=mat, alpha=False) # pix.writePNG(output) # pdf_image = cv2.imread(output) # print("pdf_image", page_no, pdf_image.shape) # # output_image_list.append([page_no, output]) # output_image_dict[int(page_no)] = output # except ValueError as e: # traceback.print_exc() # if str(e) == "page not in document": # logging.info("pdf2Image page not in document! continue..." + str(page_no)) # continue # elif "encrypted" in str(e): # logging.info("pdf2Image document need password " + str(page_no)) # return [-7] # except RuntimeError as e: # if "cannot find page" in str(e): # logging.info("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e)) # continue # else: # traceback.print_exc() # return [-3] # return [output_image_dict] # # except Exception as e: # logging.info("pdf2Image error!") # print("pdf2Image", traceback.print_exc()) # return [-1] # # # ocr_result_flag = 0 # def image_preprocess(image_np, image_path, use_ocr=True): # logging.info("into image_preprocess") # try: # # 长 宽 # # resize_size = (1024, 768) # # 限制图片大小 # # resize_image(image_path, resize_size) # # # 图片倾斜校正,写入原来的图片路径 # g_r_i = get_rotated_image(image_np, image_path) # if g_r_i == [-1]: # return [-1], [], [], 0 # # # otr需要图片resize, 写入另一个路径 # image_np = cv2.imread(image_path) # best_h, best_w = get_best_predict_size(image_np) # image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA) # image_resize_path = image_path[:-4] + "_resize" + image_path[-4:] # cv2.imwrite(image_resize_path, image_resize) # # # 调用otr模型接口 # with open(image_resize_path, "rb") as f: # image_bytes = f.read() # points, split_lines, bboxes, outline_points = from_otr_interface(image_bytes) # if judge_error_code(points): # return points, [], [], 0 # # # 将resize后得到的bbox根据比例还原 # ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w) # for i in range(len(bboxes)): # bbox = bboxes[i] # bboxes[i] = [(int(bbox[0][0]*ratio[1]), int(bbox[0][1]*ratio[0])), # (int(bbox[1][0]*ratio[1]), int(bbox[1][1]*ratio[0]))] # for i in range(len(split_lines)): # line = split_lines[i] # split_lines[i] = [(int(line[0][0]*ratio[1]), int(line[0][1]*ratio[0])), # (int(line[1][0]*ratio[1]), int(line[1][1]*ratio[0]))] # for i in range(len(points)): # point = points[i] # points[i] = (int(point[0]*ratio[1]), int(point[1]*ratio[0])) # # for i in range(len(outline_points)): # point = outline_points[i] # outline_points[i] = [(int(point[0][0]*ratio[1]), int(point[0][1]*ratio[0])), # (int(point[1][0]*ratio[1]), int(point[1][1]*ratio[0]))] # # # 查看是否能输出正确框 # for box in bboxes: # cv2.rectangle(image_np, box[0], box[1], (0, 255, 0), 2) # # cv2.namedWindow('bbox', 0) # # cv2.imshow("bbox", image_np) # # cv2.waitKey(0) # # # 调用ocr模型接口 # with open(image_path, "rb") as f: # image_bytes = f.read() # # 有表格 # if len(bboxes) >= 2: # text_list, bbox_list = from_ocr_interface(image_bytes, True) # if judge_error_code(text_list): # return text_list, [], [], 0 # # # for i in range(len(text_list)): # # print(text_list[i], bbox_list[i]) # # 查看是否能输出正确框 # # # for box in bbox_list: # # cv2.rectangle(image_np, (int(box[0][0]), int(box[0][1])), # # (int(box[2][0]), int(box[2][1])), (255, 0, 0), 1) # # cv2.namedWindow('bbox', 0) # # cv2.imshow("bbox", image_np) # # cv2.waitKey(0) # # text, column_list = get_formatted_table(text_list, bbox_list, bboxes, split_lines) # if judge_error_code(text): # return text, [], [], 0 # is_table = 1 # return text, column_list, outline_points, is_table # # # 无表格 # else: # if use_ocr: # text = from_ocr_interface(image_bytes) # if judge_error_code(text): # return text, [], [], 0 # # is_table = 0 # return text, [], [], is_table # else: # is_table = 0 # return None, [], [], is_table # # except Exception as e: # logging.info("image_preprocess error") # print("image_preprocess", traceback.print_exc()) # return [-1], [], [], 0 # # # def get_best_predict_size2(image_np): # sizes = [1280, 1152, 1024, 896, 768, 640, 512, 384, 256, 128] # # min_len = 10000 # best_height = sizes[0] # for height in sizes: # if abs(image_np.shape[0] - height) < min_len: # min_len = abs(image_np.shape[0] - height) # best_height = height # # min_len = 10000 # best_width = sizes[0] # for width in sizes: # if abs(image_np.shape[1] - width) < min_len: # min_len = abs(image_np.shape[1] - width) # best_width = width # # return best_height, best_width # # # def get_best_predict_size(image_np, times=64): # sizes = [] # for i in range(1, 100): # if i*times <= 3000: # sizes.append(i*times) # sizes.sort(key=lambda x: x, reverse=True) # # min_len = 10000 # best_height = sizes[0] # for height in sizes: # if abs(image_np.shape[0] - height) < min_len: # min_len = abs(image_np.shape[0] - height) # best_height = height # # min_len = 10000 # best_width = sizes[0] # for width in sizes: # if abs(image_np.shape[1] - width) < min_len: # min_len = abs(image_np.shape[1] - width) # best_width = width # # return best_height, best_width # # # @get_memory_info.memory_decorator # def pdf2text(path, unique_type_dir): # logging.info("into pdf2text") # try: # # pymupdf pdf to image # save_dir = path.split(".")[-2] + "_" + path.split(".")[-1] # output_image_dict = pdf2Image(path, save_dir) # if judge_error_code(output_image_dict): # return output_image_dict # output_image_dict = output_image_dict[0] # output_image_no_list = list(output_image_dict.keys()) # output_image_no_list.sort(key=lambda x: x) # # # 获取每页pdf提取的文字、表格的列数、轮廓点、是否含表格、页码 # # page_info_list = [] # page_info_dict = {} # has_table_dict = {} # no_table_dict = {} # for page_no in output_image_no_list: # img_path = output_image_dict.get(page_no) # print("pdf page", page_no, "in total", output_image_no_list[-1]) # # 读不出来的跳过 # try: # img = cv2.imread(img_path) # img_size = img.shape # except: # logging.info("pdf2text read image in page fail! continue...") # continue # # # 每张图片处理 # text, column_list, outline_points, is_table = image_preprocess(img, img_path, # use_ocr=False) # if judge_error_code(text): # return text # # # page_info_list.append([text, column_list, outline_points, is_table, # # page_no, img_size]) # page_info = [text, column_list, outline_points, is_table, img_size] # page_info_dict[int(page_no)] = page_info # # 包含table的和不包含table的 # if is_table: # has_table_dict[int(page_no)] = page_info # else: # no_table_dict[int(page_no)] = page_info # # has_table_no_list = list(has_table_dict.keys()) # has_table_no_list.sort(key=lambda x: x) # page_no_list = list(page_info_dict.keys()) # page_no_list.sort(key=lambda x: x) # # # 页码表格连接 # table_connect_list, connect_text_list = page_table_connect(has_table_dict) # if judge_error_code(table_connect_list): # return table_connect_list # # # 连接的页码 # table_connect_page_no_list = [] # for area in connect_text_list: # table_connect_page_no_list.append(area[1]) # print("pdf2text table_connect_list", table_connect_list) # print("connect_text_list", connect_text_list) # # # pdfminer 方式 # try: # fp = open(path, 'rb') # # 用文件对象创建一个PDF文档分析器 # parser = PDFParser(fp) # # 创建一个PDF文档 # doc = PDFDocument(parser) # # 连接分析器,与文档对象 # rsrcmgr = PDFResourceManager() # device = PDFPageAggregator(rsrcmgr, laparams=LAParams()) # interpreter = PDFPageInterpreter(rsrcmgr, device) # # # 判断是否能读pdf # for page in PDFPage.create_pages(doc): # break # except pdfminer.psparser.PSEOF as e: # # pdfminer 读不了空白页的对象,直接使用pymupdf转换出的图片进行ocr识别 # logging.info("pdf2text " + str(e) + " use ocr read pdf!") # text_list = [] # for page_no in page_no_list: # logging.info("pdf2text ocr page_no " + str(page_no)) # page_info = page_info_dict.get(page_no) # # 表格 # if page_info[3]: # # 判断表格是否跨页连接 # area_no = 0 # jump_page = 0 # for area in table_connect_list: # if page_no in area: # # 只记录一次text # if page_no == area[0]: # image_text = connect_text_list[area_no][0] # text_list.append([image_text, page_no, 0]) # jump_page = 1 # area_no += 1 # # # 是连接页的跳过后面步骤 # if jump_page: # continue # # # 直接取text # image_text = page_info_dict.get(page_no)[0] # text_list.append([image_text, page_no, 0]) # # 非表格 # else: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # text_list.append([image_text, page_no, 0]) # # text_list.sort(key=lambda z: z[1]) # text = "" # for t in text_list: # text += t[0] # return [text] # except Exception as e: # logging.info("pdf format error!") # traceback.print_exc() # return [-3] # # text_list = [] # page_no = 0 # pages = PDFPage.create_pages(doc) # pages = list(pages) # page_count = len(pages) # for page in pages: # logging.info("pdf2text pymupdf page_no " + str(page_no)) # # 限制pdf页数,只取前100页 # # if page_no >= 70: # # logging.info("pdf2text: pdf pages only get 70 pages") # # break # if page_count > 20: # if 10 <= page_no < page_count-10: # page_no += 1 # continue # # # 判断页码在含表格页码中,直接拿已生成的text # if page_no in has_table_no_list: # # 判断表格是否跨页连接 # area_no = 0 # jump_page = 0 # for area in table_connect_list: # if page_no in area: # # 只记录一次text # if page_no == area[0]: # image_text = connect_text_list[area_no][0] # text_list.append([image_text, page_no, 0]) # jump_page = 1 # area_no += 1 # # # 是连接页的跳过后面步骤 # if jump_page: # page_no += 1 # continue # # # 直接取text # image_text = has_table_dict.get(page_no)[0] # text_list.append([image_text, page_no, 0]) # page_no += 1 # continue # # # 不含表格的解析pdf # else: # if get_platform() == "Windows": # try: # interpreter.process_page(page) # layout = device.get_result() # except Exception: # logging.info("pdf2text pdfminer read pdf page error! continue...") # continue # # else: # # 设置超时时间 # try: # # 解析pdf中的不含表格的页 # if get_platform() == "Windows": # origin_pdf_analyze = pdf_analyze.__wrapped__ # layout = origin_pdf_analyze(interpreter, page, device) # else: # layout = pdf_analyze(interpreter, page, device) # except TimeoutError as e: # logging.info("pdf2text pdfminer read pdf page time out!") # return [-4] # except Exception: # logging.info("pdf2text pdfminer read pdf page error! continue...") # continue # # # 判断该页有没有文字对象,没有则有可能是有水印 # only_image = 1 # image_count = 0 # for x in layout: # if isinstance(x, LTTextBoxHorizontal): # only_image = 0 # if isinstance(x, LTFigure): # image_count += 1 # # # 如果该页图片数量过多,直接ocr整页识别 # logging.info("pdf2text image_count " + str(image_count)) # if image_count >= 3: # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # page_info_dict[page_no][0] = image_text # # text_list.append([image_text, page_no, 0]) # page_no += 1 # continue # # order_list = [] # for x in layout: # # 该对象是否是ocr识别 # ocr_flag = 0 # # if get_platform() == "Windows": # # print("x", page_no, x) # print() # # if isinstance(x, LTTextBoxHorizontal): # image_text = x.get_text() # # # 无法识别编码,用ocr # if re.search('[(]cid:[0-9]+[)]', image_text): # print(re.search('[(]cid:[0-9]+[)]', image_text)) # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # page_info_dict[page_no][0] = image_text # image_text = add_div(image_text) # # order_list.append([image_text, page_no, x.bbox[1]]) # order_list = [[image_text, page_no, x.bbox[1]]] # break # else: # image_text = add_div(image_text) # order_list.append([image_text, page_no, x.bbox[1]]) # continue # # if isinstance(x, LTFigure): # for image in x: # if isinstance(image, LTImage): # try: # print("pdf2text LTImage size", page_no, image.width, image.height) # image_stream = image.stream.get_data() # # # 小的图忽略 # if image.width <= 300 and image.height <= 300: # continue # # # 有些水印导致pdf分割、读取报错 # # if image.width <= 200 and image.height<=200: # # continue # # # img_test = Image.open(io.BytesIO(image_stream)) # # img_test.save('temp/LTImage.jpg') # # # 查看提取的图片高宽,太大则抛错用pdf输出图进行ocr识别 # img_test = Image.open(io.BytesIO(image_stream)) # if img_test.size[1] > 2000 or img_test.size[0] > 1500: # print("pdf2text LTImage stream output size", img_test.size) # raise Exception # # 比较小的图则直接保存用ocr识别 # else: # img_test.save('temp/LTImage.jpg') # with open('temp/LTImage.jpg', "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # # except pdfminer.pdftypes.PDFNotImplementedError: # # with open(output_image_list[page_no], "rb") as ff: # # image_stream = ff.read() # except Exception: # logging.info("pdf2text pdfminer read image in page " + str(page_no) + # " fail! use pymupdf read image...") # print(traceback.print_exc()) # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # page_info_dict[page_no][0] = image_text # ocr_flag = 1 # # # 判断只拿到了水印图: 无文字输出且只有图片对象 # if image_text == "" and only_image: # # 拆出该页pdf # try: # logging.info("pdf2text guess pdf has watermark") # split_path = get_single_pdf(path, page_no) # except: # # 如果拆分抛异常,则大概率不是水印图,用ocr识别图片 # logging.info("pdf2text guess pdf has no watermark") # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # order_list.append([image_text, page_no, -1]) # page_info_dict[page_no][0] = image_text # ocr_flag = 1 # continue # if judge_error_code(split_path): # return split_path # # # 调用office格式转换 # file_path = from_office_interface(split_path, unique_type_dir, 'html', 3) # # if file_path == [-3]: # # return [-3] # if judge_error_code(file_path): # return file_path # # # 获取html文本 # image_text = get_html_p(file_path) # if judge_error_code(image_text): # return image_text # # if get_platform() == "Windows": # print("image_text", page_no, x.bbox[1], image_text) # with open("temp" + str(x.bbox[0]) + ".jpg", "wb") as ff: # ff.write(image_stream) # image_text = add_div(image_text) # if ocr_flag: # order_list.append([image_text, page_no, -1]) # else: # order_list.append([image_text, page_no, x.bbox[1]]) # # order_list.sort(key=lambda z: z[2], reverse=True) # # # 有ocr参与识别 # if order_list[-1][2] == -1: # ocr_order_list = [order_list[-1]] # not_ocr_order_list = [] # not_ocr_text = "" # # 去重,因读取失败而重复获取 # for order in order_list: # if order[2] != -1: # not_ocr_order_list.append(order) # not_ocr_text += order[0] # if string_similarity(ocr_order_list[0][0], not_ocr_text) >= 0.85: # order_list = not_ocr_order_list # else: # order_list = ocr_order_list # # for order in order_list: # text_list.append(order) # page_no += 1 # # text = "" # for t in text_list: # # text += add_div(t[0]) # if t[0] is not None: # text += t[0] # return [text] # except UnicodeDecodeError as e: # logging.info("pdf2text pdfminer create pages failed! " + str(e)) # return [-3] # except Exception as e: # logging.info("pdf2text error!") # print("pdf2text", traceback.print_exc()) # return [-1] # # # def string_similarity(str1, str2): # # 去掉
和回车 # str1 = re.sub("
", "", str1) # str1 = re.sub("
", "", str1) # str1 = re.sub("\n", "", str1) # str2 = re.sub("
", "", str2) # str2 = re.sub("
", "", str2) # str2 = re.sub("\n", "", str2) # # print("********************************") # # print("str1", str1) # # print("********************************") # # print("str2", str2) # # print("********************************") # score = difflib.SequenceMatcher(None, str1, str2).ratio() # print("string_similarity", score) # return score # # # @get_memory_info.memory_decorator # @timeout_decorator.timeout(300, timeout_exception=TimeoutError) # def pdf_analyze(interpreter, page, device): # logging.info("into pdf_analyze") # # 解析pdf中的不含表格的页 # pdf_time = time.time() # print("pdf_analyze interpreter process...") # interpreter.process_page(page) # print("pdf_analyze device get_result...") # layout = device.get_result() # logging.info("pdf2text read time " + str(time.time()-pdf_time)) # return layout # # # def get_html_p(html_path): # logging.info("into get_html_p") # try: # with open(html_path, "r") as ff: # html_str = ff.read() # # soup = BeautifulSoup(html_str, 'lxml') # text = "" # for p in soup.find_all("p"): # p_text = p.text # p_text = p_text.strip() # if p.string != "": # text += p_text # text += "\n" # return text # except Exception as e: # logging.info("get_html_p error!") # print("get_html_p", traceback.print_exc()) # return [-1] # # # def get_single_pdf(path, page_no): # logging.info("into get_single_pdf") # try: # # print("path, ", path) # pdf_origin = PdfFileReader(path, strict=False) # # pdf_new = PdfFileWriter() # pdf_new.addPage(pdf_origin.getPage(page_no)) # # path_new = path.split(".")[0] + "_split.pdf" # with open(path_new, "wb") as ff: # pdf_new.write(ff) # return path_new # except PyPDF2.utils.PdfReadError as e: # raise e # except Exception as e: # logging.info("get_single_pdf error! page " + str(page_no)) # print("get_single_pdf", traceback.print_exc()) # raise e # # # def page_table_connect2(has_table_list, page_info_list): # logging.info("into page_table_connect") # try: # # 判断是否有页码的表格相连 # table_connect_list = [] # temp_list = [] # # 离图片顶部或底部距离,页面高度的1/7 # threshold = 7 # # for i in range(1, len(has_table_list)): # page_info = has_table_list[i] # last_page_info = has_table_list[i - 1] # # # 页码需相连 # if page_info[4] - last_page_info[4] == 1: # # # 上一页最后一个区域的列数和下一页第一个区域列数都为0,且相等 # if not last_page_info[1][-1] and not page_info[1][0] and \ # last_page_info[1][-1] == page_info[1][0]: # # # 上一页的轮廓点要离底部一定距离内,下一页的轮廓点要离顶部一定距离内 # if last_page_info[5][0] - last_page_info[2][-1][1][1] \ # <= int(last_page_info[5][0]/threshold) \ # and page_info[2][0][0][1] - 0 \ # <= int(page_info[5][0]/threshold): # temp_list.append(last_page_info[4]) # temp_list.append(page_info[4]) # continue # # # 条件不符合的,存储之前保存的连接页码 # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # # # 连接两页内容 # connect_text_list = [] # for area in table_connect_list: # first_page_no = area[0] # for page in page_info_list: # if page[4] == first_page_no: # area_page_text = str(page[0]) # break # for i in range(1, len(area)): # current_page_no = area[i] # for page in page_info_list: # if page[4] == current_page_no: # current_page_text = str(page[0]) # break # # # 连接两个table # table_prefix = re.finditer('', current_page_text) # index_list = [] # for t in table_prefix: # index_list.append(t.span()) # # delete_index = index_list[0] # current_page_text = current_page_text[:delete_index[0]] \ # + current_page_text[delete_index[1]:] # # table_suffix = re.finditer('
', area_page_text) # index_list = [] # for t in table_suffix: # index_list.append(t.span()) # # delete_index = index_list[-1] # area_page_text = area_page_text[:delete_index[0]] \ # + area_page_text[delete_index[1]:] # area_page_text = area_page_text + current_page_text # connect_text_list.append([area_page_text, area]) # # return table_connect_list, connect_text_list # except Exception as e: # # print("page_table_connect", e) # logging.info("page_table_connect error!") # print("page_table_connect", traceback.print_exc()) # return [-1], [-1] # # # def page_table_connect(has_table_dict): # logging.info("into page_table_connect") # if not has_table_dict: # return [], [] # # try: # # 判断是否有页码的表格相连 # table_connect_list = [] # temp_list = [] # # 离图片顶部或底部距离,页面高度的1/7 # threshold = 7 # page_no_list = list(has_table_dict.keys()) # page_no_list.sort(key=lambda x: x) # for i in range(1, len(page_no_list)): # page_info = has_table_dict.get(page_no_list[i]) # last_page_info = has_table_dict.get(page_no_list[i-1]) # # 页码需相连 # if page_no_list[i] - page_no_list[i-1] == 1: # # 上一页最后一个区域的列数和下一页第一个区域列数都为0,且相等 # if not last_page_info[1][-1] and not page_info[1][0] and \ # last_page_info[1][-1] == page_info[1][0]: # # # 上一页的轮廓点要离底部一定距离内,下一页的轮廓点要离顶部一定距离内 # if last_page_info[4][0] - last_page_info[2][-1][1][1] \ # <= int(last_page_info[4][0]/threshold) \ # and page_info[2][0][0][1] - 0 \ # <= int(page_info[4][0]/threshold): # temp_list.append(page_no_list[i-1]) # temp_list.append(page_no_list[i]) # continue # # # 条件不符合的,存储之前保存的连接页码 # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # # # 连接两页内容 # connect_text_list = [] # for area in table_connect_list: # first_page_no = area[0] # area_page_text = str(has_table_dict.get(first_page_no)[0]) # for i in range(1, len(area)): # current_page_no = area[i] # current_page_text = str(has_table_dict.get(current_page_no)[0]) # # # 连接两个table # table_prefix = re.finditer('', current_page_text) # index_list = [] # for t in table_prefix: # index_list.append(t.span()) # # delete_index = index_list[0] # current_page_text = current_page_text[:delete_index[0]] \ # + current_page_text[delete_index[1]:] # # table_suffix = re.finditer('
', area_page_text) # index_list = [] # for t in table_suffix: # index_list.append(t.span()) # # delete_index = index_list[-1] # area_page_text = area_page_text[:delete_index[0]] \ # + area_page_text[delete_index[1]:] # area_page_text = area_page_text + current_page_text # connect_text_list.append([area_page_text, area]) # # return table_connect_list, connect_text_list # except Exception as e: # # print("page_table_connect", e) # logging.info("page_table_connect error!") # print("page_table_connect", traceback.print_exc()) # return [-1], [-1] # # # @get_memory_info.memory_decorator # def zip2text(path, unique_type_dir): # logging.info("into zip2text") # try: # zip_path = unique_type_dir # # try: # zip_file = zipfile.ZipFile(path) # zip_list = zip_file.namelist() # # print("zip list namelist", zip_list) # # if get_platform() == "Windows": # if os.path.exists(zip_list[0]): # print("zip2text exists") # # # 循环解压文件到指定目录 # file_list = [] # for f in zip_list: # file_list.append(zip_file.extract(f, path=zip_path)) # # zip_file.extractall(path=zip_path) # zip_file.close() # # # 获取文件名 # # file_list = [] # # for root, dirs, files in os.walk(zip_path, topdown=False): # # for name in dirs: # # file_list.append(os.path.join(root, name) + os.sep) # # for name in files: # # file_list.append(os.path.join(root, name)) # # # # # if get_platform() == "Windows": # # # print("file_list", file_list) # # # # # 过滤掉doc缓存文件 # # temp_list = [] # # for f in file_list: # # if re.search("~\$", f): # # continue # # else: # # temp_list.append(f) # # file_list = temp_list # # except Exception as e: # logging.info("zip format error!") # print("zip format error!", traceback.print_exc()) # return [-3] # # # 内部文件重命名 # # file_list = inner_file_rename(file_list) # file_list = rename_inner_files(zip_path) # if judge_error_code(file_list): # return file_list # # if get_platform() == "Windows": # print("============= zip file list") # # print(file_list) # # text = [] # for file in file_list: # if os.path.isdir(file): # continue # # # 无文件后缀,猜格式 # if len(file.split(".")) <= 1: # logging.info(str(file) + " has no type! Guess type...") # _type = judge_format(file) # if _type is None: # logging.info(str(file) + "cannot guess type!") # sub_text = [""] # else: # logging.info(str(file) + " guess type: " + _type) # new_file = str(file) + "." + _type # os.rename(file, new_file) # file = new_file # sub_text = getText(_type, file) # # 有文件后缀,截取 # else: # _type = file.split(".")[-1] # sub_text = getText(_type, file) # # if judge_error_code(sub_text, code=[-3]): # continue # if judge_error_code(sub_text): # return sub_text # # text = text + sub_text # return text # except Exception as e: # logging.info("zip2text error!") # print("zip2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def rar2text(path, unique_type_dir): # logging.info("into rar2text") # try: # rar_path = unique_type_dir # # try: # # shell调用unrar解压 # _signal = os.system("unrar x " + path + " " + rar_path) # print("rar2text _signal", _signal) # # =0, 解压成功 # if _signal != 0: # raise Exception # except Exception as e: # logging.info("rar format error!") # print("rar format error!", e) # return [-3] # # # 获取文件名 # # file_list = [] # # for root, dirs, files in os.walk(rar_path, topdown=False): # # for name in dirs: # # file_list.append(os.path.join(root, name) + os.sep) # # for name in files: # # file_list.append(os.path.join(root, name)) # # if get_platform() == "Windows": # print("============= rar file list") # # # 内部文件重命名 # # file_list = inner_file_rename(file_list) # file_list = rename_inner_files(rar_path) # if judge_error_code(file_list): # return file_list # # text = [] # for file in file_list: # if os.path.isdir(file): # continue # # # 无文件后缀,猜格式 # if len(file.split(".")) <= 1: # logging.info(str(file) + " has no type! Guess type...") # _type = judge_format(file) # if _type is None: # logging.info(str(file) + "cannot guess type!") # sub_text = [""] # else: # logging.info(str(file) + " guess type: " + _type) # new_file = str(file) + "." + _type # os.rename(file, new_file) # file = new_file # sub_text = getText(_type, file) # # 有文件后缀,截取 # else: # _type = file.split(".")[-1] # sub_text = getText(_type, file) # # if judge_error_code(sub_text, code=[-3]): # continue # if judge_error_code(sub_text): # return sub_text # # # print("sub text", sub_text, file, _type) # text = text + sub_text # return text # except Exception as e: # logging.info("rar2text error!") # print("rar2text", traceback.print_exc()) # return [-1] # # # def inner_file_rename(path_list): # logging.info("into inner_file_rename") # try: # # 先过滤文件名中的点 '.' # path_list.sort(key=lambda x: len(x), reverse=True) # for i in range(len(path_list)): # old_path = path_list[i] # # 对于目录,判断最后一级是否需过滤,重命名 # if os.path.isdir(old_path): # ps = old_path.split(os.sep) # old_p = ps[-2] # if '.' in old_p: # new_p = re.sub("\\.", "", old_p) # new_path = "" # for p in ps[:-2]: # new_path += p + os.sep # new_path += new_p + os.sep # # # 重命名,更新 # # print("has .", path_list[i], new_path) # os.rename(old_path, new_path) # for j in range(len(path_list)): # if old_path in path_list[j]: # path_list[j] = re.sub(old_p, new_p, path_list[j]) + os.sep # # # 将path分割,按分割个数排名 # path_len_list = [] # for p in path_list: # p_ss = p.split(os.sep) # temp_p_ss = [] # for pp in p_ss: # if pp == "": # continue # temp_p_ss.append(pp) # p_ss = temp_p_ss # path_len_list.append([p, p_ss, len(p_ss)]) # # # 从路径分割少的开始改名,即从根目录开始改 # path_len_list.sort(key=lambda x: x[2]) # # # for p in path_len_list: # # print("---", p[1]) # # # 判断不用变的目录在第几级 # no_change_level = 0 # loop = 0 # for p_s in path_len_list[0][1]: # if p_s[-4:] == "_rar" or p_s[-4:] == "_zip": # no_change_level += loop # loop = 0 # loop += 1 # no_change_level += 1 # # # 每个 # new_path_list = [] # for path_len in path_len_list: # # 前n个是固定路径 # new_path = "" # for i in range(no_change_level): # new_path += path_len[1][i] + os.sep # old_path = new_path # # if not get_platform() == "Windows": # old_path = os.sep + old_path # new_path = os.sep + new_path # # print("path_len[1][3:]", path_len[1][3:]) # # count = 0 # for p in path_len[1][no_change_level:]: # # 新路径全部转换hash # new_path += str(hash(p)) # # # 最后一个不加os.sep,并且旧路径最后一个不转换hash # if count < len(path_len[1][no_change_level:]) - 1: # old_path += str(hash(p)) + os.sep # new_path += os.sep # else: # old_path += p # count += 1 # # # path是文件夹再加os.sep # if os.path.isdir(path_len[0]): # new_path += os.sep # old_path += os.sep # # path是文件再加文件名后缀 # else: # p_ss = path_len[1][-1].split(".") # if len(p_ss) > 1: # path_suffix = "." + p_ss[-1] # new_path += path_suffix # # print("inner_file_rename", old_path, "to", new_path) # os.rename(old_path, new_path) # new_path_list.append(new_path) # # return new_path_list # except Exception as e: # logging.info("inner_file_rename error!") # print("inner_file_rename", traceback.print_exc()) # return [-1] # # # def rename_inner_files(root_path): # try: # logging.info("into rename_inner_files") # # 获取解压文件夹下所有文件+文件夹,不带根路径 # path_list = [] # for root, dirs, files in os.walk(root_path, topdown=False): # for name in dirs: # p = os.path.join(root, name) + os.sep # p = re.sub(root_path, "", p) # path_list.append(p) # for name in files: # p = os.path.join(root, name) # p = re.sub(root_path, "", p) # path_list.append(p) # # # 按路径长度排序 # path_list.sort(key=lambda x: len(x), reverse=True) # # # 循环改名 # for old_path in path_list: # # 按路径分隔符分割 # ss = old_path.split(os.sep) # # 判断是否文件夹 # is_dir = 0 # file_type = "" # if os.path.isdir(root_path + old_path): # ss = ss[:-1] # is_dir = 1 # else: # if "." in old_path: # file_type = "." + old_path.split(".")[-1] # else: # file_type = "" # # # 最后一级需要用hash改名 # new_path = "" # # new_path = re.sub(ss[-1], str(hash(ss[-1])), old_path) + file_type # current_level = 0 # for s in ss: # # 路径拼接 # if current_level < len(ss) - 1: # new_path += s + os.sep # else: # new_path += str(hash(s)) + file_type # current_level += 1 # # new_ab_path = root_path + new_path # old_ab_path = root_path + old_path # os.rename(old_ab_path, new_ab_path) # # # 重新获取解压文件夹下所有文件+文件夹 # new_path_list = [] # for root, dirs, files in os.walk(root_path, topdown=False): # for name in dirs: # new_path_list.append(os.path.join(root, name) + os.sep) # for name in files: # new_path_list.append(os.path.join(root, name)) # # print("new_path_list", new_path_list) # return new_path_list # except: # traceback.print_exc() # return [-1] # # # @get_memory_info.memory_decorator # def xls2text(path, unique_type_dir): # logging.info("into xls2text") # try: # # 调用libreoffice格式转换 # file_path = from_office_interface(path, unique_type_dir, 'xlsx') # # if file_path == [-3]: # # return [-3] # if judge_error_code(file_path): # return file_path # # text = xlsx2text(file_path, unique_type_dir) # # if text == [-1]: # # return [-1] # # if text == [-3]: # # return [-3] # if judge_error_code(text): # return text # # return text # except Exception as e: # logging.info("xls2text error!") # print("xls2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def xlsx2text(path, unique_type_dir): # logging.info("into xlsx2text") # try: # try: # # sheet_name=None, 即拿取所有sheet,存为dict # df_dict = pandas.read_excel(path, header=None, keep_default_na=False, sheet_name=None) # except Exception as e: # logging.info("xlsx format error!") # # print("xlsx format error!", e) # return [-3] # # df_list = [sheet for sheet in df_dict.values()] # sheet_text = "" # for df in df_list: # text = '' + "\n" # for index, row in df.iterrows(): # text = text + "" # for r in row: # text = text + "" + "\n" # # print(text) # text = text + "" + "\n" # text = text + "
" + str(r) + "
" + "\n" # sheet_text += text # # return [sheet_text] # except Exception as e: # logging.info("xlsx2text error!") # print("xlsx2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def swf2text(path, unique_type_dir): # logging.info("into swf2text") # try: # try: # with open(path, 'rb') as f: # swf_file = SWF(f) # svg_exporter = SVGExporter() # svg = swf_file.export(svg_exporter) # # with open('swf_export.jpg', 'wb') as f: # # f.write(svg.read()) # swf_str = str(svg.getvalue(), encoding='utf-8') # except Exception as e: # logging.info("swf format error!") # traceback.print_exc() # return [-3] # # # 正则匹配图片的信息位置 # result0 = re.finditer(']*)', swf_str) # image_bytes_list = [] # i = 0 # image_path_prefix = path.split(".")[-2] + "_" + path.split(".")[-1] # image_path_list = [] # for r in result0: # # 截取图片信息所在位置 # swf_str0 = swf_str[r.span()[0]:r.span()[1] + 1] # # # 正则匹配得到图片的base64编码 # result1 = re.search('xlink:href="data:(.[^>]*)', swf_str0) # swf_str1 = swf_str0[result1.span()[0]:result1.span()[1]] # reg1_prefix = 'b\'' # result1 = re.search(reg1_prefix + '(.[^\']*)', swf_str1) # swf_str1 = swf_str1[result1.span()[0] + len(reg1_prefix):result1.span()[1]] # # # base64_str -> base64_bytes -> no "\\" base64_bytes -> bytes -> image # base64_bytes_with_double = bytes(swf_str1, "utf-8") # base64_bytes = codecs.escape_decode(base64_bytes_with_double, "hex-escape")[0] # image_bytes = base64.b64decode(base64_bytes) # image_bytes_list.append(image_bytes) # image_path = image_path_prefix + "_page_" + str(i) + ".png" # with open(image_path, 'wb') as f: # f.write(image_bytes) # # image_path_list.append(image_path) # # 正则匹配得到图片的宽高 # # reg2_prefix = 'width="' # # result2 = re.search(reg2_prefix + '(\d+)', swf_str0) # # swf_str2 = swf_str0[result2.span()[0]+len(reg2_prefix):result2.span()[1]] # # width = swf_str2 # # reg2_prefix = 'height="' # # result2 = re.search(reg2_prefix + '(\d+)', swf_str0) # # swf_str2 = swf_str0[result2.span()[0]+len(reg2_prefix):result2.span()[1]] # # height = swf_str2 # i += 1 # # text_list = [] # # print("image_path_list", image_path_list) # for image_path in image_path_list: # text = picture2text(image_path) # # print("text", text) # # if judge_error_code(text, code=[-3]): # continue # if judge_error_code(text): # return text # # text = text[0] # text_list.append(text) # # text = "" # for t in text_list: # text += t # # return [text] # except Exception as e: # logging.info("swf2text error!") # print("swf2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def picture2text(path, html=False): # logging.info("into picture2text") # try: # # 判断图片中表格 # img = cv2.imread(path) # if img is None: # return [-3] # # # if get_platform() == "Windows": # # print("picture2text img", img) # # text, column_list, outline_points, is_table = image_preprocess(img, path) # if judge_error_code(text): # return text # # if text == [-5]: # # return [-5] # # if text == [-2]: # # return [-2] # # if text == [-1]: # # return [-1] # # if html: # text = add_div(text) # return [text] # except Exception as e: # logging.info("picture2text error!") # print("picture2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def from_ocr_interface(image_stream, is_table=False): # logging.info("into from_ocr_interface") # try: # base64_stream = base64.b64encode(image_stream) # # # 调用接口 # try: # r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model")) # except TimeoutError: # if is_table: # return [-5], [-5] # else: # return [-5] # except requests.exceptions.ConnectionError as e: # if is_table: # return [-2], [-2] # else: # return [-2] # # _dict = r # text_list = eval(_dict.get("text")) # bbox_list = eval(_dict.get("bbox")) # if text_list is None: # text_list = [] # if bbox_list is None: # bbox_list = [] # # if is_table: # return text_list, bbox_list # else: # if text_list and bbox_list: # text = get_sequential_data(text_list, bbox_list, html=True) # if judge_error_code(text): # return text # # if text == [-1]: # # return [-1] # else: # text = "" # return text # except Exception as e: # logging.info("from_ocr_interface error!") # # print("from_ocr_interface", e, global_type) # if is_table: # return [-1], [-1] # else: # return [-1] # # # @get_memory_info.memory_decorator # def from_otr_interface(image_stream): # logging.info("into from_otr_interface") # try: # base64_stream = base64.b64encode(image_stream) # # # 调用接口 # try: # r = otr(data=base64_stream, otr_model=globals().get("global_otr_model")) # except TimeoutError: # return [-5], [-5], [-5], [-5] # except requests.exceptions.ConnectionError as e: # logging.info("from_otr_interface") # print("from_otr_interface", traceback.print_exc()) # return [-2], [-2], [-2], [-2] # # # 处理结果 # _dict = r # points = eval(_dict.get("points")) # split_lines = eval(_dict.get("split_lines")) # bboxes = eval(_dict.get("bboxes")) # outline_points = eval(_dict.get("outline_points")) # # print("from_otr_interface len(bboxes)", len(bboxes)) # if points is None: # points = [] # if split_lines is None: # split_lines = [] # if bboxes is None: # bboxes = [] # if outline_points is None: # outline_points = [] # return points, split_lines, bboxes, outline_points # except Exception as e: # logging.info("from_otr_interface error!") # print("from_otr_interface", traceback.print_exc()) # return [-1], [-1], [-1], [-1] # # # def from_office_interface(src_path, dest_path, target_format, retry_times=1): # try: # # Win10跳出超时装饰器 # if get_platform() == "Windows": # # origin_office_convert = office_convert.__wrapped__ # # file_path = origin_office_convert(src_path, dest_path, target_format, retry_times) # file_path = office_convert(src_path, dest_path, target_format, retry_times) # else: # # 将装饰器包装为一个类,否则多进程Pickle会报错 it's not the same object as xxx 问题, # # timeout_decorator_obj = my_timeout_decorator.TimeoutClass(office_convert, 180, TimeoutError) # # file_path = timeout_decorator_obj.run(src_path, dest_path, target_format, retry_times) # # file_path = office_convert(src_path, dest_path, target_format, retry_times) # # if judge_error_code(file_path): # return file_path # return file_path # except TimeoutError: # logging.info("from_office_interface timeout error!") # return [-5] # except: # logging.info("from_office_interface error!") # print("from_office_interface", traceback.print_exc()) # return [-1] # # # def get_sequential_data(text_list, bbox_list, html=False): # logging.info("into get_sequential_data") # try: # text = "" # order_list = [] # for i in range(len(text_list)): # length_start = bbox_list[i][0][0] # length_end = bbox_list[i][1][0] # height_start = bbox_list[i][0][1] # height_end = bbox_list[i][-1][1] # # print([length_start, length_end, height_start, height_end]) # order_list.append([text_list[i], length_start, length_end, height_start, height_end]) # # text = text + infomation['text'] + "\n" # # if get_platform() == "Windows": # print("get_sequential_data", order_list) # if not order_list: # if get_platform() == "Windows": # print("get_sequential_data", "no order list") # return "" # # # 根据bbox的坐标对输出排序 # order_list.sort(key=lambda x: (x[3], x[1])) # # # 根据bbox分行分列 # # col_list = [] # # height_end = int((order_list[0][4] + order_list[0][3]) / 2) # # for i in range(len(order_list)): # # if height_end - threshold <= order_list[i][3] <= height_end + threshold: # # col_list.append(order_list[i]) # # else: # # row_list.append(col_list) # # col_list = [] # # height_end = int((order_list[i][4] + order_list[i][3]) / 2) # # col_list.append(order_list[i]) # # if i == len(order_list) - 1: # # row_list.append(col_list) # # row_list = [] # used_box = [] # threshold = 5 # for box in order_list: # if box in used_box: # continue # # height_center = (box[4] + box[3]) / 2 # row = [] # for box2 in order_list: # if box2 in used_box: # continue # height_center2 = (box2[4] + box2[3]) / 2 # if height_center - threshold <= height_center2 <= height_center + threshold: # if box2 not in row: # row.append(box2) # used_box.append(box2) # row.sort(key=lambda x: x[0]) # row_list.append(row) # # for row in row_list: # if not row: # continue # if len(row) <= 1: # text = text + row[0][0] + "\n" # else: # sub_text = "" # row.sort(key=lambda x: x[1]) # for col in row: # sub_text = sub_text + col[0] + " " # sub_text = sub_text + "\n" # text += sub_text # # if html: # text = "
" + text # text = re.sub("\n", "
\n
", text) # text += "
" # # if text[-5:] == "
": # # text = text[:-5] # return text # # except Exception as e: # logging.info("get_sequential_data error!") # print("get_sequential_data", traceback.print_exc()) # return [-1] # # # def get_formatted_table(text_list, text_bbox_list, table_bbox_list, split_line): # logging.info("into get_formatted_table") # try: # # 重新定义text_bbox_list,[point, point, text] # text_bbox_list = [[text_bbox_list[i][0], text_bbox_list[i][2], text_list[i]] for i in # range(len(text_bbox_list))] # # 按纵坐标排序 # text_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # # # print("text_bbox_list", text_bbox_list) # # print("table_bbox_list", table_bbox_list) # # # bbox位置 threshold # threshold = 5 # # # 根据split_line分区,可能有个区多个表格 [(), ()] # area_text_bbox_list = [] # area_table_bbox_list = [] # # print("get_formatted_table, split_line", split_line) # for j in range(1, len(split_line)): # last_y = split_line[j - 1][0][1] # current_y = split_line[j][0][1] # temp_text_bbox_list = [] # temp_table_bbox_list = [] # # # 找出该区域下text bbox # for text_bbox in text_bbox_list: # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # if last_y - threshold <= text_bbox_center[1] <= current_y + threshold: # temp_text_bbox_list.append(text_bbox) # area_text_bbox_list.append(temp_text_bbox_list) # # # 找出该区域下table bbox # for table_bbox in table_bbox_list: # # 计算 table bbox 中心点 # table_bbox_center = ((table_bbox[1][0] + table_bbox[0][0]) / 2, # (table_bbox[1][1] + table_bbox[0][1]) / 2) # if last_y < table_bbox_center[1] < current_y: # temp_table_bbox_list.append(table_bbox) # area_table_bbox_list.append(temp_table_bbox_list) # # # for j in range(len(area_text_bbox_list)): # # print("area_text_bbox_list", j, area_text_bbox_list[j]) # # # 对每个区域分别进行两个bbox匹配,生成表格 # area_text_list = [] # area_column_list = [] # for j in range(len(area_text_bbox_list)): # # 每个区域的table bbox 和text bbox # temp_table_bbox_list = area_table_bbox_list[j] # temp_text_bbox_list = area_text_bbox_list[j] # # # 判断该区域有无表格bbox # # 若无表格,将该区域文字连接 # if not temp_table_bbox_list: # # 找出该区域的所有text bbox # only_text_list = [] # only_bbox_list = [] # for text_bbox in temp_text_bbox_list: # only_text_list.append(text_bbox[2]) # only_bbox_list.append([text_bbox[0], text_bbox[1]]) # only_text = get_sequential_data(only_text_list, only_bbox_list, True) # if only_text == [-1]: # return [-1], [-1] # area_text_list.append(only_text) # area_column_list.append(0) # continue # # # 有表格 # # 文本对应的表格格子 # text_in_table = {} # for i in range(len(temp_text_bbox_list)): # text_bbox = temp_text_bbox_list[i] # # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # # # 判断中心点在哪个table bbox中 # for table_bbox in temp_table_bbox_list: # # 中心点在table bbox中,将text写入字典 # if table_bbox[0][0] <= text_bbox_center[0] <= table_bbox[1][0] and \ # table_bbox[0][1] <= text_bbox_center[1] <= table_bbox[1][1]: # if str(table_bbox) in text_in_table.keys(): # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # else: # text_in_table[str(table_bbox)] = text_bbox[2] # break # # # 如果未找到text bbox匹配的table bbox,加大threshold匹配 # # elif (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]): # # if str(table_bbox) in text_in_table.keys(): # # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # # else: # # text_in_table[str(table_bbox)] = text_bbox[2] # # break # # # 对表格格子进行分行分列,并计算总计多少小列 # # 放入坐标 # all_col_list = [] # all_row_list = [] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # # 放入所有坐标x # if table_bbox[0][0] not in all_col_list: # all_col_list.append(table_bbox[0][0]) # if table_bbox[1][0] not in all_col_list: # all_col_list.append(table_bbox[1][0]) # # # 放入所有坐标y # if table_bbox[0][1] not in all_row_list: # all_row_list.append(table_bbox[0][1]) # if table_bbox[1][1] not in all_row_list: # all_row_list.append(table_bbox[1][1]) # all_col_list.sort(key=lambda x: x) # all_row_list.sort(key=lambda x: x) # # # 分行 # row_list = [] # rows = [] # temp_table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0], x[1][1], x[1][0])) # y_row = temp_table_bbox_list[0][0][1] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # if y_row - threshold <= table_bbox[0][1] <= y_row + threshold: # rows.append(table_bbox) # else: # y_row = table_bbox[0][1] # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # rows = [] # rows.append(table_bbox) # # print("*" * 30) # # print(row_list) # # if i == len(temp_table_bbox_list) - 1: # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # # # 生成表格,包括文字和格子宽度 # area_column = [] # text = '' + "\n" # for row in row_list: # text += "" + "\n" # for col in row: # # 计算bbox y坐标之间有多少其他点,+1即为所占行数 # row_span = 1 # for y in all_row_list: # if col[0][1] < y < col[1][1]: # if y - col[0][1] >= 2 and col[1][1] - y >= 2: # row_span += 1 # # # 计算bbox x坐标之间有多少其他点,+1即为所占列数 # col_span = 1 # for x in all_col_list: # if col[0][0] < x < col[1][0]: # if x - col[0][0] >= 2 and col[1][0] - x >= 2: # col_span += 1 # # text += "" + "\n" # text += "" + "\n" # text += "
" # # if str(col) in text_in_table.keys(): # text += text_in_table.get(str(col)) # else: # text += "" # text += "
" + "\n" # # # 计算最大column # max_col_num = 0 # for row in row_list: # col_num = 0 # for col in row: # col_num += 1 # if max_col_num < col_num: # max_col_num = col_num # # area_text_list.append(text) # area_column_list.append(max_col_num) # # text = "" # if get_platform() == "Windows": # print("get_formatted_table area_text_list", area_text_list) # for area_text in area_text_list: # text += area_text # return text, area_column_list # except Exception as e: # logging.info("get_formatted_table error!") # print("get_formatted_table", traceback.print_exc()) # return [-1], [-1] port_num = [0] def choose_port(): process_num = 4 if port_num[0] % process_num == 0: _url = local_url + ":15011" elif port_num[0] % process_num == 1: _url = local_url + ":15012" elif port_num[0] % process_num == 2: _url = local_url + ":15013" elif port_num[0] % process_num == 3: _url = local_url + ":15014" port_num[0] = port_num[0] + 1 return _url def getText(_type, path_or_stream): print("file type - " + _type) logging.info("file type - " + _type) try: ss = path_or_stream.split(".") unique_type_dir = ss[-2] + "_" + ss[-1] + os.sep except: unique_type_dir = path_or_stream + "_" + _type + os.sep if _type == "pdf": # return pdf2text(path_or_stream, unique_type_dir) return PDFConvert(path_or_stream).get_html() if _type == "docx": return docx2text(path_or_stream, unique_type_dir) if _type == "zip": return zip2text(path_or_stream, unique_type_dir) if _type == "rar": return rar2text(path_or_stream, unique_type_dir) if _type == "xlsx": return xlsx2text(path_or_stream, unique_type_dir) if _type == "xls": return xls2text(path_or_stream, unique_type_dir) if _type == "doc": return doc2text(path_or_stream, unique_type_dir) if _type == "jpg" or _type == "png" or _type == "jpeg": return picture2text(path_or_stream) if _type == "swf": return swf2text(path_or_stream, unique_type_dir) if _type == "txt": return txt2text(path_or_stream) return [""] def to_html(path, text): with open(path, 'w',encoding="utf8") as f: f.write("") f.write('') f.write("") f.write(text) f.write("") def resize_image(image_path, size): try: image_np = cv2.imread(image_path) # print(image_np.shape) width = image_np.shape[1] height = image_np.shape[0] h_w_rate = height / width # width_standard = 900 # height_standard = 1400 width_standard = size[1] height_standard = size[0] width_new = int(height_standard / h_w_rate) height_new = int(width_standard * h_w_rate) if width > width_standard: image_np = cv2.resize(image_np, (width_standard, height_new)) elif height > height_standard: image_np = cv2.resize(image_np, (width_new, height_standard)) cv2.imwrite(image_path, image_np) # print("resize_image", image_np.shape) return except Exception as e: logging.info("resize_image") print("resize_image", e, global_type) return def remove_red_seal(image_np): """ 去除红色印章 """ # 获得红色通道 blue_c, green_c, red_c = cv2.split(image_np) # 多传入一个参数cv2.THRESH_OTSU,并且把阈值thresh设为0,算法会找到最优阈值 thresh, ret = cv2.threshold(red_c, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # print("remove_red_seal thresh", thresh) # 实测调整为95%效果好一些 filter_condition = int(thresh * 0.98) thresh1, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY) # 把图片转回 3 通道 image_and = np.expand_dims(red_thresh, axis=2) image_and = np.concatenate((image_and, image_and, image_and), axis=-1) # print(image_and.shape) # 膨胀 gray = cv2.cvtColor(image_and, cv2.COLOR_RGB2GRAY) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2)) erode = cv2.erode(gray, kernel) cv2.imshow("erode", erode) cv2.waitKey(0) image_and = np.bitwise_and(cv2.bitwise_not(blue_c), cv2.bitwise_not(erode)) result_img = cv2.bitwise_not(image_and) cv2.imshow("remove_red_seal", result_img) cv2.waitKey(0) return result_img def remove_underline(image_np): """ 去除文字下划线 """ # 灰度化 gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY) # 二值化 binary = cv2.adaptiveThreshold(~gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 15, 10) # Sobel kernel_row = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], np.float32) kernel_col = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], np.float32) # binary = cv2.filter2D(binary, -1, kernel=kernel) binary_row = cv2.filter2D(binary, -1, kernel=kernel_row) binary_col = cv2.filter2D(binary, -1, kernel=kernel_col) cv2.imshow("custom_blur_demo", binary) cv2.waitKey(0) rows, cols = binary.shape # 识别横线 scale = 5 kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (cols // scale, 1)) erodedcol = cv2.erode(binary_row, kernel, iterations=1) cv2.imshow("Eroded Image", erodedcol) cv2.waitKey(0) dilatedcol = cv2.dilate(erodedcol, kernel, iterations=1) cv2.imshow("dilate Image", dilatedcol) cv2.waitKey(0) return def getMDFFromFile(path): _length = 0 try: _md5 = hashlib.md5() with open(path, "rb") as ff: while True: data = ff.read(4096) if not data: break _length += len(data) _md5.update(data) return _md5.hexdigest(), _length except Exception as e: traceback.print_exc() return None, _length def add_html_format(text_list): new_text_list = [] for t in text_list: html_t = "\n" html_t += '\n' html_t += "\n" html_t += t html_t += "\n\n" new_text_list.append(html_t) return new_text_list @timeout_decorator.timeout(1200, timeout_exception=TimeoutError) def unique_temp_file_process(stream, _type): logging.info("into unique_temp_file_process") try: # 每个调用在temp中创建一个唯一空间 uid1 = uuid.uuid1().hex unique_space_path = _path + os.sep + "temp" + os.sep + uid1 + os.sep # unique_space_path = "/mnt/fangjiasheng/" + "temp/" + uid1 + "/" # 判断冲突 if not os.path.exists(unique_space_path): if not os.path.exists(_path + os.sep + "temp"): os.mkdir(_path + os.sep + "temp" + os.sep) os.mkdir(unique_space_path) else: uid2 = uuid.uuid1().hex if not os.path.exists(_path + os.sep + "temp"): os.mkdir(_path + os.sep + "temp" + os.sep) os.mkdir(_path + os.sep + "temp" + os.sep + uid2 + os.sep) # os.mkdir("/mnt/" + "temp/" + uid2 + "/") # 在唯一空间中,对传入的文件也保存为唯一 uid3 = uuid.uuid1().hex file_path = unique_space_path + uid3 + "." + _type with open(file_path, "wb") as ff: ff.write(stream) # 跳过一些编号 pass_md5 = getMDFFromFile(file_path) print("getMDFFromFile", pass_md5) if pass_md5 == '84dba5a65339f338d3ebdf9f33fae13e'\ or pass_md5 == '3d9f9f4354582d85b21b060ebd5786db'\ or pass_md5 == 'b52da40f24c6b29dfc2ebeaefe4e41f1' \ or pass_md5 == 'eefb925b7ccec1467be20b462fde2a09': raise Exception text = getText(_type, file_path) return text except Exception as e: # print("Convert error! Delete temp file. ", e, global_type) logging.info("unique_temp_file_process") print("unique_temp_file_process:", traceback.print_exc()) return [-1] finally: print("======================================") print("File md5:", getMDFFromFile(file_path)) try: if get_platform() == "Linux": # 删除该唯一空间下所有文件 if os.path.exists(unique_space_path): shutil.rmtree(unique_space_path) print() except Exception as e: logging.info("Delete Files Failed!") # print("Delete Files Failed!") return [-1] print("Finally") # to_html(_path + "6.html", text[0]) # to_html(unique_space_path + "result.html", text[0]) # return text logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def log(msg): """ @summary:打印信息 """ logger.info(msg) def cut_str(text_list, only_text_list, max_bytes_length=2000000): logging.info("into cut_str") try: # 计算有格式总字节数 bytes_length = 0 for text in text_list: bytes_length += len(bytes(text, encoding='utf-8')) print("text_list", bytes_length) # 小于直接返回 if bytes_length < max_bytes_length: print("return text_list no cut") return text_list # 全部文件连接,重新计算无格式字节数 all_text = "" bytes_length = 0 for text in only_text_list: bytes_length += len(bytes(text, encoding='utf-8')) all_text += text # print("only_text_list", bytes_length) # 小于直接返回 if bytes_length < max_bytes_length: print("return only_text_list no cut") return only_text_list # 截取字符 all_text = all_text[:int(max_bytes_length/3)] print("text bytes ", len(bytes(all_text, encoding='utf-8'))) print("return only_text_list has cut") return [all_text] except Exception as e: logging.info("cut_str " + str(e)) return ["-1"] @get_memory_info.memory_decorator def convert(data, ocr_model, otr_model): """ 接口返回值: {[str], 1}: 处理成功 {[-1], 0}: 逻辑处理错误 {[-2], 0}: 接口调用错误 {[-3], 1}: 文件格式错误,无法打开 {[-4], 0}: 各类文件调用第三方包读取超时 {[-5], 0}: 整个转换过程超时 {[-6], 0}: 阿里云UDF队列超时 {[-7], 1}: 文件需密码,无法打开 :return: {"result": [], "is_success": int} """ # 控制内存 # soft, hard = resource.getrlimit(resource.RLIMIT_AS) # resource.setrlimit(resource.RLIMIT_AS, (15 * 1024 ** 3, hard)) logging.info("into convert") start_time = time.time() try: # 模型加入全局变量 globals().update({"global_ocr_model": ocr_model}) globals().update({"global_otr_model": otr_model}) stream = base64.b64decode(data.get("file")) _type = data.get("type") if get_platform() == "Windows": # 解除超时装饰器,直接访问原函数 origin_unique_temp_file_process = unique_temp_file_process.__wrapped__ text = origin_unique_temp_file_process(stream, _type) else: # Linux 通过装饰器设置整个转换超时时间 try: text = unique_temp_file_process(stream, _type) except TimeoutError: logging.info("convert time out! 1200 sec") text = [-5] # if text == [-1]: # print({"failed result": [-1], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-1"], "result_text": ["-1"], "is_success": 0} # if text == [-2]: # print({"failed result": [-2], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-2"], "result_text": ["-2"], "is_success": 0} # if text == [-3]: # print({"failed result": [-3], "is_success": 1}, time.time() - start_time) # return {"result_html": ["-3"], "result_text": ["-3"], "is_success": 1} # if text == [-4]: # print({"failed result": [-4], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-4"], "result_text": ["-4"], "is_success": 0} # if text == [-5]: # print({"failed result": [-5], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-5"], "result_text": ["-5"], "is_success": 0} # if text == [-7]: # print({"failed result": [-7], "is_success": 1}, time.time() - start_time) # return {"result_html": ["-7"], "result_text": ["-7"], "is_success": 1} # if text == [-8]: # print({"failed result": [-8], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-8"], "result_text": ["-8"], "is_success": 1} error_code = [[-x] for x in range(1, 9)] still_success_code = [[-3], [-7]] if text in error_code: if text in still_success_code: print({"failed result": text, "is_success": 1}, time.time() - start_time) return {"result_html": [str(text[0])], "result_text": [str(text[0])], "is_success": 1} else: print({"failed result": text, "is_success": 0}, time.time() - start_time) return {"result_html": [str(text[0])], "result_text": [str(text[0])], "is_success": 0} # 结果保存result.html # if get_platform() == "Windows": text_str = "" for t in text: text_str += t to_html("../result.html", text_str) # 取纯文本 only_text = [] for t in text: new_t = BeautifulSoup(t, "lxml").get_text() new_t = re.sub("\n", "", new_t) only_text.append(new_t) # 判断长度,过长截取 text = cut_str(text, only_text) only_text = cut_str(only_text, only_text) if len(only_text) == 0: only_text = [""] if only_text[0] == '' and len(only_text) <= 1: print({"finished result": ["", 0], "is_success": 1}, time.time() - start_time) else: print({"finished result": [str(only_text)[:20], len(str(text))], "is_success": 1}, time.time() - start_time) return {"result_html": text, "result_text": only_text, "is_success": 1} except Exception as e: print({"failed result": [-1], "is_success": 0}, time.time() - start_time) print("convert", traceback.print_exc()) return {"result_html": ["-1"], "result_text": ["-1"], "is_success": 0} global_type = "" local_url = "http://127.0.0.1" if get_platform() == "Windows": _path = os.path.abspath(os.path.dirname(__file__)) else: _path = "/home/admin" if not os.path.exists(_path): _path = os.path.dirname(os.path.abspath(__file__)) if __name__ == '__main__': print(os.path.abspath(__file__) + "/../../") # if len(sys.argv) == 2: # port = int(sys.argv[1]) # else: # port = 15015 # app.run(host='0.0.0.0', port=port, threaded=True, debug=False) # log("format_conversion running") # convert("", "ocr_model", "otr_model") # _str = "啊" # str1 = "" # str2 = "" # for i in range(900000): # str1 += _str # list1 = [str1] # for i in range(700000): # str2 += _str # list2 = [str2] # cut_str(list1, list2) # file_path = "C:/Users/Administrator/Desktop/error1.png" # file_path = "D:/Project/table-detect-master/train_data/label_1.jpg" # file_path = "D:/Project/table-detect-master/test_files/1.png" # file_path = "D:/Project/table-detect-master/test_files/table2.jpg" if get_platform() == "Windows": file_path = "C:/Users/Administrator/Desktop/error3.pdf" # file_path = "D:/BIDI_DOC/比地_文档/2022/Test_Interface/94961e1987d1090e.xls" # file_path = "C:/Users/Administrator/Desktop/Test_ODPS/1624875783055.pdf" else: file_path = "1.doc" file_path = "files/1629873875150.png" with open(file_path, "rb") as f: file_bytes = f.read() file_base64 = base64.b64encode(file_bytes) data = {"file": file_base64, "type": file_path.split(".")[-1], "filemd5": 100} ocr_model = ocr_interface.OcrModels().get_model() otr_model = otr_interface.OtrModels().get_model() result = convert(data, ocr_model, otr_model) # print("*"*40) # result = convert(data, ocr_model, otr_model) # print(result)