import copy import inspect import io import logging import os import re import sys from bs4 import BeautifulSoup sys.path.append(os.path.dirname(__file__) + "/../") from pdfplumber import PDF from pdfplumber.table import TableFinder from pdfplumber.page import Page as pdfPage from format_convert.convert_tree import _Document, _Page, _Image, _Sentence, _Table import time import pdfminer import math from scipy.stats import linregress from matplotlib import pyplot as plt from shapely.geometry import LineString, Point from format_convert import timeout_decorator from PIL import Image from format_convert.convert_image import image_process from format_convert.convert_need_interface import from_ocr_interface, from_office_interface import traceback import cv2 import PyPDF2 from PyPDF2 import PdfFileReader, PdfFileWriter from pdfminer.pdfparser import PDFParser from pdfminer.pdfdocument import PDFDocument from pdfminer.pdfpage import PDFPage from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter from pdfminer.converter import PDFPageAggregator from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar, LTRect, \ LTTextBoxVertical, LTLine, LTTextContainer from format_convert.utils import judge_error_code, add_div, get_platform, get_html_p, string_similarity, LineTable, \ get_logger, log, memory_decorator, draw_lines_plt, get_garble_code, line_is_cross import fitz from format_convert.wrapt_timeout_decorator import timeout @memory_decorator def pdf2Image(path, save_dir): log("into pdf2Image") try: try: doc = fitz.open(path) except Exception as e: log("pdf format error!") # print("pdf format error!", e) return [-3] # output_image_list = [] output_image_dict = {} page_count = doc.page_count for page_no in range(page_count): # 限制pdf页数,只取前10页后10页 if page_count > 20: if 10 <= page_no < page_count - 10: # log("pdf2Image: pdf pages count " + str(doc.page_count) # + ", only get 70 pages") continue try: page = doc.loadPage(page_no) output = save_dir + "_page" + str(page_no) + ".png" rotate = int(0) # 每个尺寸的缩放系数为1.3,这将为我们生成分辨率提高2.6的图像。 # 此处若是不做设置,默认图片大小为:792X612, dpi=96 # (1.33333333 --> 1056x816) (2 --> 1584x1224) # (1.183, 2.28 --> 1920x1080) zoom_x = 3. zoom_y = 3. # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) pix = page.getPixmap(matrix=mat, alpha=False) pix.writePNG(output) pdf_image = cv2.imread(output) print("pdf_image", page_no, pdf_image.shape) # output_image_list.append([page_no, output]) output_image_dict[int(page_no)] = output except ValueError as e: traceback.print_exc() if str(e) == "page not in document": log("pdf2Image page not in document! continue..." + str(page_no)) continue elif "encrypted" in str(e): log("pdf2Image document need password " + str(page_no)) return [-7] except RuntimeError as e: if "cannot find page" in str(e): log("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e)) continue else: traceback.print_exc() return [-3] return [output_image_dict] except Exception as e: log("pdf2Image error!") print("pdf2Image", traceback.print_exc()) return [-1] @timeout(10, timeout_exception=TimeoutError) def pdf_analyze(interpreter, page, device, page_no): log("into pdf_analyze") pdf_time = time.time() # print("pdf_analyze interpreter process...") interpreter.process_page(page) # print("pdf_analyze device get_result...") layout = device.get_result() log("pdf2text page " + str(page_no) + " read time " + str(time.time() - pdf_time)) return layout @memory_decorator def pdf2text(path, unique_type_dir): log("into pdf2text") try: # pymupdf pdf to image save_dir = path.split(".")[-2] + "_" + path.split(".")[-1] output_image_dict = pdf2Image(path, save_dir) if judge_error_code(output_image_dict): return output_image_dict output_image_dict = output_image_dict[0] output_image_no_list = list(output_image_dict.keys()) output_image_no_list.sort(key=lambda x: x) # 获取每页pdf提取的文字、表格的列数、轮廓点、是否含表格、页码 # page_info_list = [] page_info_dict = {} has_table_dict = {} no_table_dict = {} for page_no in output_image_no_list: img_path = output_image_dict.get(page_no) print("pdf page", page_no, "in total", output_image_no_list[-1]) # 读不出来的跳过 try: img = cv2.imread(img_path) img_size = img.shape except: log("pdf2text read image in page fail! continue...") continue # 每张图片处理 text, column_list, outline_points, is_table = image_process(img, img_path, use_ocr=False) if judge_error_code(text): return text # page_info_list.append([text, column_list, outline_points, is_table, # page_no, img_size]) page_info = [text, column_list, outline_points, is_table, img_size] page_info_dict[int(page_no)] = page_info # 包含table的和不包含table的 if is_table: has_table_dict[int(page_no)] = page_info else: no_table_dict[int(page_no)] = page_info has_table_no_list = list(has_table_dict.keys()) has_table_no_list.sort(key=lambda x: x) page_no_list = list(page_info_dict.keys()) page_no_list.sort(key=lambda x: x) # 页码表格连接 table_connect_list, connect_text_list = page_table_connect(has_table_dict) if judge_error_code(table_connect_list): return table_connect_list # 连接的页码 table_connect_page_no_list = [] for area in connect_text_list: table_connect_page_no_list.append(area[1]) print("pdf2text table_connect_list", table_connect_list) print("connect_text_list", connect_text_list) # pdfminer 方式 try: fp = open(path, 'rb') # 用文件对象创建一个PDF文档分析器 parser = PDFParser(fp) # 创建一个PDF文档 doc = PDFDocument(parser) # 连接分析器,与文档对象 rsrcmgr = PDFResourceManager() device = PDFPageAggregator(rsrcmgr, laparams=LAParams()) interpreter = PDFPageInterpreter(rsrcmgr, device) # 判断是否能读pdf for page in PDFPage.create_pages(doc): break except pdfminer.psparser.PSEOF as e: # pdfminer 读不了空白页的对象,直接使用pymupdf转换出的图片进行ocr识别 log("pdf2text " + str(e) + " use ocr read pdf!") text_list = [] for page_no in page_no_list: log("pdf2text ocr page_no " + str(page_no)) page_info = page_info_dict.get(page_no) # 表格 if page_info[3]: # 判断表格是否跨页连接 area_no = 0 jump_page = 0 for area in table_connect_list: if page_no in area: # 只记录一次text if page_no == area[0]: image_text = connect_text_list[area_no][0] text_list.append([image_text, page_no, 0]) jump_page = 1 area_no += 1 # 是连接页的跳过后面步骤 if jump_page: continue # 直接取text image_text = page_info_dict.get(page_no)[0] text_list.append([image_text, page_no, 0]) # 非表格 else: with open(output_image_dict.get(page_no), "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) text_list.append([image_text, page_no, 0]) text_list.sort(key=lambda z: z[1]) text = "" for t in text_list: text += t[0] return [text] except Exception as e: log("pdf format error!") traceback.print_exc() return [-3] text_list = [] page_no = 0 pages = PDFPage.create_pages(doc) pages = list(pages) page_count = len(pages) for page in pages: log("pdf2text pymupdf page_no " + str(page_no)) # 限制pdf页数,只取前100页 # if page_no >= 70: # log("pdf2text: pdf pages only get 70 pages") # break if page_count > 20: if 10 <= page_no < page_count - 10: page_no += 1 continue # 判断页码在含表格页码中,直接拿已生成的text if page_no in has_table_no_list: # 判断表格是否跨页连接 area_no = 0 jump_page = 0 for area in table_connect_list: if page_no in area: # 只记录一次text if page_no == area[0]: image_text = connect_text_list[area_no][0] text_list.append([image_text, page_no, 0]) jump_page = 1 area_no += 1 # 是连接页的跳过后面步骤 if jump_page: page_no += 1 continue # 直接取text image_text = has_table_dict.get(page_no)[0] text_list.append([image_text, page_no, 0]) page_no += 1 continue # 不含表格的解析pdf else: if get_platform() == "Windows": try: interpreter.process_page(page) layout = device.get_result() except Exception: log("pdf2text pdfminer read pdf page error! continue...") continue else: # 设置超时时间 try: # 解析pdf中的不含表格的页 if get_platform() == "Windows": origin_pdf_analyze = pdf_analyze.__wrapped__ layout = origin_pdf_analyze(interpreter, page, device) else: layout = pdf_analyze(interpreter, page, device, page_no) except TimeoutError as e: log("pdf2text pdfminer read pdf page time out!") return [-4] except Exception: log("pdf2text pdfminer read pdf page error! continue...") continue # 判断该页有没有文字对象,没有则有可能是有水印 only_image = 1 image_count = 0 for x in layout: if isinstance(x, LTTextBoxHorizontal): only_image = 0 if isinstance(x, LTFigure): image_count += 1 # 如果该页图片数量过多,直接ocr整页识别 log("pdf2text image_count " + str(image_count)) if image_count >= 3: image_text = page_info_dict.get(page_no)[0] if image_text is None: with open(output_image_dict.get(page_no), "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text page_info_dict[page_no][0] = image_text text_list.append([image_text, page_no, 0]) page_no += 1 continue order_list = [] for x in layout: # 该对象是否是ocr识别 ocr_flag = 0 if get_platform() == "Windows": # print("x", page_no, x) print() if isinstance(x, LTTextBoxHorizontal): image_text = x.get_text() # 无法识别编码,用ocr if re.search('[(]cid:[0-9]+[)]', image_text): print(re.search('[(]cid:[0-9]+[)]', image_text)) image_text = page_info_dict.get(page_no)[0] if image_text is None: with open(output_image_dict.get(page_no), "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text page_info_dict[page_no][0] = image_text image_text = add_div(image_text) # order_list.append([image_text, page_no, x.bbox[1]]) order_list = [[image_text, page_no, x.bbox[1]]] break else: image_text = add_div(image_text) order_list.append([image_text, page_no, x.bbox[1]]) continue if isinstance(x, LTFigure): for image in x: if isinstance(image, LTImage): try: print("pdf2text LTImage size", page_no, image.width, image.height) image_stream = image.stream.get_data() # 小的图忽略 if image.width <= 300 and image.height <= 300: continue # 有些水印导致pdf分割、读取报错 # if image.width <= 200 and image.height<=200: # continue # img_test = Image.open(io.BytesIO(image_stream)) # img_test.save('temp/LTImage.jpg') # 查看提取的图片高宽,太大则抛错用pdf输出图进行ocr识别 img_test = Image.open(io.BytesIO(image_stream)) if img_test.size[1] > 2000 or img_test.size[0] > 1500: print("pdf2text LTImage stream output size", img_test.size) raise Exception # 比较小的图则直接保存用ocr识别 else: img_test.save('temp/LTImage.jpg') with open('temp/LTImage.jpg', "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text # except pdfminer.pdftypes.PDFNotImplementedError: # with open(output_image_list[page_no], "rb") as ff: # image_stream = ff.read() except Exception: log("pdf2text pdfminer read image in page " + str(page_no) + " fail! use pymupdf read image...") # print(traceback.print_exc()) image_text = page_info_dict.get(page_no)[0] if image_text is None: with open(output_image_dict.get(page_no), "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) if judge_error_code(image_text): return image_text page_info_dict[page_no][0] = image_text ocr_flag = 1 # 判断只拿到了水印图: 无文字输出且只有图片对象 if image_text == "" and only_image: # 拆出该页pdf try: log("pdf2text guess pdf has watermark") split_path = get_single_pdf(path, page_no) except: # 如果拆分抛异常,则大概率不是水印图,用ocr识别图片 log("pdf2text guess pdf has no watermark") image_text = page_info_dict.get(page_no)[0] if image_text is None: with open(output_image_dict.get(page_no), "rb") as ff: image_stream = ff.read() image_text = from_ocr_interface(image_stream) order_list.append([image_text, page_no, -1]) page_info_dict[page_no][0] = image_text ocr_flag = 1 continue if judge_error_code(split_path): return split_path # 调用office格式转换 file_path = from_office_interface(split_path, unique_type_dir, 'html', 3) # if file_path == [-3]: # return [-3] if judge_error_code(file_path): return file_path # 获取html文本 image_text = get_html_p(file_path) if judge_error_code(image_text): return image_text if get_platform() == "Windows": print("image_text", page_no, x.bbox[1], image_text) with open("temp" + str(x.bbox[0]) + ".jpg", "wb") as ff: ff.write(image_stream) image_text = add_div(image_text) if ocr_flag: order_list.append([image_text, page_no, -1]) else: order_list.append([image_text, page_no, x.bbox[1]]) order_list.sort(key=lambda z: z[2], reverse=True) # 有ocr参与识别 if order_list[-1][2] == -1: ocr_order_list = [order_list[-1]] not_ocr_order_list = [] not_ocr_text = "" # 去重,因读取失败而重复获取 for order in order_list: if order[2] != -1: not_ocr_order_list.append(order) not_ocr_text += order[0] if string_similarity(ocr_order_list[0][0], not_ocr_text) >= 0.85: order_list = not_ocr_order_list else: order_list = ocr_order_list for order in order_list: text_list.append(order) page_no += 1 text = "" for t in text_list: # text += add_div(t[0]) if t[0] is not None: text += t[0] return [text] except UnicodeDecodeError as e: log("pdf2text pdfminer create pages failed! " + str(e)) return [-3] except Exception as e: log("pdf2text error!") traceback.print_exc() return [-1] def get_single_pdf(path, page_no): log("into get_single_pdf") try: # print("path, ", path) pdf_origin = PdfFileReader(path, strict=False) pdf_new = PdfFileWriter() pdf_new.addPage(pdf_origin.getPage(page_no)) path_new = path.split(".")[0] + "_split.pdf" with open(path_new, "wb") as ff: pdf_new.write(ff) return path_new except PyPDF2.utils.PdfReadError as e: raise e except Exception as e: log("get_single_pdf error! page " + str(page_no)) traceback.print_exc() raise e def page_table_connect(has_table_dict): log("into page_table_connect") if not has_table_dict: return [], [] try: # 判断是否有页码的表格相连 table_connect_list = [] temp_list = [] # 离图片顶部或底部距离,页面高度的1/7 threshold = 7 page_no_list = list(has_table_dict.keys()) page_no_list.sort(key=lambda x: x) for i in range(1, len(page_no_list)): page_info = has_table_dict.get(page_no_list[i]) last_page_info = has_table_dict.get(page_no_list[i - 1]) # 页码需相连 if page_no_list[i] - page_no_list[i - 1] == 1: # 上一页最后一个区域的列数和下一页第一个区域列数都为0,且相等 if not last_page_info[1][-1] and not page_info[1][0] and \ last_page_info[1][-1] == page_info[1][0]: # 上一页的轮廓点要离底部一定距离内,下一页的轮廓点要离顶部一定距离内 if last_page_info[4][0] - last_page_info[2][-1][1][1] \ <= int(last_page_info[4][0] / threshold) \ and page_info[2][0][0][1] - 0 \ <= int(page_info[4][0] / threshold): temp_list.append(page_no_list[i - 1]) temp_list.append(page_no_list[i]) continue # 条件不符合的,存储之前保存的连接页码 if len(temp_list) > 1: temp_list = list(set(temp_list)) temp_list.sort(key=lambda x: x) table_connect_list.append(temp_list) temp_list = [] if len(temp_list) > 1: temp_list = list(set(temp_list)) temp_list.sort(key=lambda x: x) table_connect_list.append(temp_list) temp_list = [] # 连接两页内容 connect_text_list = [] for area in table_connect_list: first_page_no = area[0] area_page_text = str(has_table_dict.get(first_page_no)[0]) for i in range(1, len(area)): current_page_no = area[i] current_page_text = str(has_table_dict.get(current_page_no)[0]) # 连接两个table table_prefix = re.finditer('', current_page_text) index_list = [] for t in table_prefix: index_list.append(t.span()) delete_index = index_list[0] current_page_text = current_page_text[:delete_index[0]] \ + current_page_text[delete_index[1]:] table_suffix = re.finditer('
', area_page_text) index_list = [] for t in table_suffix: index_list.append(t.span()) delete_index = index_list[-1] area_page_text = area_page_text[:delete_index[0]] \ + area_page_text[delete_index[1]:] area_page_text = area_page_text + current_page_text connect_text_list.append([area_page_text, area]) return table_connect_list, connect_text_list except Exception as e: # print("page_table_connect", e) log("page_table_connect error!") traceback.print_exc() return [-1], [-1] @timeout(30, timeout_exception=TimeoutError) def read_pdf(path, package_name, packages): log(package_name) laparams = LAParams(line_overlap=0.01, char_margin=0.3, line_margin=0.01, word_margin=0.01, boxes_flow=0.1, ) if package_name == packages[0]: fp = open(path, 'rb') parser = PDFParser(fp) doc_pdfminer = PDFDocument(parser) rsrcmgr = PDFResourceManager() device = PDFPageAggregator(rsrcmgr, laparams=laparams) interpreter = PDFPageInterpreter(rsrcmgr, device) return doc_pdfminer, device, interpreter elif package_name == packages[1]: doc_pymupdf = fitz.open(path) return doc_pymupdf elif package_name == packages[2]: doc_pypdf2 = PdfFileReader(path, strict=False) doc_pypdf2_new = PdfFileWriter() return doc_pypdf2, doc_pypdf2_new elif package_name == packages[3]: fp = open(path, 'rb') lt = LineTable() doc_top = 0 doc_pdfplumber = read_pdfplumber(fp, laparams) return lt, doc_top, doc_pdfplumber @timeout(25, timeout_exception=TimeoutError) def read_pdfminer(path, laparams): fp = open(path, 'rb') parser = PDFParser(fp) doc_pdfminer = PDFDocument(parser) rsrcmgr = PDFResourceManager() device = PDFPageAggregator(rsrcmgr, laparams=laparams) interpreter = PDFPageInterpreter(rsrcmgr, device) return doc_pdfminer, device, interpreter @timeout(15, timeout_exception=TimeoutError) def read_pymupdf(path): return fitz.open(path) @timeout(15, timeout_exception=TimeoutError) def read_pypdf2(path): doc_pypdf2 = PdfFileReader(path, strict=False) doc_pypdf2_new = PdfFileWriter() return doc_pypdf2, doc_pypdf2_new @timeout(25, timeout_exception=TimeoutError, use_signals=False) def read_pdfplumber(path, laparams): fp = open(path, 'rb') lt = LineTable() doc_top = 0 doc_pdfplumber = PDF(fp, laparams=laparams.__dict__) return lt, doc_top, doc_pdfplumber class PDFConvert: def __init__(self, path, unique_type_dir, need_page_no): self._doc = _Document(path) self.path = path self.unique_type_dir = unique_type_dir if not os.path.exists(self.unique_type_dir): os.mkdir(self.unique_type_dir) # 指定提取的页码范围 self.need_page_no = need_page_no self.start_page_no = None self.end_page_no = None # 默认使用limit_page_cnt控制,前10页后10页 if self.need_page_no is None: self.limit_page_cnt = 20 else: # 使用start_page_no,end_page_no范围控制,例如2,5 ss = self.need_page_no.split(',') if len(ss) != 2: self._doc.error_code = [-14] else: self.start_page_no = int(ss[0]) self.end_page_no = int(ss[-1]) if self.end_page_no == -1: self.end_page_no = 1000000 self.start_page_no -= 1 self.end_page_no -= 1 if self.end_page_no <= self.start_page_no or self.start_page_no < 0 or self.end_page_no < -1: self._doc.error_code = [-14] self.packages = ["pdfminer", "PyMuPDF", "PyPDF2", "pdfplumber"] self.has_init_pdf = [0] * len(self.packages) @memory_decorator def init_package(self, package_name): # 各个包初始化 try: laparams = LAParams(line_overlap=0.01, char_margin=0.3, line_margin=0.01, word_margin=0.01, boxes_flow=0.1, ) if package_name == self.packages[0]: # fp = open(self.path, 'rb') # parser = PDFParser(fp) # self.doc_pdfminer = PDFDocument(parser) # rsrcmgr = PDFResourceManager() # self.laparams = LAParams(line_overlap=0.01, # char_margin=0.3, # line_margin=0.01, # word_margin=0.01, # boxes_flow=0.1,) # self.device = PDFPageAggregator(rsrcmgr, laparams=self.laparams) # self.interpreter = PDFPageInterpreter(rsrcmgr, self.device) self.doc_pdfminer, self.device, self.interpreter = read_pdfminer(self.path, laparams) self.has_init_pdf[0] = 1 elif package_name == self.packages[1]: self.doc_pymupdf = read_pymupdf(self.path) self.has_init_pdf[1] = 1 elif package_name == self.packages[2]: # self.doc_pypdf2 = PdfFileReader(self.path, strict=False) # self.doc_pypdf2_new = PdfFileWriter() self.doc_pypdf2, self.doc_pypdf2_new = read_pypdf2(self.path) self.has_init_pdf[2] = 1 elif package_name == self.packages[3]: # self.fp = open(self.path, 'rb') # self.lt = LineTable() # self.doc_top = 0 # self.doc_pdfplumber = PDF(self.fp, laparams=self.laparams.__dict__) self.lt, self.doc_top, self.doc_pdfplumber = read_pdfplumber(self.path, laparams) self.has_init_pdf[3] = 0 else: log("Only Support Packages " + str(self.packages)) raise Exception except Exception as e: log(package_name + " cannot open pdf!") traceback.print_exc() self._doc.error_code = [-3] def convert(self, limit_page_cnt=20): if self.has_init_pdf[0] == 0: self.init_package("pdfminer") if self._doc.error_code is not None: self._doc.error_code = None # pdfminer读不了直接转成图片识别 self.get_all_page_image() return # 判断是否能读pdf try: pages = PDFPage.create_pages(self.doc_pdfminer) for page in pages: break pages = list(pages) # except pdfminer.psparser.PSEOF as e: except: # pdfminer 读不了空白页的对象,直接使用pymupdf转换出的图片进行ocr识别 log("pdf2text pdfminer read failed! read by pymupdf!") traceback.print_exc() try: self.get_all_page_image() return except: traceback.print_exc() log("pdf2text use pymupdf read failed!") self._doc.error_code = [-3] return # 每一页进行处理 pages = PDFPage.create_pages(self.doc_pdfminer) pages = list(pages) page_count = len(pages) page_no = 0 for page in pages: # 指定pdf页码 if self.start_page_no is not None and self.end_page_no is not None: if page_count < self.end_page_no: self.end_page_no = page_count if page_no < self.start_page_no or page_no >= self.end_page_no: page_no += 1 continue # 限制pdf页数,只取前后各10页 else: if page_count > limit_page_cnt and int(limit_page_cnt/2) <= page_no < page_count - int(limit_page_cnt/2): page_no += 1 continue # 解析单页 self._page = _Page(page, page_no) self.convert_page(page, page_no) if self._doc.error_code is None and self._page.error_code is not None: if self._page.error_code[0] in [-4, -3, 0]: page_no += 1 continue else: self._doc.error_code = self._page.error_code break self._doc.add_child(self._page) page_no += 1 def clean_text(self, _text): return re.sub("\s", "", _text) def get_text_lines(self, page, page_no): lt_line_list = [] page_plumber = pdfPage(self.doc_pdfplumber, page, page_number=page_no, initial_doctop=self.doc_top) self.doc_top += page_plumber.height table_finder = TableFinder(page_plumber) all_width_zero = True for _edge in table_finder.get_edges(): if _edge.get('linewidth') and _edge.get('linewidth') > 0: all_width_zero = False break for _edge in table_finder.get_edges(): # print(_edge) if _edge.get('linewidth', 0.1) > 0 or all_width_zero: lt_line_list.append(LTLine(1, (float(_edge["x0"]), float(_edge["y0"])), (float(_edge["x1"]), float(_edge["y1"])))) log("pdf page %s has %s lines" % (str(page_no), str(len(lt_line_list)))) return lt_line_list def get_page_lines(self, layout, page_no): def _plot(_line_list, mode=1): for _line in _line_list: if mode == 1: x0, y0, x1, y1 = _line.__dict__.get("bbox") elif mode == 2: x0, y0, x1, y1 = _line plt.plot([x0, x1], [y0, y1]) plt.show() return def is_cross(A, B, C, D): if A[0] == B[0] == C[0] == D[0]: if A[1] <= C[1] <= B[1] or A[1] <= D[1] <= B[1] \ or C[1] <= A[1] <= D[1] or C[1] <= B[1] <= D[1]: return True if A[1] == B[1] == C[1] == D[1]: if A[0] <= C[0] <= B[0] or A[0] <= D[0] <= B[0] \ or C[0] <= A[0] <= D[0] or C[0] <= B[0] <= D[0]: return True line1 = LineString([A, B]) line2 = LineString([C, D]) int_pt = line1.intersection(line2) try: point_of_intersection = int_pt.x, int_pt.y return True except: return False def calculate_k(bbox): x = [bbox[0], bbox[2]] y = [bbox[1], bbox[3]] slope, intercept, r_value, p_value, std_err = linregress(x, y) # print('k', slope) if math.isnan(slope): slope = 0 return slope def line_iou(line1, line2, axis=0): inter = min(line1[1][axis], line2[1][axis]) - max(line1[0][axis], line2[0][axis]) # union = max(line1[1][axis], line2[1][axis]) - min(line1[0][axis], line2[0][axis]) union = min(abs(line1[0][axis] - line1[1][axis]), abs(line2[0][axis] - line2[1][axis])) if union in [0, 0.]: iou = 0. else: iou = inter / union return iou def get_cross_line(_line_list, threshold=1, cross_times=0): # 根据是否有交点判断表格线 _cross_line_list = [] for line1 in _line_list: if line1 in _cross_line_list: continue if abs(line1[2] - line1[0]) > abs(line1[3] - line1[1]): p1 = [max(0, line1[0] - threshold), line1[1]] p2 = [min(line1[2] + threshold, page_w), line1[3]] else: p1 = [line1[0], max(0, line1[1] - threshold)] p2 = [line1[2], min(line1[3] + threshold, page_h)] line1 = [p1[0], p1[1], p2[0], p2[1]] _times = 0 for line2 in _line_list: if abs(line2[2] - line2[0]) > abs(line2[3] - line2[1]): p3 = [max(0, line2[0] - threshold), line2[1]] p4 = [min(line2[2] + threshold, page_w), line2[3]] else: p3 = [line2[0], max(0, line2[1] - threshold)] p4 = [line2[2], min(line2[3] + threshold, page_h)] line2 = [p3[0], p3[1], p4[0], p4[1]] if line1 == line2: continue if is_cross(p1, p2, p3, p4): _times += 1 if _times >= cross_times: _cross_line_list += [line1] break return _cross_line_list def repair_bias_line(_line_list): temp_list = [] for line in _line_list: x0, y0, x1, y1 = line _y = min(y0, y1) _x = min(x0, x1) if abs(x0 - x1) > abs(y0 - y1): temp_list.append([x0, _y, x1, _y]) else: temp_list.append([_x, y0, _x, y1]) _line_list = temp_list return _line_list def repair_col_line(_straight_list, _bias_list, threshold=2, min_width=7): if not _straight_list or not _bias_list: print('add_col_bias_line empty', len(_straight_list), len(_bias_list)) return [] # 分列 _straight_list.sort(key=lambda x: (x[0], x[1])) cols = [] col = [] current_w = _straight_list[0][0] for line in _straight_list: if abs(line[0] - line[2]) > abs(line[1] - line[3]): continue if min(line[0], line[2]) - threshold <= current_w <= max(line[0], line[2]) + threshold: col.append(line) else: if col: cols.append(col) col = [line] current_w = line[0] if col: cols.append(col) # 补充col new_list = [] for line in bias_line_list: if abs(line[0] - line[2]) > abs(line[1] - line[3]): continue for col in cols: w = col[0][0] if w - threshold <= line[0] <= w + threshold or w - threshold <= line[2] <= w + threshold: new_list.append([w, line[1] - 3, w, line[3] + 3]) new_list += _straight_list # 去重 new_list = [str(x) for x in new_list] new_list = list(set(new_list)) new_list = [eval(x) for x in new_list] # 分列 new_list.sort(key=lambda x: (x[0], x[1])) cols = [] col = [] current_w = new_list[0][0] for line in new_list: if abs(line[0] - line[2]) > abs(line[1] - line[3]): continue if min(line[0], line[2]) - threshold <= current_w <= max(line[0], line[2]) + threshold: col.append(line) else: if col: cols.append(col) col = [line] current_w = line[0] if col: cols.append(col) # 删除col for col1 in cols: for col2 in cols: if col1 == col2 or abs(col1[0][0] - col2[0][0]) > min_width: continue col1_len, col2_len = 0, 0 for c in col1: col1_len += abs(c[1] - c[3]) for c in col2: col2_len += abs(c[1] - c[3]) if col1_len > col2_len * 3: for c in col2: if c in new_list: new_list.remove(c) if col2_len > col1_len * 3: for c in col1: if c in new_list: new_list.remove(c) return new_list def merge_line(_line_list, threshold=2): new_line_list = [] # 分列 _line_list.sort(key=lambda x: (x[0], x[1])) cols = [] col = [_line_list[0]] current_w = _line_list[0][0] for line in _line_list: if abs(line[0] - line[2]) > abs(line[1] - line[3]): continue if min(line[0], line[2]) - threshold <= current_w <= max(line[0], line[2]) + threshold \ and is_cross(line[0:2], line[2:4], col[-1][0:2], col[-1][2:4]): col.append(line) else: if col: cols.append(col) col = [line] current_w = line[0] if col: cols.append(col) for col in cols: temp_c = col[0] col_w = col[0][0] for i in range(len(col) - 1): c = col[i] next_c = col[i + 1] if is_cross(c[0:2], c[2:4], next_c[0:2], next_c[2:4]): temp_c = [col_w, min(temp_c[1], c[1], c[3], next_c[1], next_c[3]), col_w, max(temp_c[3], c[1], c[3], next_c[1], next_c[3])] else: new_line_list.append(temp_c) temp_c = next_c if not new_line_list or (new_line_list and new_line_list[-1] != temp_c): new_line_list.append(temp_c) # 分行 _line_list.sort(key=lambda x: (x[1], x[0])) rows = [] row = [] current_h = _line_list[0][1] for line in _line_list: if abs(line[0] - line[2]) < abs(line[1] - line[3]): continue if min(line[1], line[3]) - threshold <= current_h <= max(line[1], line[3]) + threshold: row.append(line) else: if row: rows.append(row) row = [line] current_h = line[1] if row: rows.append(row) for row in rows: temp_r = row[0] row_h = row[0][1] for i in range(len(row) - 1): r = row[i] next_r = row[i + 1] # if is_cross(r[0:2], r[2:4], next_r[0:2], next_r[2:4]): if line_iou([r[0:2], r[2:4]], [next_r[0:2], next_r[2:4]], axis=0): temp_r = [min(temp_r[0], r[0], r[2], next_r[0], next_r[2]), row_h, max(temp_r[2], r[0], r[2], next_r[0], next_r[2]), row_h] else: new_line_list.append(temp_r) temp_r = next_r if not new_line_list or (new_line_list and new_line_list[-1] != temp_r): new_line_list.append(temp_r) return new_line_list def remove_outline_no_cross(_line_list): row_list = [] col_list = [] for line in _line_list: # 存所有行 if abs(line[0] - line[2]) > abs(line[1] - line[3]): row_list.append(line) # 存所有列 if abs(line[0] - line[2]) < abs(line[1] - line[3]): col_list.append(line) if not col_list: return _line_list # 左右两条边框 col_list.sort(key=lambda x: (x[0], x[1])) left_col = col_list[0] right_col = col_list[-1] # 判断有交点但中间区域无交点 compare_list = [] for col in [left_col, right_col]: add_h = abs(col[1]-col[3]) / 8 center_area = [col[1]+add_h, col[3]-add_h] cross_cnt = 0 center_cross_cnt = 0 center_row_cnt = 0 for row in row_list: if is_cross(row[0:2], row[2:4], col[0:2], col[2:4]): if center_area[0] <= row[1] <= center_area[1]: center_cross_cnt += 1 else: cross_cnt += 1 else: if center_area[0] <= row[1] <= center_area[1]: center_row_cnt += 1 compare_list.append([cross_cnt, center_cross_cnt, center_row_cnt]) _flag = True for c in compare_list: if c[0] >= 2 and c[1] == 0 and c[2] >= 2: continue _flag = False print('compare_list', compare_list) if _flag and compare_list[0][1] == compare_list[1][1] \ and compare_list[0][2] == compare_list[1][2]: for col in [left_col, right_col]: if col in _line_list: _line_list.remove(col) return _line_list log('into get_page_lines') page_h = layout.height page_w = layout.width element_list = [] line_list = [] bias_line_list = [] text_bbox_list = [] for element in layout: if isinstance(element, LTTextContainer): text_bbox_list.append(element.bbox) # 只取这三种类型的bbox if isinstance(element, (LTRect, LTCurve, LTLine)): element_list.append(element) if element.height > 0.5 and element.width > 0.5: # print('element.height, element.width', element.height, element.width) k = calculate_k(element.bbox) if 1.73 / 3 < abs(k) < 1.73: continue else: bias_line_list.append(element.bbox) continue line_list.append(element.bbox) if not line_list and not bias_line_list: return [] # 是否使用斜线来生成表格 if len(line_list) < 6 and len(bias_line_list) > len(line_list) * 2: # print('use bias line') # bias_line_list += add_col_bias_line(line_list, bias_line_list) line_list = bias_line_list # 去重 line_list = [str(x) for x in line_list] line_list = list(set(line_list)) line_list = [eval(x) for x in line_list] # 根据是否有交点判断表格线 cross_line_list = get_cross_line(line_list, threshold=2, cross_times=1) if not cross_line_list: return [] # 斜线校正 if cross_line_list: cross_line_list = repair_bias_line(cross_line_list) # 修复竖线 if bias_line_list: cross_line_list = repair_col_line(cross_line_list, bias_line_list) # 根据是否有交点判断表格线 cross_line_list = get_cross_line(cross_line_list, threshold=1, cross_times=1) # 合并线条 cross_line_list = merge_line(cross_line_list) # 删除最外层嵌套边框 cross_line_list = remove_outline_no_cross(cross_line_list) # show # print('len(cross_line_list)', len(cross_line_list)) # _plot(line_list, mode=2) # _plot(cross_line_list, mode=2) lt_line_list = [] for line in cross_line_list: lt_line_list.append(LTLine(1, (float(line[0]), float(line[1])), (float(line[2]), float(line[3])))) log("pdf page %s has %s lines" % (str(page_no), str(len(lt_line_list)))) return lt_line_list def recognize_text(self, layout, page_no, lt_text_list, lt_line_list): list_tables, filter_objs, _, connect_textbox_list = self.lt.recognize_table(lt_text_list, lt_line_list) self._page.in_table_objs = filter_objs # print("=======text_len:%d:filter_len:%d"%(len(lt_text_list),len(filter_objs))) for table in list_tables: _table = _Table(table["table"], table["bbox"]) # self._page.children.append(_table) self._page.add_child(_table) list_sentences = ParseUtils.recognize_sentences(lt_text_list, filter_objs, layout.bbox, page_no) for sentence in list_sentences: _sen = _Sentence(sentence.text, sentence.bbox) self._page.add_child(_sen) # pdf对象需反向排序 self._page.is_reverse = True def is_text_legal(self, lt_text_list, page_no): # 无法识别pdf字符编码,整页用ocr text_temp = "" for _t in lt_text_list: text_temp += _t.get_text() if re.search('[(]cid:[0-9]+[)]', text_temp): log("text has cid! try pymupdf...") page_image = self.get_page_image(page_no) if judge_error_code(page_image): self._page.error_code = page_image else: _image = _Image(page_image[1], page_image[0]) self._page.add_child(_image) return False match1 = re.findall(get_garble_code(), text_temp) # match2 = re.search('[\u4e00-\u9fa5]', text_temp) if len(match1) > 3 and len(text_temp) > 10: log("pdf garbled code! try pymupdf... " + text_temp[:20]) page_image = self.get_page_image(page_no) if judge_error_code(page_image): self._page.error_code = page_image else: _image = _Image(page_image[1], page_image[0]) self._page.add_child(_image) return False return True def judge_b_table(self, lt_text_list): # 先分行 lt_text_list.sort(key=lambda x: (x.bbox[1], x.bbox[0])) lt_text_row_list = [] current_h = lt_text_list[0].bbox[1] row = [] threshold = 2 for lt_text in lt_text_list: bbox = lt_text.bbox if current_h - threshold <= bbox[1] <= current_h + threshold: row.append(lt_text) else: if row: lt_text_row_list.append(row) row = [lt_text] current_h = lt_text.bbox[1] if row: lt_text_row_list.append(row) # print('lt_text_row_list') # for r in lt_text_row_list: # print('r', [x.get_text() for x in r]) # 判断文本中间是否是空格,或一行文本中间有多个 is_b_table_flag = False is_b_table_cnt = 3 tolerate_cnt = 2 t_cnt = 0 row_cnt = 0 for row in lt_text_row_list: # 水印行跳过 if len(row) == 1 and len(row[0].get_text()[:-1]) == 1: continue if len(row) == 1: text = row[0].get_text() bbox = row[0].bbox match = re.search('[ ]{3,}', text) if match and re.search('[\u4e00-\u9fff]{2,}', text[:match.span()[0]]) \ and re.search('[\u4e00-\u9fff]{2,}', text[match.span()[1]:]): row_cnt += 1 t_cnt = 0 else: # 容忍 if t_cnt < tolerate_cnt: t_cnt += 1 continue row_cnt = 0 else: row_cnt += 1 t_cnt = 0 if row_cnt >= is_b_table_cnt: is_b_table_flag = True break log('pdf is_b_table_flag ' + str(is_b_table_flag)) return is_b_table_flag def convert_page(self, page, page_no): layout = self.get_layout(page, page_no) if self._doc.error_code is not None: return if judge_error_code(layout): self._page.error_code = layout return # 判断该页的对象类型,并存储 lt_text_list = [] lt_image_list = [] for x in layout: if isinstance(x, (LTTextBoxHorizontal, LTTextBoxVertical)): lt_text_list.append(x) if isinstance(x, LTFigure): for y in x: if isinstance(y, LTImage): # 小的图忽略 if y.width <= 300 and y.height <= 300: continue # 图的width超过layout width,很大可能是水印 if y.width > layout.width + 20: continue lt_image_list.append(y) lt_text_list = self.delete_water_mark(lt_text_list, layout.bbox, 15) log("convert_pdf page " + str(page_no)) log("len(lt_image_list), len(lt_text_list) " + str(len(lt_image_list)) + " " + str(len(lt_text_list))) log('layout.width, layout.height' + str(layout.width) + str(layout.height)) # 若只有文本且图片数为0,直接提取文字及表格 # if only_image == 0 and image_count == 0: # if len(lt_image_list) == 0 and len(lt_text_list) > 0: # # PDFPlumber # if self.has_init_pdf[3] == 0: # self.init_package("pdfplumber") # if self._doc.error_code is not None: # self._doc.error_code = None # log("init pdfplumber failed! try pymupdf...") # # 调用pdfplumber获取pdf图片报错,则使用pypdf2将pdf转html # page_image = self.get_page_image(page_no) # if judge_error_code(page_image): # self._page.error_code = page_image # else: # _image = _Image(page_image[1], page_image[0]) # self._page.add_child(_image) # return # # if not self.is_text_legal(lt_text_list, page_no): # return # # # 根据text规律,判断该页是否可能有无边框表格 # start_time = time.time() # if self.judge_b_table(lt_text_list): # page_image = self.get_page_image(page_no) # if judge_error_code(page_image): # self._page.error_code = page_image # else: # _image = _Image(page_image[1], page_image[0]) # _image.is_from_pdf = True # _image.b_table_from_text = True # _image.b_table_text_obj_list = lt_text_list # _image.b_table_layout_size = (layout.width, layout.height) # self._page.add_child(_image) # log('convert_pdf judge_b_table set image cost: ' + str(time.time()-start_time)) # # try: # lt_line_list = self.get_page_lines(layout, page_no) # except: # traceback.print_exc() # lt_line_list = [] # self._page.error_code = [-13] # try: # # lt_line_list = self.get_text_lines(page,page_no) # self.recognize_text(layout, page_no, lt_text_list, lt_line_list) # except: # traceback.print_exc() # self._page.error_code = [-8] # 若该页图片数量过多,或无文本,则直接ocr整页识别 # elif image_count > 3 or only_image == 1: if len(lt_image_list) > 3 or len(lt_text_list) == 0: page_image = self.get_page_image(page_no) if judge_error_code(page_image): self._page.error_code = page_image else: _image = _Image(page_image[1], page_image[0]) _image.is_from_pdf = True self._page.add_child(_image) # 正常读取该页对象 else: # 图表对象 for image in lt_image_list: try: print("pdf2text LTImage size", page_no, image.width, image.height) image_stream = image.stream.get_data() # 小的图忽略 if image.width <= 300 and image.height <= 300: continue # 查看提取的图片高宽,太大则用pdf输出图进行ocr识别 img_test = Image.open(io.BytesIO(image_stream)) if image.height >= 1000 and image.width >= 1000: page_image = self.get_page_image(page_no) if judge_error_code(page_image): self._page.error_code = page_image else: _image = _Image(page_image[1], page_image[0]) _image.is_from_pdf = True self._page.add_child(_image) return # 比较小的图则直接保存用ocr识别 else: temp_path = self.unique_type_dir + 'page' + str(page_no) \ + '_lt' + str(lt_image_list.index(image)) + '.jpg' img_test.save(temp_path) with open(temp_path, "rb") as ff: image_stream = ff.read() _image = _Image(image_stream, temp_path, image.bbox) self._page.add_child(_image) except Exception: log("pdf2text pdfminer read image in page " + str(page_no) + " fail! use pymupdf read image...") traceback.print_exc() # pdf对象需反向排序 self._page.is_reverse = True self.init_package("pdfplumber") if not self.is_text_legal(lt_text_list, page_no): return # 根据text规律,判断该页是否可能有无边框表格 if self.judge_b_table(lt_text_list): page_image = self.get_page_image(page_no) if judge_error_code(page_image): self._page.error_code = page_image else: _image = _Image(page_image[1], page_image[0]) _image.is_from_pdf = True _image.b_table_from_text = True _image.b_table_text_obj_list = lt_text_list _image.b_table_layout_size = (layout.width, layout.height) self._page.add_child(_image) try: lt_line_list = self.get_page_lines(layout, page_no) except: traceback.print_exc() lt_line_list = [] self._page.error_code = [-13] self.recognize_text(layout, page_no, lt_text_list, lt_line_list) def get_layout(self, page, page_no): log("get_layout") if self.has_init_pdf[0] == 0: self.init_package("pdfminer") if self._doc.error_code is not None: return # 获取该页layout start_time = time.time() try: if get_platform() == "Windows": # origin_pdf_analyze = pdf_analyze.__wrapped__ # layout = origin_pdf_analyze(self.interpreter, page, self.device) layout = pdf_analyze(self.interpreter, page, self.device, page_no) else: layout = pdf_analyze(self.interpreter, page, self.device, page_no) except TimeoutError as e: log("pdf2text pdfminer read pdf page " + str(page_no) + " time out! " + str(time.time() - start_time)) layout = [-4] except Exception: traceback.print_exc() log("pdf2text pdfminer read pdf page " + str(page_no) + " error! continue...") layout = [-3] return layout def get_page_image(self, page_no): log("") try: if self.has_init_pdf[1] == 0: self.init_package("PyMuPDF") if self._doc.error_code is not None: return # save_dir = self.path.split(".")[-2] + "_" + self.path.split(".")[-1] output = self.unique_type_dir + "page" + str(page_no) + ".png" page = self.doc_pymupdf.loadPage(page_no) rotate = int(0) zoom_x = 2. zoom_y = 2. mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) pix = page.getPixmap(matrix=mat, alpha=False) pix.writePNG(output) # 输出图片resize self.resize_image(output) with open(output, "rb") as f: pdf_image = f.read() return [output, pdf_image] except ValueError as e: traceback.print_exc() if str(e) == "page not in document": log("pdf2Image page not in document! continue... page " + str(page_no)) return [0] elif "encrypted" in str(e): log("pdf2Image document need password " + str(page_no)) return [-7] except RuntimeError as e: if "cannot find page" in str(e): log("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e)) return [0] else: traceback.print_exc() return [-3] def get_all_page_image(self): log("") if self.has_init_pdf[1] == 0: self.init_package("PyMuPDF") if self._doc.error_code is not None: return page_count = self.doc_pymupdf.page_count for page_no in range(page_count): # 限制pdf页数,只取前10页后10页 if page_count > 20: if 10 <= page_no < page_count - 10: continue self._page = _Page(None, page_no) page_image = self.get_page_image(page_no) if judge_error_code(page_image): self._page.error_code = page_image else: _image = _Image(page_image[1], page_image[0]) self._page.add_child(_image) # 报错继续读后面页面 if self._doc.error_code is None and self._page.error_code is not None: continue self._doc.add_child(self._page) def connect_table(self, html_list): if not html_list: return html_list # 判断条件1:最后一个表格后有无非页码文本/第一个表格前有无文本 connect_flag_list = [] soup_list = [] for i, h in enumerate(html_list): soup_list.append(BeautifulSoup(h, 'lxml')) # 找最后一个表格 table_start1, table_end1 = None, None # print('h', h) match = re.finditer('', h[table_start1:]) for m in match: table_end1 = m.span()[1] + table_start1 # 最后一个表格后有无除了页码外的内容 connect_flag1 = False if table_end1 is not None: match = re.search('[^-/第页0-9]*', re.sub('
|
', '', h[table_end1:])) # print('match1', match.group()) if not match or match.group() == '': connect_flag1 = True # 找第一个表格 table_start2, table_end2 = None, None match = re.finditer('([-/第页0-9]|
|
)*', '', new_html) soup = BeautifulSoup(new_html, 'lxml') trs = soup.findAll('tr') for i in range(len(trs)): if trs[i].get_text() == '#@#@#': td1 = trs[i - 1].findAll('td') td2 = trs[i + 1].findAll('td') if td2[0].get_text() == '': for j in range(len(td1)): td1[j].string = td1[j].get_text() + td2[j].get_text() trs[i + 1].decompose() trs[i].decompose() new_html = str(soup) new_html_list.append(new_html) html_str = '' for h in new_html_list: html_str += h return [html_str] def get_html(self): if self._doc.error_code is not None: return self._doc.error_code self.convert() if self._doc.error_code is not None: return self._doc.error_code html = self._doc.get_html(return_list=True) # 表格连接 try: html = self.connect_table(html) except: traceback.print_exc() return [-12] return html def delete_water_mark(self, lt_text_list, page_bbox, times=5): # 删除过多重复字句,为水印 duplicate_dict = {} for _obj in lt_text_list: t = _obj.get_text() if t in duplicate_dict.keys(): duplicate_dict[t][0] += 1 duplicate_dict[t][1].append(_obj) else: duplicate_dict[t] = [1, [_obj]] delete_text = [] for t in duplicate_dict.keys(): if duplicate_dict[t][0] >= times: obj_list = duplicate_dict[t][1] obj_list.sort(key=lambda x: x.bbox[3]) obj_distance_h = abs(obj_list[-1].bbox[3] - obj_list[0].bbox[1]) obj_list.sort(key=lambda x: x.bbox[2]) obj_distance_w = abs(obj_list[-1].bbox[2] - obj_list[0].bbox[0]) if obj_distance_h >= abs(page_bbox[1] - page_bbox[3]) * 0.7 \ and obj_distance_w >= abs(page_bbox[0] - page_bbox[2]) * 0.7: delete_text.append(t) temp_text_list = [] for _obj in lt_text_list: t = _obj.get_text() if t not in delete_text: temp_text_list.append(_obj) return temp_text_list def resize_image(self, img_path, max_size=2000): _img = cv2.imread(img_path) if _img.shape[0] <= max_size or _img.shape[1] <= max_size: return else: resize_axis = 0 if _img.shape[0] >= _img.shape[1] else 1 ratio = max_size / _img.shape[resize_axis] new_shape = [0, 0] new_shape[resize_axis] = max_size new_shape[1 - resize_axis] = int(_img.shape[1 - resize_axis] * ratio) _img = cv2.resize(_img, (new_shape[1], new_shape[0])) cv2.imwrite(img_path, _img) def get_single_pdf(self, path, page_no): log("into get_single_pdf") try: pdf_origin = copy.deepcopy(self.doc_pypdf2) pdf_new = copy.deepcopy(self.doc_pypdf2_new) pdf_new.addPage(pdf_origin.getPage(page_no)) path_new = path.split(".")[0] + "_split.pdf" with open(path_new, "wb") as ff: pdf_new.write(ff) return path_new except PyPDF2.utils.PdfReadError as e: return [-3] except Exception as e: log("get_single_pdf error! page " + str(page_no)) return [-3] def get_text_font(): def flags_decomposer(flags): """Make font flags human readable.""" l = [] if flags & 2 ** 0: l.append("superscript") if flags & 2 ** 1: l.append("italic") if flags & 2 ** 2: l.append("serifed") else: l.append("sans") if flags & 2 ** 3: l.append("monospaced") else: l.append("proportional") if flags & 2 ** 4: l.append("bold") return ", ".join(l) def get_underlined_textLines(page): """ 获取某页pdf上的所有下划线文本信息 :param page: fitz中的一页 :return: list of tuples,每个tuple都是一个完整的下划线覆盖的整体:[(下划线句, 所在blk_no, 所在line_no), ...] """ paths = page.get_drawings() # get drawings on the current page # 获取该页内所有的height很小的bbox。因为下划线其实大多是这种矩形 # subselect things we may regard as lines lines = [] for p in paths: for item in p["items"]: if item[0] == "l": # an actual line p1, p2 = item[1:] if p1.y == p2.y: lines.append((p1, p2)) elif item[0] == "re": # a rectangle: check if height is small r = item[1] if r.width > r.height and r.height <= 2: lines.append((r.tl, r.tr)) # take top left / right points # 获取该页的`max_lineheight`,用于下面比较距离使用 blocks = page.get_text("dict", flags=11)["blocks"] max_lineheight = 0 for b in blocks: for l in b["lines"]: bbox = fitz.Rect(l["bbox"]) if bbox.height > max_lineheight: max_lineheight = bbox.height underlined_res = [] # 开始对下划线内容进行查询 # make a list of words words = page.get_text("words") # if underlined, the bottom left / right of a word # should not be too far away from left / right end of some line: for wdx, w in enumerate(words): # w[4] is the actual word string r = fitz.Rect(w[:4]) # first 4 items are the word bbox for p1, p2 in lines: # check distances for start / end points if abs(r.bl - p1) <= max_lineheight: # 当前word的左下满足下划线左下 if abs(r.br - p2) <= max_lineheight: # 当前word的右下满足下划线右下(单个词,无空格) print(f"Word '{w[4]}' is underlined! Its block-line number is {w[-3], w[-2]}") underlined_res.append((w[4], w[-3], w[-2])) # 分别是(下划线词,所在blk_no,所在line_no) break # don't check more lines else: # 继续寻找同line右侧的有缘人,因为有些下划线覆盖的词包含多个词,多个词之间有空格 curr_line_num = w[-2] # line nunmber for right_wdx in range(wdx + 1, len(words), 1): _next_w = words[right_wdx] if _next_w[-2] != curr_line_num: # 当前遍历到的右侧word已经不是当前行的了(跨行是不行的) break _r_right = fitz.Rect(_next_w[:4]) # 获取当前同行右侧某word的方框4点 if abs(_r_right.br - p2) <= max_lineheight: # 用此word右下点和p2(目标下划线右上点)算距离,距离要小于max_lineheight print( f"Word '{' '.join([_one_word[4] for _one_word in words[wdx:right_wdx + 1]])}' is underlined! " + f"Its block-line number is {w[-3], w[-2]}") underlined_res.append( (' '.join([_one_word[4] for _one_word in words[wdx:right_wdx + 1]]), w[-3], w[-2]) ) # 分别是(下划线词,所在blk_no,所在line_no) break # don't check more lines return underlined_res _p = r'C:\Users\Administrator\Desktop\test_pdf\error2-2.pdf' doc_pymupdf = read_pymupdf(_p) page = doc_pymupdf[0] blocks = page.get_text("dict", flags=11)["blocks"] for b in blocks: # iterate through the text blocks for l in b["lines"]: # iterate through the text lines for s in l["spans"]: # iterate through the text spans print("") font_properties = "Font: '%s' (%s), size %g, color #%06x" % ( s["font"], # font name flags_decomposer(s["flags"]), # readable font flags s["size"], # font size s["color"], # font color ) print(s) print("Text: '%s'" % s["text"]) # simple print of text print(font_properties) get_underlined_textLines(page) # 以下为现成pdf单页解析接口 class ParseSentence: def __init__(self, bbox, fontname, fontsize, _text, _title, title_text, _pattern, title_degree, is_outline, outline_location, page_no): (x0, y0, x1, y1) = bbox self.x0 = x0 self.y0 = y0 self.x1 = x1 self.y1 = y1 self.bbox = bbox self.fontname = fontname self.fontsize = fontsize self.text = _text self.title = _title self.title_text = title_text self.groups = _pattern self.title_degree = title_degree self.is_outline = is_outline self.outline_location = outline_location self.page_no = page_no def __repr__(self): return "%s,%s,%s,%d,%s" % (self.text, self.title, self.is_outline, self.outline_location, str(self.bbox)) class ParseUtils: @staticmethod def getFontinfo(_page): for _obj in _page._objs: if isinstance(_obj, (LTTextBoxHorizontal, LTTextBoxVertical)): for textline in _obj._objs: done = False for lchar in textline._objs: if isinstance(lchar, (LTChar)): _obj.fontname = lchar.fontname _obj.fontsize = lchar.size done = True break if done: break @staticmethod def recognize_sentences(list_textbox, filter_objs, page_bbox, page_no, remove_space=True, sourceP_LB=True): list_textbox.sort(key=lambda x: x.bbox[0]) list_textbox.sort(key=lambda x: x.bbox[3], reverse=sourceP_LB) cluster_textbox = [] for _textbox in list_textbox: if _textbox in filter_objs: continue _find = False for _ct in cluster_textbox: if abs(_ct["y"] - _textbox.bbox[1]) < 5: _find = True _ct["textbox"].append(_textbox) if not _find: cluster_textbox.append({"y": _textbox.bbox[1], "textbox": [_textbox]}) cluster_textbox.sort(key=lambda x: x["y"], reverse=sourceP_LB) list_sentences = [] for _line in cluster_textbox: _textboxs = _line["textbox"] _textboxs.sort(key=lambda x: x.bbox[0]) _linetext = _textboxs[0].get_text() for _i in range(1, len(_textboxs)): if abs(_textboxs[_i].bbox[0] - _textboxs[_i - 1].bbox[2]) > 60: if _linetext[-1] not in (",", ",", "。", ".", "、", ";"): _linetext += "=,=" _linetext += _textboxs[_i].get_text() _linetext = re.sub("[\s\r\n]", "", _linetext) _bbox = (_textboxs[0].bbox[0], _textboxs[0].bbox[1], _textboxs[-1].bbox[2], _textboxs[-1].bbox[3]) _title = None _pattern_groups = None title_text = "" if not _title: _groups = ParseUtils.find_title_by_pattern(_textboxs[0].get_text()) if _groups: _title = _groups[0][0] title_text = _groups[0][1] _pattern_groups = _groups if not _title: _groups = ParseUtils.find_title_by_pattern(_linetext) if _groups: _title = _groups[0][0] title_text = _groups[0][1] _pattern_groups = _groups if not _title: _title = ParseUtils.rec_incenter(_bbox, page_bbox) title_degree = 2 if not _title: _linetext = _linetext.replace("=,=", ",") else: _linetext = _linetext.replace("=,=", "") title_degree = int(_title.split("_")[1]) # 页码 if ParseUtils.rec_incenter(_bbox, page_bbox) and re.search("^\d+$", _linetext) is not None: continue if _linetext == "" or re.search("^,+$", _linetext) is not None: continue is_outline = False outline_location = -1 _search = re.search("(?P.+?)\.{5,}(?P\d+)$", _linetext) if _search is not None: is_outline = True _linetext = _search.group("text") outline_location = int(_search.group("nums")) list_sentences.append( ParseSentence(_bbox, _textboxs[-1].__dict__.get("fontname"), _textboxs[-1].__dict__.get("fontsize"), _linetext, _title, title_text, _pattern_groups, title_degree, is_outline, outline_location, page_no)) # for _sen in list_sentences: # print(_sen.__dict__) return list_sentences @staticmethod def find_title_by_pattern(_text, _pattern="(?P(?P^第?)(?P[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P[、章]))|" \ "(?P^(?P[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+))|" \ "(?P^(?P第?)(?P[一二三四五六七八九十]+)(?P[节]))|" \ "(?P^(?P\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P\d{1,2})(?P[\..、\s\-]))|" \ "(?P^(?P\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P\d{1,2})(?P[\..、\s\-]))|" \ "(?P^(?P\d{1,2}[\..、\s\-])(?P\d{1,2})(?P[\..、\s\-]))|" \ "(?P^(?P\d{1,2})(?P[\..、\s\-]))|" \ "(?P^(?P(?)(?P\d{1,2})(?P)))|" \ "(?P^(?P(?)(?P[a-zA-Z]+)(?P)))|" "(?P^(?P(?)(?P[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P)))|" \ ): _se = re.search(_pattern, _text) groups = [] if _se is not None: _gd = _se.groupdict() for k, v in _gd.items(): if v is not None: groups.append((k, v)) if len(groups): groups.sort(key=lambda x: x[0]) return groups return None @staticmethod def rec_incenter(o_bbox, p_bbox): p_width = p_bbox[2] - p_bbox[0] l_space = (o_bbox[0] - p_bbox[0]) / p_width r_space = (p_bbox[2] - o_bbox[2]) / p_width if abs((l_space - r_space)) < 0.1 and l_space > 0.2: return "title_2" @staticmethod def is_first_title(_title): if _title is None: return False if re.search("^\d+$", _title) is not None: if int(_title) == 1: return True return False if re.search("^[一二三四五六七八九十百]+$", _title) is not None: if _title == "一": return True return False if re.search("^[a-z]+$", _title) is not None: if _title == "a": return True return False if re.search("^[A-Z]+$", _title) is not None: if _title == "A": return True return False if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$", _title) is not None: if _title == "Ⅰ": return True return False return False @staticmethod def get_next_title(_title): if re.search("^\d+$", _title) is not None: return str(int(_title) + 1) if re.search("^[一二三四五六七八九十百]+$", _title) is not None: _next_title = ParseUtils.make_increase(['一', '二', '三', '四', '五', '六', '七', '八', '九', '十'], re.sub("[十百]", '', _title)) _next_title = list(_next_title) _next_title.reverse() if _next_title[-1] != "十": if len(_next_title) >= 2: _next_title.insert(-1, '十') if len(_next_title) >= 4: _next_title.insert(-3, '百') if _title[0] == "十": if _next_title == "十": _next_title = ["二", "十"] _next_title.insert(0, "十") _next_title = "".join(_next_title) return _next_title if re.search("^[a-z]+$", _title) is not None: _next_title = ParseUtils.make_increase([chr(i + ord('a')) for i in range(26)], _title) _next_title = list(_next_title) _next_title.reverse() return "".join(_next_title) if re.search("^[A-Z]+$", _title) is not None: _next_title = ParseUtils.make_increase([chr(i + ord('A')) for i in range(26)], _title) _next_title = list(_next_title) _next_title.reverse() return "".join(_next_title) if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$", _title) is not None: _sort = ["Ⅰ", "Ⅱ", "Ⅲ", "Ⅳ", "Ⅴ", "Ⅵ", "Ⅶ", "Ⅷ", "Ⅸ", "Ⅹ", "Ⅺ", "Ⅻ"] _index = _sort.index(_title) if _index < len(_sort) - 1: return _sort[_index + 1] return None @staticmethod def make_increase(_sort, _title, _add=1): if len(_title) == 0 and _add == 0: return "" if len(_title) == 0 and _add == 1: return _sort[0] _index = _sort.index(_title[-1]) next_index = (_index + _add) % len(_sort) next_chr = _sort[next_index] if _index == len(_sort) - 1: _add = 1 else: _add = 0 return next_chr + ParseUtils.make_increase(_sort, _title[:-1], _add) @staticmethod def rec_serial(_text, o_bbox, p_bbox, fontname, _pattern="(?P^[一二三四五六七八九十]+[、])|" \ "(?P^\d+[\.、\s])|" \ "(?P^\d+\.\d+[\.、\s])|" \ "(?P^\d+\.\d+\.\d+[\.、\s])|" \ "(?P^\d+\.\d+\.\d+\.\d+[\.、\s])"): # todo :recog the serial of the sentence _se = re.search(_pattern, _text) if _se is not None: _gd = _se.groupdict() for k, v in _gd.items(): if v is not None: return k return None if __name__ == '__main__': # get_text_font() PDFConvert(r"C:/Users/Administrator/Downloads/1651896704621.pdf", "C:/Users/Administrator/Downloads/1").get_html() # print(b'\x10')
#@#@#