# -*- coding: utf-8 -*-
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import codecs
import gc
import hashlib
import io
import json
import multiprocessing
import subprocess
import base64
import time
import traceback
import uuid
import re
import shutil
import signal
import zipfile
import mimetypes
import logging
import xml.dom.minidom
from os.path import basename

import PyPDF2
import lxml
import pdfminer
import cv2
import fitz
import pandas
import docx
import filetype
# import pdfplumber
import psutil
import requests
import rarfile
import chardet
from PIL import Image
from PyPDF2 import PdfFileReader, PdfFileWriter
from bs4 import BeautifulSoup
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.converter import PDFPageAggregator
from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar

from format_convert import get_memory_info
from ocr import ocr_interface
from ocr.ocr_interface import ocr, OcrModels
from otr import otr_interface
from otr.otr_interface import otr, OtrModels
from format_convert.libreoffice_interface import office_convert
from format_convert.swf.export import SVGExporter

logging.getLogger("pdfminer").setLevel(logging.WARNING)
from format_convert.table_correct import *
from format_convert.swf.movie import SWF
# import timeout_decorator
from format_convert import timeout_decorator

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


# Supported input formats: txt doc docx xls xlsx pdf zip rar swf jpg jpeg png


def judge_error_code(_list, code=(-1, -2, -3, -4, -5, -7)):
    """Return True when `_list` is a single-element error marker such as [-1].

    The converters in this module signal failure by returning a one-element
    list holding a negative error code; this helper recognizes that shape.
    `code` was a mutable default list; an immutable tuple is used instead
    (iteration-only usage, so callers passing lists still work).
    """
    for c in code:
        if _list == [c]:
            return True
    return False


def set_timeout(signum, frame):
    """SIGALRM handler: abort long-running work by raising TimeoutError.

    (The original emitted the same debug line 16 times; collapsed to one.)
    """
    print("=======================set_timeout")
    raise TimeoutError


def log_traceback(func_name):
    """Log `func_name` followed by the currently-handled exception's traceback."""
    logging.info(func_name)
    etype, value, tb = sys.exc_info()
    for line in traceback.TracebackException(
            type(value), value, tb, limit=None).format(chain=True):
        logging.info(line)


def judge_format(path):
    """Guess a file's format from its MIME type.

    Tries `mimetypes` (extension-based) first, then `filetype` (magic-bytes).
    Returns one of "pdf"/"docx"/"zip"/"rar"/"xlsx"/"doc"/"png"/"jpg",
    or None when the format cannot be guessed.
    """
    guess1 = mimetypes.guess_type(path)
    _type = None
    if guess1[0]:
        _type = guess1[0]
    else:
        guess2 = filetype.guess(path)
        if guess2:
            _type = guess2.mime
    if _type == "application/pdf":
        return "pdf"
    if _type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
        return "docx"
    if _type == "application/x-zip-compressed" or _type == "application/zip":
        return "zip"
    if _type == "application/x-rar-compressed" or _type == "application/rar":
        return "rar"
    if _type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
        return "xlsx"
    if _type == "application/msword":
        return "doc"
    if _type == "image/png":
        return "png"
    if _type == "image/jpeg":
        return "jpg"
    # Could not guess; caller treats None as "unknown format".
    return None


@get_memory_info.memory_decorator
def txt2text(path):
    """Read a plain-text file, auto-detecting its encoding with chardet.

    Returns [text] on success, [-3] when the encoding cannot be determined
    or the file cannot be decoded, [-1] on unexpected errors.
    """
    logging.info("into txt2text")
    try:
        # Detect the character encoding from the raw bytes first.
        with open(path, "rb") as ff:
            data = ff.read()
        encode = chardet.detect(data).get("encoding")
        print("txt2text judge code is", encode)
        try:
            if encode is None:
                logging.info("txt2text cannot judge file code!")
                return [-3]
            with open(path, "r", encoding=encode) as ff:
                txt_text = ff.read()
            return [txt_text]
        except:
            logging.info("txt2text cannot open file with code " + encode)
            return [-3]
    except Exception as e:
        print("txt2text", traceback.print_exc())
        logging.info("txt2text error!")
        return [-1]


@get_memory_info.memory_decorator
def doc2text(path, unique_type_dir):
    """Convert a .doc file by converting it to .docx (office service) then
    delegating to docx2text. Returns the docx2text result or an error list.
    """
    logging.info("into doc2text")
    try:
        # Convert legacy .doc to .docx via the office-conversion interface.
        file_path = from_office_interface(path, unique_type_dir, 'docx')
        if judge_error_code(file_path):
            return file_path
        text = docx2text(file_path, unique_type_dir)
        return text
    except Exception as e:
        logging.info("doc2text error!")
        print("doc2text", traceback.print_exc())
        # log_traceback("doc2text")
        return [-1]


@get_memory_info.memory_decorator
def read_xml_order(path, save_path):
    """Extract the content order of a .docx from its word/document.xml.

    Returns a list of markers — "w:t" (paragraph text), "wp:docPr" (image),
    "w:tbl" (table) — in document order, so docx2text can interleave the
    separately-extracted paragraph/image/table texts correctly.
    Error codes: [-3] bad docx, [-4] XML-parse timeout, [-1] unexpected.
    """
    logging.info("into read_xml_order")
    try:
        try:
            f = zipfile.ZipFile(path)
            for file in f.namelist():
                if "word/document.xml" == str(file):
                    f.extract(file, save_path)
            f.close()
        except Exception as e:
            logging.info("docx format error!")
            return [-3]
        try:
            collection = xml_analyze(save_path + "word/document.xml")
        except TimeoutError:
            logging.info("read_xml_order timeout")
            return [-4]
        body = collection.getElementsByTagName("w:body")[0]
        order_list = []
        for line in body.childNodes:
            if "w:p" in str(line):
                text = line.getElementsByTagName("w:t")
                picture = line.getElementsByTagName("wp:docPr")
                if text:
                    order_list.append("w:t")
                if picture:
                    order_list.append("wp:docPr")
                # Inline pictures may also appear as w:pict inside a run.
                for line1 in line.childNodes:
                    if "w:r" in str(line1):
                        picture1 = line1.getElementsByTagName("w:pict")
                        if picture1:
                            order_list.append("wp:docPr")
            if "w:tbl" in str(line):
                order_list.append("w:tbl")
        # NOTE(review): present in the original with its result discarded;
        # kept for side effects (extracts document.xml) — confirm it is needed.
        read_xml_table(path, save_path)
        return order_list
    except Exception as e:
        logging.info("read_xml_order error!")
        print("read_xml_order", traceback.print_exc())
        # log_traceback("read_xml_order")
        return [-1]
@get_memory_info.memory_decorator def read_xml_table(path, save_path): logging.info("into read_xml_table") try: # print("into read_xml_table") try: f = zipfile.ZipFile(path) for file in f.namelist(): if "word/document.xml" == str(file): f.extract(file, save_path) f.close() except Exception as e: # print("docx format error!", e) logging.info("docx format error!") return [-3] # DOMTree = xml.dom.minidom.parse(save_path + "word/document.xml") # collection = DOMTree.documentElement try: collection = xml_analyze(save_path + "word/document.xml") except TimeoutError: logging.info("read_xml_table timeout") return [-4] body = collection.getElementsByTagName("w:body")[0] table_text_list = [] # print("body.childNodes", body.childNodes) for line in body.childNodes: if "w:tbl" in str(line): # print("str(line)", str(line)) table_text = '' + "\n" tr_list = line.getElementsByTagName("w:tr") # print("line.childNodes", line.childNodes) tr_index = 0 tr_text_list = [] tr_text_list_colspan = [] for tr in tr_list: table_text = table_text + "" + "\n" tc_list = tr.getElementsByTagName("w:tc") tc_index = 0 tc_text_list = [] for tc in tc_list: tc_text = "" # 获取一格占多少列 col_span = tc.getElementsByTagName("w:gridSpan") if col_span: col_span = int(col_span[0].getAttribute("w:val")) else: col_span = 1 # 获取是否是合并单元格的下一个空单元格 is_merge = tc.getElementsByTagName("w:vMerge") if is_merge: is_merge = is_merge[0].getAttribute("w:val") if is_merge == "continue": col_span_index = 0 real_tc_index = 0 # if get_platform() == "Windows": # print("read_xml_table tr_text_list", tr_text_list) # print("read_xml_table tr_index", tr_index) if 0 <= tr_index - 1 < len(tr_text_list): for tc_colspan in tr_text_list[tr_index - 1]: if col_span_index < tc_index: col_span_index += tc_colspan[1] real_tc_index += 1 # print("tr_index-1, real_tc_index", tr_index-1, real_tc_index) # print(tr_text_list[tr_index-1]) if real_tc_index < len(tr_text_list[tr_index - 1]): tc_text = tr_text_list[tr_index - 1][real_tc_index][0] table_text = 
table_text + "" + "\n" tc_index += 1 tc_text_list.append([tc_text, col_span]) table_text += "" + "\n" tr_index += 1 tr_text_list.append(tc_text_list) table_text += "
" + "\n" p_list = tc.getElementsByTagName("w:p") for p in p_list: t = p.getElementsByTagName("w:t") if t: for tt in t: # print("tt", tt.childNodes) if len(tt.childNodes) > 0: tc_text += tt.childNodes[0].nodeValue tc_text += "\n" table_text = table_text + tc_text + "
" + "\n" table_text_list.append(table_text) return table_text_list except Exception as e: logging.info("read_xml_table error") print("read_xml_table", traceback.print_exc()) # log_traceback("read_xml_table") return [-1] @get_memory_info.memory_decorator @timeout_decorator.timeout(300, timeout_exception=TimeoutError) def xml_analyze(path): # 解析xml DOMTree = xml.dom.minidom.parse(path) collection = DOMTree.documentElement return collection def read_docx_table(document): table_text_list = [] for table in document.tables: table_text = "\n" print("==================") for row in table.rows: table_text += "\n" for cell in row.cells: table_text += "\n" table_text += "\n" table_text += "
" + cell.text + "
\n" print(table_text) table_text_list.append(table_text) return table_text_list @get_memory_info.memory_decorator def docx2text(path, unique_type_dir): logging.info("into docx2text") try: try: doc = docx.Document(path) except Exception as e: print("docx format error!", e) print(traceback.print_exc()) logging.info("docx format error!") return [-3] # 遍历段落 # print("docx2text extract paragraph") paragraph_text_list = [] for paragraph in doc.paragraphs: if paragraph.text != "": paragraph_text_list.append("
" + paragraph.text + "
" + "\n") # print("paragraph_text", paragraph.text) # 遍历表 try: table_text_list = read_xml_table(path, unique_type_dir) except TimeoutError: return [-4] if judge_error_code(table_text_list): return table_text_list # 顺序遍历图片 # print("docx2text extract image") image_text_list = [] temp_image_path = unique_type_dir + "temp_image.png" pattern = re.compile('rId\d+') for graph in doc.paragraphs: for run in graph.runs: if run.text == '': try: if not pattern.search(run.element.xml): continue content_id = pattern.search(run.element.xml).group(0) content_type = doc.part.related_parts[content_id].content_type except Exception as e: print("docx no image!", e) continue if not content_type.startswith('image'): continue # 写入临时文件 img_data = doc.part.related_parts[content_id].blob with open(temp_image_path, 'wb') as f: f.write(img_data) # if get_platform() == "Windows": # print("img_data", img_data) if img_data is None: continue # 识别图片文字 image_text = picture2text(temp_image_path) if image_text == [-2]: return [-2] if image_text == [-1]: return [-1] if image_text == [-3]: continue image_text = image_text[0] image_text_list.append(add_div(image_text)) # 解析document.xml,获取文字顺序 # print("docx2text extract order") order_list = read_xml_order(path, unique_type_dir) if order_list == [-2]: return [-2] if order_list == [-1]: return [-1] text = "" print("len(order_list)", len(order_list)) print("len(paragraph_text_list)", len(paragraph_text_list)) print("len(image_text_list)", len(image_text_list)) print("len(table_text_list)", len(table_text_list)) # log("docx2text output in order") for tag in order_list: if tag == "w:t": if len(paragraph_text_list) > 0: text += paragraph_text_list.pop(0) if tag == "wp:docPr": if len(image_text_list) > 0: text += image_text_list.pop(0) if tag == "w:tbl": if len(table_text_list) > 0: text += table_text_list.pop(0) return [text] except Exception as e: # print("docx2text", e, global_type) logging.info("docx2text error!") print("docx2text", traceback.print_exc()) # 
log_traceback("docx2text") return [-1] def add_div(text): if text == "" or text is None: return text if get_platform() == "Windows": print("add_div", text) if re.findall("
", text): return text text = "
" + text + "\n" text = re.sub("\n", "
\n
", text) # text += "
" if text[-5:] == "
": print("add_div has cut", text[-30:]) text = text[:-5] return text @get_memory_info.memory_decorator def pdf2Image(path, save_dir): logging.info("into pdf2Image") try: try: doc = fitz.open(path) except Exception as e: logging.info("pdf format error!") # print("pdf format error!", e) return [-3] output_image_list = [] for page_no in range(doc.page_count): # 限制pdf页数,只取前100页 if page_no >= 70: logging.info("pdf2Image: pdf pages count " + str(doc.page_count) + ", only get 70 pages") break try: page = doc.loadPage(page_no) output = save_dir + "_page" + str(page_no) + ".png" rotate = int(0) # 每个尺寸的缩放系数为1.3,这将为我们生成分辨率提高2.6的图像。 # 此处若是不做设置,默认图片大小为:792X612, dpi=96 # (1.33333333-->1056x816) (2-->1584x1224) zoom_x = 1.33333333 zoom_y = 1.33333333 # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) pix = page.getPixmap(matrix=mat, alpha=False) pix.writePNG(output) output_image_list.append(output) except ValueError as e: traceback.print_exc() if str(e) == "page not in document": logging.info("pdf2Image page not in document! continue..." + str(page_no)) continue elif "encrypted" in str(e): logging.info("pdf2Image document need password " + str(page_no)) return [-7] except RuntimeError as e: if "cannot find page" in str(e): logging.info("pdf2Image page {} not in document! continue... 
".format(str(page_no)) + str(e)) continue else: traceback.print_exc() return [-3] return output_image_list except Exception as e: logging.info("pdf2Image error!") print("pdf2Image", traceback.print_exc()) return [-1] def image_preprocess(image_np, image_path, use_ocr=True): logging.info("into image_preprocess") try: # 长 宽 # resize_size = (1024, 768) # 限制图片大小 # resize_image(image_path, resize_size) # 图片倾斜校正,写入原来的图片路径 g_r_i = get_rotated_image(image_np, image_path) if g_r_i == [-1]: return [-1], [], [], 0 # otr需要图片resize, 写入另一个路径 image_np = cv2.imread(image_path) best_h, best_w = get_best_predict_size(image_np) image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA) image_resize_path = image_path[:-4] + "_resize" + image_path[-4:] cv2.imwrite(image_resize_path, image_resize) # 调用otr模型接口 with open(image_resize_path, "rb") as f: image_bytes = f.read() points, split_lines, bboxes, outline_points = from_otr_interface(image_bytes) if judge_error_code(points): return points, [], [], 0 # 将resize后得到的bbox根据比例还原 ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w) for i in range(len(bboxes)): bbox = bboxes[i] bboxes[i] = [(int(bbox[0][0]*ratio[1]), int(bbox[0][1]*ratio[0])), (int(bbox[1][0]*ratio[1]), int(bbox[1][1]*ratio[0]))] # 查看是否能输出正确框 # for box in bboxes: # cv2.rectangle(image_np, box[0], box[1], (0, 255, 0), 3) # cv2.imshow("bbox", image_np) # cv2.waitKey(0) # 调用ocr模型接口 with open(image_path, "rb") as f: image_bytes = f.read() # 有表格 if len(bboxes) >= 2: text_list, bbox_list = from_ocr_interface(image_bytes, True) if judge_error_code(text_list): return text_list, [], [], 0 # for i in range(len(text_list)): # print(text_list[i], bbox_list[i]) # 查看是否能输出正确框 # for box in bbox_list: # cv2.rectangle(image_np, (int(box[0][0]), int(box[0][1])), # (int(box[2][0]), int(box[2][1])), (0, 255, 0), 1) # cv2.imshow("bbox", image_np) # cv2.waitKey(0) text, column_list = get_formatted_table(text_list, bbox_list, bboxes, split_lines) if 
judge_error_code(text): return text, [], [], 0 is_table = 1 return text, column_list, outline_points, is_table # 无表格 else: if use_ocr: text = from_ocr_interface(image_bytes) if judge_error_code(text): return text, [], [], 0 is_table = 0 return text, [], [], is_table else: is_table = 0 return None, [], [], is_table except Exception as e: logging.info("image_preprocess error") print("image_preprocess", traceback.print_exc()) return [-1], [], [], 0 def get_best_predict_size(image_np): sizes = [1280, 1152, 1024, 896, 768, 640, 512, 384, 256, 128] min_len = 10000 best_height = sizes[0] for height in sizes: if abs(image_np.shape[0] - height) < min_len: min_len = abs(image_np.shape[0] - height) best_height = height min_len = 10000 best_width = sizes[0] for width in sizes: if abs(image_np.shape[1] - width) < min_len: min_len = abs(image_np.shape[1] - width) best_width = width return best_height, best_width @get_memory_info.memory_decorator def pdf2text(path, unique_type_dir): logging.info("into pdf2text") try: # pymupdf pdf to image save_dir = path.split(".")[-2] + "_" + path.split(".")[-1] output_image_list = pdf2Image(path, save_dir) if judge_error_code(output_image_list): return output_image_list # 获取每页pdf提取的文字、表格的列数、轮廓点、是否含表格、页码 page_info_list = [] page_no = 0 for img_path in output_image_list: print("pdf page", page_no, "in total", len(output_image_list)) # 读不出来的跳过 try: img = cv2.imread(img_path) img_size = img.shape except: logging.info("pdf2text read image in page fail! 
continue...") continue # print("pdf2text img_size", img_size) text, column_list, outline_points, is_table = image_preprocess(img, img_path, use_ocr=False) if judge_error_code(text): return text page_info_list.append([text, column_list, outline_points, is_table, page_no, img_size]) page_no += 1 # print("pdf2text", page_info_list) # 包含table的和不包含table的 has_table_list = [] has_table_page_no_list = [] no_table_list = [] no_table_page_no_list = [] for page_info in page_info_list: # 含表格不含表格分开 if not page_info[3]: no_table_list.append(page_info) no_table_page_no_list.append(page_info[4]) else: has_table_list.append(page_info) has_table_page_no_list.append(page_info[4]) # 页码表格连接 table_connect_list, connect_text_list = page_table_connect(has_table_list, page_info_list) # table_connect_list, connect_text_list = [], [] if judge_error_code(table_connect_list): return table_connect_list # 连接的页码 table_connect_page_no_list = [] for area in connect_text_list: table_connect_page_no_list.append(area[1]) # print("pdf2text table_connect_list", table_connect_list) # pdfminer 方式 try: fp = open(path, 'rb') # 用文件对象创建一个PDF文档分析器 parser = PDFParser(fp) # 创建一个PDF文档 doc = PDFDocument(parser) # 连接分析器,与文档对象 rsrcmgr = PDFResourceManager() device = PDFPageAggregator(rsrcmgr, laparams=LAParams()) interpreter = PDFPageInterpreter(rsrcmgr, device) # 判断是否能读pdf for page in PDFPage.create_pages(doc): break except pdfminer.psparser.PSEOF as e: # pdfminer 读不了空白页的对象,直接使用pymupdf转换出的图片进行ocr识别 logging.info("pdf2text " + str(e) + " use ocr read pdf!") text_list = [] for page_info in page_info_list: page_no = page_info[4] # 表格 if page_info[3]: # 判断表格是否跨页连接 area_no = 0 jump_page = 0 for area in table_connect_list: if page_no in area: # 只记录一次text if page_no == area[0]: image_text = connect_text_list[area_no][0] text_list.append([image_text, page_no, 0]) jump_page = 1 area_no += 1 # 是连接页的跳过后面步骤 if jump_page: continue # 直接取text image_text = page_info_list[page_no][0] text_list.append([image_text, page_no, 0]) # 非表格 
else: with open(output_image_list[page_no], "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) text_list.append([image_text, page_no, 0]) text_list.sort(key=lambda z: z[1]) text = "" for t in text_list: text += t[0] return [text] except Exception as e: logging.info("pdf format error!") traceback.print_exc() return [-3] text_list = [] page_no = 0 pages = PDFPage.create_pages(doc) for page in pages: logging.info("pdf2text page_no " + str(page_no)) # 限制pdf页数,只取前100页 if page_no >= 70: logging.info("pdf2text: pdf pages only get 100 pages") break # 判断页码在含表格页码中,直接拿已生成的text if page_no in has_table_page_no_list: # 判断表格是否跨页连接 area_no = 0 jump_page = 0 for area in table_connect_list: if page_no in area: # 只记录一次text if page_no == area[0]: image_text = connect_text_list[area_no][0] text_list.append([image_text, page_no, 0]) jump_page = 1 area_no += 1 # 是连接页的跳过后面步骤 if jump_page: page_no += 1 continue # 直接取text image_text = page_info_list[page_no][0] text_list.append([image_text, page_no, 0]) page_no += 1 continue # 不含表格的解析pdf else: if get_platform() == "Windows": try: interpreter.process_page(page) layout = device.get_result() except Exception: logging.info("pdf2text pdfminer read pdf page error! continue...") continue else: # 设置超时时间 try: # 解析pdf中的不含表格的页 if get_platform() == "Windows": origin_pdf_analyze = pdf_analyze.__wrapped__ layout = origin_pdf_analyze(interpreter, page, device) else: layout = pdf_analyze(interpreter, page, device) except TimeoutError as e: logging.info("pdf2text pdfminer read pdf page time out!") return [-4] except Exception: logging.info("pdf2text pdfminer read pdf page error! 
continue...") continue # 判断该页有没有文字对象,没有则有可能是有水印 only_image = 1 image_count = 0 for x in layout: if isinstance(x, LTTextBoxHorizontal): only_image = 0 if isinstance(x, LTFigure): image_count += 1 # 如果该页图片数量过多,直接ocr整页识别 logging.info("pdf2text image_count" + str(image_count)) if image_count >= 3: with open(output_image_list[page_no], "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text text_list.append([image_text, page_no, 0]) page_no += 1 continue order_list = [] for x in layout: if get_platform() == "Windows": # print("x", page_no, x) print() if isinstance(x, LTTextBoxHorizontal): image_text = x.get_text() # 无法识别编码,用ocr if re.search('[(]cid:[0-9]+[)]', image_text): print(re.search('[(]cid:[0-9]+[)]', image_text)) with open(output_image_list[page_no], "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text image_text = add_div(image_text) order_list.append([image_text, page_no, x.bbox[1]]) break else: image_text = add_div(image_text) order_list.append([image_text, page_no, x.bbox[1]]) continue if isinstance(x, LTFigure): for image in x: if isinstance(image, LTImage): try: print(image.width, image.height) image_stream = image.stream.get_data() # 有些水印导致pdf分割、读取报错 # if image.width <= 200 and image.height<=200: # continue # img_test = Image.open(io.BytesIO(image_stream)) # img_test.save('temp/LTImage.jpg') # 查看提取的图片高宽,太大则抛错用另一张图 img_test = Image.open(io.BytesIO(image_stream)) if img_test.size[1] > 2000 or img_test.size[0] > 1500: print("pdf2text LTImage size", img_test.size) raise Exception img_test.save('temp/LTImage.jpg') # except pdfminer.pdftypes.PDFNotImplementedError: # with open(output_image_list[page_no], "rb") as ff: # image_stream = ff.read() except Exception: logging.info("pdf2text pdfminer read image in page fail! 
use pymupdf read image...") print(traceback.print_exc()) with open(output_image_list[page_no], "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text # 判断只拿到了水印图: 无文字输出且只有图片对象 if image_text == "" and only_image: # 拆出该页pdf try: logging.info("pdf2text guess pdf has watermark") split_path = get_single_pdf(path, page_no) except: # 如果拆分抛异常,则大概率不是水印图,用ocr识别图片 logging.info("pdf2text guess pdf has no watermark") with open(output_image_list[page_no], "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) image_text = image_text order_list.append([image_text, page_no, x.bbox[1]]) continue if judge_error_code(split_path): return split_path # 调用office格式转换 file_path = from_office_interface(split_path, unique_type_dir, 'html', 3) # if file_path == [-3]: # return [-3] if judge_error_code(file_path): return file_path # 获取html文本 image_text = get_html_p(file_path) if judge_error_code(image_text): return image_text if get_platform() == "Windows": print("image_text", page_no, image_text) with open("temp" + str(x.bbox[0]) + ".jpg", "wb") as ff: ff.write(image_stream) image_text = add_div(image_text) order_list.append([image_text, page_no, x.bbox[1]]) if get_platform() == "Windows": print("order_list", page_no, order_list) order_list.sort(key=lambda z: z[2], reverse=True) text_list += order_list page_no += 1 text = "" for t in text_list: # text += add_div(t[0]) text += t[0] return [text] except UnicodeDecodeError as e: logging.info("pdf2text pdfminer create pages failed! 
" + str(e)) return [-3] except Exception as e: logging.info("pdf2text error!") print("pdf2text", traceback.print_exc()) return [-1] @get_memory_info.memory_decorator @timeout_decorator.timeout(300, timeout_exception=TimeoutError) def pdf_analyze(interpreter, page, device): logging.info("into pdf_analyze") # 解析pdf中的不含表格的页 pdf_time = time.time() print("pdf_analyze interpreter process...") interpreter.process_page(page) print("pdf_analyze device get_result...") layout = device.get_result() logging.info("pdf2text read time " + str(time.time()-pdf_time)) return layout def get_html_p(html_path): logging.info("into get_html_p") try: with open(html_path, "r") as ff: html_str = ff.read() soup = BeautifulSoup(html_str, 'lxml') text = "" for p in soup.find_all("p"): p_text = p.text p_text = p_text.strip() if p.string != "": text += p_text text += "\n" return text except Exception as e: logging.info("get_html_p error!") print("get_html_p", traceback.print_exc()) return [-1] def get_single_pdf(path, page_no): logging.info("into get_single_pdf") try: # print("path, ", path) pdf_origin = PdfFileReader(path, strict=False) pdf_new = PdfFileWriter() pdf_new.addPage(pdf_origin.getPage(page_no)) path_new = path.split(".")[0] + "_split.pdf" with open(path_new, "wb") as ff: pdf_new.write(ff) return path_new except PyPDF2.utils.PdfReadError as e: raise e except Exception as e: logging.info("get_single_pdf error! 
page " + str(page_no)) print("get_single_pdf", traceback.print_exc()) raise e def page_table_connect(has_table_list, page_info_list): logging.info("into page_table_connect") try: # 判断是否有页码的表格相连 table_connect_list = [] temp_list = [] # 离图片顶部或底部距离 threshold = 100 for i in range(1, len(has_table_list)): page_info = has_table_list[i] last_page_info = has_table_list[i - 1] # 页码需相连 if page_info[4] - last_page_info[4] == 1: # 上一页的最后一个列数和下一页的第一个列数都为0,且相等 if not last_page_info[1][-1] and not page_info[1][0] and \ last_page_info[1][-1] == page_info[1][0]: # 上一页的轮廓点要离底部一定距离内,下一页的轮廓点要离顶部一定距离内 if page_info[5][0] - last_page_info[2][-1][1][1] <= threshold and \ page_info[2][0][0][1] - 0 <= 100: # print("page_table_connect accept") temp_list.append(last_page_info[4]) temp_list.append(page_info[4]) continue # 条件不符合的,存储之前保存的连接页码 if len(temp_list) > 1: temp_list = list(set(temp_list)) temp_list.sort(key=lambda x: x) table_connect_list.append(temp_list) temp_list = [] if len(temp_list) > 1: temp_list = list(set(temp_list)) temp_list.sort(key=lambda x: x) table_connect_list.append(temp_list) temp_list = [] # 连接两页内容 connect_text_list = [] for area in table_connect_list: first_page_no = area[0] area_page_text = str(page_info_list[first_page_no][0]) # print("area_page_text", area_page_text) for i in range(1, len(area)): current_page_no = area[i] current_page_text = page_info_list[current_page_no][0] # 连接两个table table_prefix = re.finditer('', current_page_text) index_list = [] for t in table_prefix: index_list.append(t.span()) delete_index = index_list[0] current_page_text = current_page_text[:delete_index[0]] \ + current_page_text[delete_index[1]:] # current_page_text = current_page_text[18:] # print("current_page_text", current_page_text[:30]) # print("current_page_text", current_page_text) table_suffix = re.finditer('
', area_page_text) index_list = [] for t in table_suffix: index_list.append(t.span()) delete_index = index_list[-1] area_page_text = area_page_text[:delete_index[0]] \ + area_page_text[delete_index[1]:] # area_page_text = area_page_text[:-9] # print("area_page_text", area_page_text[-20:]) area_page_text = area_page_text + current_page_text connect_text_list.append([area_page_text, area]) return table_connect_list, connect_text_list except Exception as e: # print("page_table_connect", e) logging.info("page_table_connect error!") print("page_table_connect", traceback.print_exc()) return [-1], [-1] @get_memory_info.memory_decorator def zip2text(path, unique_type_dir): logging.info("into zip2text") try: zip_path = unique_type_dir try: zip_file = zipfile.ZipFile(path) zip_list = zip_file.namelist() # print("zip list namelist", zip_list) if get_platform() == "Windows": if os.path.exists(zip_list[0]): print("zip2text exists") # 循环解压文件到指定目录 file_list = [] for f in zip_list: file_list.append(zip_file.extract(f, path=zip_path)) # zip_file.extractall(path=zip_path) zip_file.close() # 获取文件名 # file_list = [] # for root, dirs, files in os.walk(zip_path, topdown=False): # for name in dirs: # file_list.append(os.path.join(root, name) + os.sep) # for name in files: # file_list.append(os.path.join(root, name)) # # # if get_platform() == "Windows": # # print("file_list", file_list) # # # 过滤掉doc缓存文件 # temp_list = [] # for f in file_list: # if re.search("~\$", f): # continue # else: # temp_list.append(f) # file_list = temp_list except Exception as e: logging.info("zip format error!") print("zip format error!", traceback.print_exc()) return [-3] # 内部文件重命名 # file_list = inner_file_rename(file_list) file_list = rename_inner_files(zip_path) if judge_error_code(file_list): return file_list if get_platform() == "Windows": print("============= zip file list") # print(file_list) text = [] for file in file_list: if os.path.isdir(file): continue # 无文件后缀,猜格式 if len(file.split(".")) <= 1: 
logging.info(str(file) + " has no type! Guess type...") _type = judge_format(file) if _type is None: logging.info(str(file) + "cannot guess type!") sub_text = [""] else: logging.info(str(file) + " guess type: " + _type) new_file = str(file) + "." + _type os.rename(file, new_file) file = new_file sub_text = getText(_type, file) # 有文件后缀,截取 else: _type = file.split(".")[-1] sub_text = getText(_type, file) if judge_error_code(sub_text, code=[-3]): continue if judge_error_code(sub_text): return sub_text text = text + sub_text return text except Exception as e: logging.info("zip2text error!") print("zip2text", traceback.print_exc()) return [-1] @get_memory_info.memory_decorator def rar2text(path, unique_type_dir): logging.info("into rar2text") try: rar_path = unique_type_dir try: # shell调用unrar解压 _signal = os.system("unrar x " + path + " " + rar_path) print("rar2text _signal", _signal) # =0, 解压成功 if _signal != 0: raise Exception except Exception as e: logging.info("rar format error!") print("rar format error!", e) return [-3] # 获取文件名 # file_list = [] # for root, dirs, files in os.walk(rar_path, topdown=False): # for name in dirs: # file_list.append(os.path.join(root, name) + os.sep) # for name in files: # file_list.append(os.path.join(root, name)) if get_platform() == "Windows": print("============= rar file list") # 内部文件重命名 # file_list = inner_file_rename(file_list) file_list = rename_inner_files(rar_path) if judge_error_code(file_list): return file_list text = [] for file in file_list: if os.path.isdir(file): continue # 无文件后缀,猜格式 if len(file.split(".")) <= 1: logging.info(str(file) + " has no type! Guess type...") _type = judge_format(file) if _type is None: logging.info(str(file) + "cannot guess type!") sub_text = [""] else: logging.info(str(file) + " guess type: " + _type) new_file = str(file) + "." 
+ _type os.rename(file, new_file) file = new_file sub_text = getText(_type, file) # 有文件后缀,截取 else: _type = file.split(".")[-1] sub_text = getText(_type, file) if judge_error_code(sub_text, code=[-3]): continue if judge_error_code(sub_text): return sub_text # print("sub text", sub_text, file, _type) text = text + sub_text return text except Exception as e: logging.info("rar2text error!") print("rar2text", traceback.print_exc()) return [-1] def inner_file_rename(path_list): logging.info("into inner_file_rename") try: # 先过滤文件名中的点 '.' path_list.sort(key=lambda x: len(x), reverse=True) for i in range(len(path_list)): old_path = path_list[i] # 对于目录,判断最后一级是否需过滤,重命名 if os.path.isdir(old_path): ps = old_path.split(os.sep) old_p = ps[-2] if '.' in old_p: new_p = re.sub("\\.", "", old_p) new_path = "" for p in ps[:-2]: new_path += p + os.sep new_path += new_p + os.sep # 重命名,更新 # print("has .", path_list[i], new_path) os.rename(old_path, new_path) for j in range(len(path_list)): if old_path in path_list[j]: path_list[j] = re.sub(old_p, new_p, path_list[j]) + os.sep # 将path分割,按分割个数排名 path_len_list = [] for p in path_list: p_ss = p.split(os.sep) temp_p_ss = [] for pp in p_ss: if pp == "": continue temp_p_ss.append(pp) p_ss = temp_p_ss path_len_list.append([p, p_ss, len(p_ss)]) # 从路径分割少的开始改名,即从根目录开始改 path_len_list.sort(key=lambda x: x[2]) # for p in path_len_list: # print("---", p[1]) # 判断不用变的目录在第几级 no_change_level = 0 loop = 0 for p_s in path_len_list[0][1]: if p_s[-4:] == "_rar" or p_s[-4:] == "_zip": no_change_level += loop loop = 0 loop += 1 no_change_level += 1 # 每个 new_path_list = [] for path_len in path_len_list: # 前n个是固定路径 new_path = "" for i in range(no_change_level): new_path += path_len[1][i] + os.sep old_path = new_path if not get_platform() == "Windows": old_path = os.sep + old_path new_path = os.sep + new_path # print("path_len[1][3:]", path_len[1][3:]) count = 0 for p in path_len[1][no_change_level:]: # 新路径全部转换hash new_path += str(hash(p)) # 
最后一个不加os.sep,并且旧路径最后一个不转换hash if count < len(path_len[1][no_change_level:]) - 1: old_path += str(hash(p)) + os.sep new_path += os.sep else: old_path += p count += 1 # path是文件夹再加os.sep if os.path.isdir(path_len[0]): new_path += os.sep old_path += os.sep # path是文件再加文件名后缀 else: p_ss = path_len[1][-1].split(".") if len(p_ss) > 1: path_suffix = "." + p_ss[-1] new_path += path_suffix print("inner_file_rename", old_path, "to", new_path) os.rename(old_path, new_path) new_path_list.append(new_path) return new_path_list except Exception as e: logging.info("inner_file_rename error!") print("inner_file_rename", traceback.print_exc()) return [-1] def rename_inner_files(root_path): try: logging.info("into rename_inner_files") # 获取解压文件夹下所有文件+文件夹,不带根路径 path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: p = os.path.join(root, name) + os.sep p = re.sub(root_path, "", p) path_list.append(p) for name in files: p = os.path.join(root, name) p = re.sub(root_path, "", p) path_list.append(p) # 按路径长度排序 path_list.sort(key=lambda x: len(x), reverse=True) # 循环改名 for old_path in path_list: # 按路径分隔符分割 ss = old_path.split(os.sep) # 判断是否文件夹 is_dir = 0 file_type = "" if os.path.isdir(root_path + old_path): ss = ss[:-1] is_dir = 1 else: if "." in old_path: file_type = "." 
+ old_path.split(".")[-1] else: file_type = "" # 最后一级需要用hash改名 new_path = "" # new_path = re.sub(ss[-1], str(hash(ss[-1])), old_path) + file_type current_level = 0 for s in ss: # 路径拼接 if current_level < len(ss) - 1: new_path += s + os.sep else: new_path += str(hash(s)) + file_type current_level += 1 new_ab_path = root_path + new_path old_ab_path = root_path + old_path os.rename(old_ab_path, new_ab_path) # 重新获取解压文件夹下所有文件+文件夹 new_path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: new_path_list.append(os.path.join(root, name) + os.sep) for name in files: new_path_list.append(os.path.join(root, name)) # print("new_path_list", new_path_list) return new_path_list except: traceback.print_exc() return [-1] @get_memory_info.memory_decorator def xls2text(path, unique_type_dir): logging.info("into xls2text") try: # 调用libreoffice格式转换 file_path = from_office_interface(path, unique_type_dir, 'xlsx') # if file_path == [-3]: # return [-3] if judge_error_code(file_path): return file_path text = xlsx2text(file_path, unique_type_dir) # if text == [-1]: # return [-1] # if text == [-3]: # return [-3] if judge_error_code(text): return text return text except Exception as e: logging.info("xls2text error!") print("xls2text", traceback.print_exc()) return [-1] @get_memory_info.memory_decorator def xlsx2text(path, unique_type_dir): logging.info("into xlsx2text") try: try: # sheet_name=None, 即拿取所有sheet,存为dict df_dict = pandas.read_excel(path, header=None, keep_default_na=False, sheet_name=None) except Exception as e: logging.info("xlsx format error!") # print("xlsx format error!", e) return [-3] df_list = [sheet for sheet in df_dict.values()] sheet_text = "" for df in df_list: text = '' + "\n" for index, row in df.iterrows(): text = text + "" for r in row: text = text + "" + "\n" # print(text) text = text + "" + "\n" text = text + "
" + str(r) + "
" + "\n" sheet_text += text return [sheet_text] except Exception as e: logging.info("xlsx2text error!") print("xlsx2text", traceback.print_exc()) return [-1] @get_memory_info.memory_decorator def swf2text(path, unique_type_dir): logging.info("into swf2text") try: try: with open(path, 'rb') as f: swf_file = SWF(f) svg_exporter = SVGExporter() svg = swf_file.export(svg_exporter) # with open('swf_export.jpg', 'wb') as f: # f.write(svg.read()) swf_str = str(svg.getvalue(), encoding='utf-8') except Exception as e: logging.info("swf format error!") traceback.print_exc() return [-3] # 正则匹配图片的信息位置 result0 = re.finditer(']*)', swf_str) image_bytes_list = [] i = 0 image_path_prefix = path.split(".")[-2] + "_" + path.split(".")[-1] image_path_list = [] for r in result0: # 截取图片信息所在位置 swf_str0 = swf_str[r.span()[0]:r.span()[1] + 1] # 正则匹配得到图片的base64编码 result1 = re.search('xlink:href="data:(.[^>]*)', swf_str0) swf_str1 = swf_str0[result1.span()[0]:result1.span()[1]] reg1_prefix = 'b\'' result1 = re.search(reg1_prefix + '(.[^\']*)', swf_str1) swf_str1 = swf_str1[result1.span()[0] + len(reg1_prefix):result1.span()[1]] # base64_str -> base64_bytes -> no "\\" base64_bytes -> bytes -> image base64_bytes_with_double = bytes(swf_str1, "utf-8") base64_bytes = codecs.escape_decode(base64_bytes_with_double, "hex-escape")[0] image_bytes = base64.b64decode(base64_bytes) image_bytes_list.append(image_bytes) image_path = image_path_prefix + "_page_" + str(i) + ".png" with open(image_path, 'wb') as f: f.write(image_bytes) image_path_list.append(image_path) # 正则匹配得到图片的宽高 # reg2_prefix = 'width="' # result2 = re.search(reg2_prefix + '(\d+)', swf_str0) # swf_str2 = swf_str0[result2.span()[0]+len(reg2_prefix):result2.span()[1]] # width = swf_str2 # reg2_prefix = 'height="' # result2 = re.search(reg2_prefix + '(\d+)', swf_str0) # swf_str2 = swf_str0[result2.span()[0]+len(reg2_prefix):result2.span()[1]] # height = swf_str2 i += 1 text_list = [] # print("image_path_list", image_path_list) for 
image_path in image_path_list: text = picture2text(image_path) # print("text", text) if judge_error_code(text, code=[-3]): continue if judge_error_code(text): return text text = text[0] text_list.append(text) text = "" for t in text_list: text += t return [text] except Exception as e: logging.info("swf2text error!") print("swf2text", traceback.print_exc()) return [-1] @get_memory_info.memory_decorator def picture2text(path, html=False): logging.info("into picture2text") try: # 判断图片中表格 img = cv2.imread(path) if img is None: return [-3] # if get_platform() == "Windows": # print("picture2text img", img) text, column_list, outline_points, is_table = image_preprocess(img, path) if judge_error_code(text): return text # if text == [-5]: # return [-5] # if text == [-2]: # return [-2] # if text == [-1]: # return [-1] if html: text = add_div(text) return [text] except Exception as e: logging.info("picture2text error!") print("picture2text", traceback.print_exc()) return [-1] port_num = [0] def choose_port(): process_num = 4 if port_num[0] % process_num == 0: _url = local_url + ":15011" elif port_num[0] % process_num == 1: _url = local_url + ":15012" elif port_num[0] % process_num == 2: _url = local_url + ":15013" elif port_num[0] % process_num == 3: _url = local_url + ":15014" port_num[0] = port_num[0] + 1 return _url @get_memory_info.memory_decorator def from_ocr_interface(image_stream, is_table=False): logging.info("into from_ocr_interface") try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model")) except TimeoutError: if is_table: return [-5], [-5] else: return [-5] except requests.exceptions.ConnectionError as e: if is_table: return [-2], [-2] else: return [-2] _dict = r text_list = eval(_dict.get("text")) bbox_list = eval(_dict.get("bbox")) if text_list is None: text_list = [] if bbox_list is None: bbox_list = [] if is_table: return text_list, bbox_list else: if text_list and bbox_list: text 
= get_sequential_data(text_list, bbox_list, html=True) if judge_error_code(text): return text # if text == [-1]: # return [-1] else: text = "" return text except Exception as e: logging.info("from_ocr_interface error!") # print("from_ocr_interface", e, global_type) if is_table: return [-1], [-1] else: return [-1] @get_memory_info.memory_decorator def from_otr_interface(image_stream): logging.info("into from_otr_interface") try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: r = otr(data=base64_stream, otr_model=globals().get("global_otr_model")) except TimeoutError: return [-5], [-5], [-5], [-5] except requests.exceptions.ConnectionError as e: logging.info("from_otr_interface") print("from_otr_interface", traceback.print_exc()) return [-2], [-2], [-2], [-2] # 处理结果 _dict = r points = eval(_dict.get("points")) split_lines = eval(_dict.get("split_lines")) bboxes = eval(_dict.get("bboxes")) outline_points = eval(_dict.get("outline_points")) # print("from_otr_interface len(bboxes)", len(bboxes)) if points is None: points = [] if split_lines is None: split_lines = [] if bboxes is None: bboxes = [] if outline_points is None: outline_points = [] return points, split_lines, bboxes, outline_points except Exception as e: logging.info("from_otr_interface error!") print("from_otr_interface", traceback.print_exc()) return [-1], [-1], [-1], [-1] def from_office_interface(src_path, dest_path, target_format, retry_times=1): try: # Win10跳出超时装饰器 if get_platform() == "Windows": # origin_office_convert = office_convert.__wrapped__ # file_path = origin_office_convert(src_path, dest_path, target_format, retry_times) file_path = office_convert(src_path, dest_path, target_format, retry_times) else: # 将装饰器包装为一个类,否则多进程Pickle会报错 it's not the same object as xxx 问题, # timeout_decorator_obj = my_timeout_decorator.TimeoutClass(office_convert, 180, TimeoutError) # file_path = timeout_decorator_obj.run(src_path, dest_path, target_format, retry_times) file_path = office_convert(src_path, 
dest_path, target_format, retry_times) if judge_error_code(file_path): return file_path return file_path except TimeoutError: logging.info("from_office_interface timeout error!") return [-5] except: logging.info("from_office_interface error!") print("from_office_interface", traceback.print_exc()) return [-1] def get_sequential_data(text_list, bbox_list, html=False): logging.info("into get_sequential_data") try: text = "" order_list = [] for i in range(len(text_list)): length_start = bbox_list[i][0][0] length_end = bbox_list[i][1][0] height_start = bbox_list[i][0][1] height_end = bbox_list[i][-1][1] # print([length_start, length_end, height_start, height_end]) order_list.append([text_list[i], length_start, length_end, height_start, height_end]) # text = text + infomation['text'] + "\n" if get_platform() == "Windows": print("get_sequential_data", order_list) if not order_list: if get_platform() == "Windows": print("get_sequential_data", "no order list") return "" # 根据bbox的坐标对输出排序 order_list.sort(key=lambda x: (x[3], x[1])) # 根据bbox分行分列 # col_list = [] # height_end = int((order_list[0][4] + order_list[0][3]) / 2) # for i in range(len(order_list)): # if height_end - threshold <= order_list[i][3] <= height_end + threshold: # col_list.append(order_list[i]) # else: # row_list.append(col_list) # col_list = [] # height_end = int((order_list[i][4] + order_list[i][3]) / 2) # col_list.append(order_list[i]) # if i == len(order_list) - 1: # row_list.append(col_list) row_list = [] used_box = [] threshold = 5 for box in order_list: if box in used_box: continue height_center = (box[4] + box[3]) / 2 row = [] for box2 in order_list: if box2 in used_box: continue height_center2 = (box2[4] + box2[3]) / 2 if height_center - threshold <= height_center2 <= height_center + threshold: if box2 not in row: row.append(box2) used_box.append(box2) row.sort(key=lambda x: x[0]) row_list.append(row) for row in row_list: if not row: continue if len(row) <= 1: text = text + row[0][0] + "\n" else: 
sub_text = "" row.sort(key=lambda x: x[1]) for col in row: sub_text = sub_text + col[0] + " " sub_text = sub_text + "\n" text += sub_text if html: text = "
" + text text = re.sub("\n", "
\n
", text) text += "
" # if text[-5:] == "
": # text = text[:-5] return text except Exception as e: logging.info("get_sequential_data error!") print("get_sequential_data", traceback.print_exc()) return [-1] def get_formatted_table(text_list, text_bbox_list, table_bbox_list, split_line): logging.info("into get_formatted_table") try: # 重新定义text_bbox_list,[point, point, text] text_bbox_list = [[text_bbox_list[i][0], text_bbox_list[i][2], text_list[i]] for i in range(len(text_bbox_list))] # 按纵坐标排序 text_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # print("text_bbox_list", text_bbox_list) # print("table_bbox_list", table_bbox_list) # bbox位置 threshold threshold = 5 # 根据split_line分区,可能有个区多个表格 [(), ()] area_text_bbox_list = [] area_table_bbox_list = [] # print("get_formatted_table, split_line", split_line) for j in range(1, len(split_line)): last_y = split_line[j - 1][0][1] current_y = split_line[j][0][1] temp_text_bbox_list = [] temp_table_bbox_list = [] # 找出该区域下text bbox for text_bbox in text_bbox_list: # 计算 text bbox 中心点 text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, (text_bbox[1][1] + text_bbox[0][1]) / 2) if last_y - threshold <= text_bbox_center[1] <= current_y + threshold: temp_text_bbox_list.append(text_bbox) area_text_bbox_list.append(temp_text_bbox_list) # 找出该区域下table bbox for table_bbox in table_bbox_list: # 计算 table bbox 中心点 table_bbox_center = ((table_bbox[1][0] + table_bbox[0][0]) / 2, (table_bbox[1][1] + table_bbox[0][1]) / 2) if last_y < table_bbox_center[1] < current_y: temp_table_bbox_list.append(table_bbox) area_table_bbox_list.append(temp_table_bbox_list) # 对每个区域分别进行两个bbox匹配,生成表格 area_text_list = [] area_column_list = [] for j in range(len(area_text_bbox_list)): # 每个区域的table bbox 和text bbox temp_table_bbox_list = area_table_bbox_list[j] temp_text_bbox_list = area_text_bbox_list[j] # 判断该区域有无表格bbox # 若无表格,将该区域文字连接 if not temp_table_bbox_list: # 找出该区域的所有text bbox only_text_list = [] only_bbox_list = [] for text_bbox in 
temp_text_bbox_list: only_text_list.append(text_bbox[2]) only_bbox_list.append([text_bbox[0], text_bbox[1]]) only_text = get_sequential_data(only_text_list, only_bbox_list, True) if only_text == [-1]: return [-1], [-1] area_text_list.append(only_text) area_column_list.append(0) continue # 有表格 # 文本对应的表格格子 text_in_table = {} for i in range(len(temp_text_bbox_list)): text_bbox = temp_text_bbox_list[i] # 计算 text bbox 中心点 text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, (text_bbox[1][1] + text_bbox[0][1]) / 2) # 判断中心点在哪个table bbox中 for table_bbox in temp_table_bbox_list: # 中心点在table bbox中,将text写入字典 if table_bbox[0][0] <= text_bbox_center[0] <= table_bbox[1][0] and \ table_bbox[0][1] <= text_bbox_center[1] <= table_bbox[1][1]: if str(table_bbox) in text_in_table.keys(): text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] else: text_in_table[str(table_bbox)] = text_bbox[2] break # 如果未找到text bbox匹配的table bbox,加大threshold匹配 # elif (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]) or \ # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]): # if str(table_bbox) in text_in_table.keys(): # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # else: # text_in_table[str(table_bbox)] = text_bbox[2] # break # 对表格格子进行分行分列,并计算总计多少小列 # 放入坐标 all_col_list = [] all_row_list = [] for i in range(len(temp_table_bbox_list)): table_bbox = temp_table_bbox_list[i] # 放入所有坐标x if table_bbox[0][0] not in all_col_list: 
all_col_list.append(table_bbox[0][0]) if table_bbox[1][0] not in all_col_list: all_col_list.append(table_bbox[1][0]) # 放入所有坐标y if table_bbox[0][1] not in all_row_list: all_row_list.append(table_bbox[0][1]) if table_bbox[1][1] not in all_row_list: all_row_list.append(table_bbox[1][1]) all_col_list.sort(key=lambda x: x) all_row_list.sort(key=lambda x: x) # 分行 row_list = [] rows = [] temp_table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0], x[1][1], x[1][0])) y_row = temp_table_bbox_list[0][0][1] for i in range(len(temp_table_bbox_list)): table_bbox = temp_table_bbox_list[i] if y_row - threshold <= table_bbox[0][1] <= y_row + threshold: rows.append(table_bbox) else: y_row = table_bbox[0][1] if rows: rows.sort(key=lambda x: x[0][0]) row_list.append(rows) rows = [] rows.append(table_bbox) # print("*" * 30) # print(row_list) if i == len(temp_table_bbox_list) - 1: if rows: rows.sort(key=lambda x: x[0][0]) row_list.append(rows) # 生成表格,包括文字和格子宽度 area_column = [] text = '' + "\n" for row in row_list: text += "" + "\n" for col in row: # 计算bbox y坐标之间有多少其他点,+1即为所占行数 row_span = 1 for y in all_row_list: if col[0][1] < y < col[1][1]: if y - col[0][1] >= 2 and col[1][1] - y >= 2: row_span += 1 # 计算bbox x坐标之间有多少其他点,+1即为所占列数 col_span = 1 for x in all_col_list: if col[0][0] < x < col[1][0]: if x - col[0][0] >= 2 and col[1][0] - x >= 2: col_span += 1 text += "" + "\n" text += "" + "\n" text += "
" if str(col) in text_in_table.keys(): text += text_in_table.get(str(col)) else: text += "" text += "
" + "\n" # 计算最大column max_col_num = 0 for row in row_list: col_num = 0 for col in row: col_num += 1 if max_col_num < col_num: max_col_num = col_num area_text_list.append(text) area_column_list.append(max_col_num) text = "" if get_platform() == "Windows": print("get_formatted_table area_text_list", area_text_list) for area_text in area_text_list: text += area_text return text, area_column_list except Exception as e: logging.info("get_formatted_table error!") print("get_formatted_table", traceback.print_exc()) return [-1], [-1] def getText(_type, path_or_stream): print("file type - " + _type) logging.info("file type - " + _type) try: ss = path_or_stream.split(".") unique_type_dir = ss[-2] + "_" + ss[-1] + os.sep except: unique_type_dir = path_or_stream + "_" + _type + os.sep if _type == "pdf": return pdf2text(path_or_stream, unique_type_dir) if _type == "docx": return docx2text(path_or_stream, unique_type_dir) if _type == "zip": return zip2text(path_or_stream, unique_type_dir) if _type == "rar": return rar2text(path_or_stream, unique_type_dir) if _type == "xlsx": return xlsx2text(path_or_stream, unique_type_dir) if _type == "xls": return xls2text(path_or_stream, unique_type_dir) if _type == "doc": return doc2text(path_or_stream, unique_type_dir) if _type == "jpg" or _type == "png" or _type == "jpeg": return picture2text(path_or_stream) if _type == "swf": return swf2text(path_or_stream, unique_type_dir) if _type == "txt": return txt2text(path_or_stream) return [""] def to_html(path, text): with open(path, 'w') as f: f.write("") f.write('') f.write("") f.write(text) f.write("") def resize_image(image_path, size): try: image_np = cv2.imread(image_path) # print(image_np.shape) width = image_np.shape[1] height = image_np.shape[0] h_w_rate = height / width # width_standard = 900 # height_standard = 1400 width_standard = size[1] height_standard = size[0] width_new = int(height_standard / h_w_rate) height_new = int(width_standard * h_w_rate) if width > width_standard: 
image_np = cv2.resize(image_np, (width_standard, height_new)) elif height > height_standard: image_np = cv2.resize(image_np, (width_new, height_standard)) cv2.imwrite(image_path, image_np) # print("resize_image", image_np.shape) return except Exception as e: logging.info("resize_image") print("resize_image", e, global_type) return def remove_red_seal(image_np): """ 去除红色印章 """ # 获得红色通道 blue_c, green_c, red_c = cv2.split(image_np) # 多传入一个参数cv2.THRESH_OTSU,并且把阈值thresh设为0,算法会找到最优阈值 thresh, ret = cv2.threshold(red_c, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) # print("remove_red_seal thresh", thresh) # 实测调整为95%效果好一些 filter_condition = int(thresh * 0.98) thresh1, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY) # 把图片转回 3 通道 image_and = np.expand_dims(red_thresh, axis=2) image_and = np.concatenate((image_and, image_and, image_and), axis=-1) # print(image_and.shape) # 膨胀 gray = cv2.cvtColor(image_and, cv2.COLOR_RGB2GRAY) kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2)) erode = cv2.erode(gray, kernel) cv2.imshow("erode", erode) cv2.waitKey(0) image_and = np.bitwise_and(cv2.bitwise_not(blue_c), cv2.bitwise_not(erode)) result_img = cv2.bitwise_not(image_and) cv2.imshow("remove_red_seal", result_img) cv2.waitKey(0) return result_img def remove_underline(image_np): """ 去除文字下划线 """ # 灰度化 gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY) # 二值化 binary = cv2.adaptiveThreshold(~gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 15, 10) # Sobel kernel_row = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], np.float32) kernel_col = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], np.float32) # binary = cv2.filter2D(binary, -1, kernel=kernel) binary_row = cv2.filter2D(binary, -1, kernel=kernel_row) binary_col = cv2.filter2D(binary, -1, kernel=kernel_col) cv2.imshow("custom_blur_demo", binary) cv2.waitKey(0) rows, cols = binary.shape # 识别横线 scale = 5 kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (cols // scale, 1)) erodedcol = 
cv2.erode(binary_row, kernel, iterations=1) cv2.imshow("Eroded Image", erodedcol) cv2.waitKey(0) dilatedcol = cv2.dilate(erodedcol, kernel, iterations=1) cv2.imshow("dilate Image", dilatedcol) cv2.waitKey(0) return def getMDFFromFile(path): _length = 0 try: _md5 = hashlib.md5() with open(path, "rb") as ff: while True: data = ff.read(4096) if not data: break _length += len(data) _md5.update(data) return _md5.hexdigest(), _length except Exception as e: traceback.print_exc() return None, _length def add_html_format(text_list): new_text_list = [] for t in text_list: html_t = "\n" html_t += '\n' html_t += "\n" html_t += t html_t += "\n\n" new_text_list.append(html_t) return new_text_list @timeout_decorator.timeout(1200, timeout_exception=TimeoutError) def unique_temp_file_process(stream, _type): logging.info("into unique_temp_file_process") try: # 每个调用在temp中创建一个唯一空间 uid1 = uuid.uuid1().hex unique_space_path = _path + os.sep + "temp" + os.sep + uid1 + os.sep # unique_space_path = "/mnt/fangjiasheng/" + "temp/" + uid1 + "/" # 判断冲突 if not os.path.exists(unique_space_path): if not os.path.exists(_path + os.sep + "temp"): os.mkdir(_path + os.sep + "temp" + os.sep) os.mkdir(unique_space_path) else: uid2 = uuid.uuid1().hex if not os.path.exists(_path + os.sep + "temp"): os.mkdir(_path + os.sep + "temp" + os.sep) os.mkdir(_path + os.sep + "temp" + os.sep + uid2 + os.sep) # os.mkdir("/mnt/" + "temp/" + uid2 + "/") # 在唯一空间中,对传入的文件也保存为唯一 uid3 = uuid.uuid1().hex file_path = unique_space_path + uid3 + "." 
+ _type with open(file_path, "wb") as ff: ff.write(stream) # 跳过一些编号 print("getMDFFromFile", getMDFFromFile(file_path)) if getMDFFromFile(file_path)[0] == '84dba5a65339f338d3ebdf9f33fae13e'\ or getMDFFromFile(file_path)[0] == '3d9f9f4354582d85b21b060ebd5786db'\ or getMDFFromFile(file_path)[0] == 'b52da40f24c6b29dfc2ebeaefe4e41f1' \ or getMDFFromFile(file_path)[0] == 'eefb925b7ccec1467be20b462fde2a09': raise Exception text = getText(_type, file_path) return text except Exception as e: # print("Convert error! Delete temp file. ", e, global_type) logging.info("unique_temp_file_process") print("unique_temp_file_process:", traceback.print_exc()) return [-1] finally: print("======================================") print("File md5:", getMDFFromFile(file_path)) try: if get_platform() == "Linux": # 删除该唯一空间下所有文件 if os.path.exists(unique_space_path): shutil.rmtree(unique_space_path) print() except Exception as e: logging.info("Delete Files Failed!") # print("Delete Files Failed!") return [-1] print("Finally") # to_html(_path + "6.html", text[0]) # to_html(unique_space_path + "result.html", text[0]) # return text logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def log(msg): """ @summary:打印信息 """ logger.info(msg) def cut_str(text_list, only_text_list, max_bytes_length=2000000): logging.info("into cut_str") try: # 计算有格式总字节数 bytes_length = 0 for text in text_list: bytes_length += len(bytes(text, encoding='utf-8')) print("text_list", bytes_length) # 小于直接返回 if bytes_length < max_bytes_length: print("return text_list no cut") return text_list # 全部文件连接,重新计算无格式字节数 all_text = "" bytes_length = 0 for text in only_text_list: bytes_length += len(bytes(text, encoding='utf-8')) all_text += text print("only_text_list", bytes_length) # 小于直接返回 if bytes_length < max_bytes_length: print("return only_text_list no cut") return only_text_list # 截取字符 all_text = all_text[:int(max_bytes_length/3)] print("text 
bytes ", len(bytes(all_text, encoding='utf-8'))) print("return only_text_list has cut") return [all_text] except Exception as e: logging.info("cut_str " + str(e)) return ["-1"] @get_memory_info.memory_decorator def convert(data, ocr_model, otr_model): """ 接口返回值: {[str], 1}: 处理成功 {[-1], 0}: 逻辑处理错误 {[-2], 0}: 接口调用错误 {[-3], 1}: 文件格式错误,无法打开 {[-4], 0}: 各类文件调用第三方包读取超时 {[-5], 0}: 整个转换过程超时 {[-6], 0}: 阿里云UDF队列超时 {[-7], 1}: 文件需密码,无法打开 :return: {"result": [], "is_success": int} """ # 控制内存 # soft, hard = resource.getrlimit(resource.RLIMIT_AS) # resource.setrlimit(resource.RLIMIT_AS, (15 * 1024 ** 3, hard)) logging.info("into convert") start_time = time.time() try: # 模型加入全局变量 globals().update({"global_ocr_model": ocr_model}) globals().update({"global_otr_model": otr_model}) stream = base64.b64decode(data.get("file")) _type = data.get("type") if get_platform() == "Windows": # 解除超时装饰器,直接访问原函数 origin_unique_temp_file_process = unique_temp_file_process.__wrapped__ text = origin_unique_temp_file_process(stream, _type) else: # Linux 通过装饰器设置整个转换超时时间 try: text = unique_temp_file_process(stream, _type) except TimeoutError: logging.info("convert time out! 
1200 sec") text = [-5] if text == [-1]: print({"failed result": [-1], "is_success": 0}, time.time() - start_time) return {"result_html": ["-1"], "result_text": ["-1"], "is_success": 0} if text == [-2]: print({"failed result": [-2], "is_success": 0}, time.time() - start_time) return {"result_html": ["-2"], "result_text": ["-2"], "is_success": 0} if text == [-3]: print({"failed result": [-3], "is_success": 1}, time.time() - start_time) return {"result_html": ["-3"], "result_text": ["-3"], "is_success": 1} if text == [-4]: print({"failed result": [-4], "is_success": 0}, time.time() - start_time) return {"result_html": ["-4"], "result_text": ["-4"], "is_success": 0} if text == [-5]: print({"failed result": [-5], "is_success": 0}, time.time() - start_time) return {"result_html": ["-5"], "result_text": ["-5"], "is_success": 0} if text == [-7]: print({"failed result": [-7], "is_success": 1}, time.time() - start_time) return {"result_html": ["-7"], "result_text": ["-7"], "is_success": 1} # text = add_html_format(text) # 结果保存result.html if get_platform() == "Windows": text_str = "" for t in text: text_str += t to_html("../result.html", text_str) # 取纯文本 only_text = [] for t in text: new_t = BeautifulSoup(t, "lxml").get_text() new_t = re.sub("\n", "", new_t) only_text.append(new_t) # 判断长度,过长截取 text = cut_str(text, only_text) only_text = cut_str(only_text, only_text) if len(only_text) == 0: only_text = [""] if only_text[0] == '' and len(only_text) <= 1: print({"finished result": ["", 0], "is_success": 1}, time.time() - start_time) else: print({"finished result": [str(only_text)[:20], len(str(text))], "is_success": 1}, time.time() - start_time) return {"result_html": text, "result_text": only_text, "is_success": 1} except Exception as e: print({"failed result": [-1], "is_success": 0}, time.time() - start_time) print("convert", traceback.print_exc()) return {"result_html": ["-1"], "result_text": ["-1"], "is_success": 0} global_type = "" local_url = "http://127.0.0.1" if 
get_platform() == "Windows": _path = os.path.abspath(os.path.dirname(__file__)) else: _path = "/home/admin" if not os.path.exists(_path): _path = os.path.dirname(os.path.abspath(__file__)) if __name__ == '__main__': print(os.path.abspath(__file__) + "/../../") # if len(sys.argv) == 2: # port = int(sys.argv[1]) # else: # port = 15015 # app.run(host='0.0.0.0', port=port, threaded=True, debug=False) # log("format_conversion running") # convert("", "ocr_model", "otr_model") # _str = "啊" # str1 = "" # str2 = "" # for i in range(900000): # str1 += _str # list1 = [str1] # for i in range(700000): # str2 += _str # list2 = [str2] # cut_str(list1, list2) # file_path = "C:/Users/Administrator/Desktop/error1.png" # file_path = "D:/Project/table-detect-master/train_data/label_1.jpg" # file_path = "D:/Project/table-detect-master/test_files/1.png" # file_path = "D:/Project/table-detect-master/test_files/table2.jpg" file_path = "C:/Users/Administrator/Desktop/error9.pdf" # file_path = "C:/Users/Administrator/Desktop/Test_Interface/test1.pdf" # file_path = "C:/Users/Administrator/Desktop/Test_ODPS/1624875783055.pdf" # file_path = "table2.jpg" with open(file_path, "rb") as f: file_bytes = f.read() file_base64 = base64.b64encode(file_bytes) data = {"file": file_base64, "type": file_path.split(".")[-1], "filemd5": 100} ocr_model = ocr_interface.OcrModels().get_model() otr_model = otr_interface.OtrModels().get_model() result = convert(data, ocr_model, otr_model) print("*"*40) result = convert(data, ocr_model, otr_model) # print(result)