import io
import logging
import os
import re
import sys
sys.path.append(os.path.dirname(__file__) + "/../")
from pdfplumber import PDF
from pdfplumber.table import TableFinder
from pdfplumber.page import Page as pdfPage
from format_convert.convert_tree import _Document, _Page, _Image, _Sentence, _Table
import time
import pdfminer
import timeout_decorator
from PIL import Image
from format_convert.convert_image import image_preprocess
from format_convert.convert_need_interface import from_ocr_interface, from_office_interface
import traceback
import cv2
import PyPDF2
from PyPDF2 import PdfFileReader, PdfFileWriter
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.converter import PDFPageAggregator
from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar, LTRect, \
LTTextBoxVertical, LTLine
from format_convert import get_memory_info
from utils import judge_error_code, add_div, get_platform, get_html_p, string_similarity, LineTable
import fitz
@get_memory_info.memory_decorator
def pdf2Image(path, save_dir):
    """Render pages of a PDF to PNG files with PyMuPDF.

    For documents with more than 20 pages only the first 10 and the last 10
    pages are rendered. Each page is written to
    ``save_dir + "_page" + <page_no> + ".png"`` at 3x zoom on both axes.

    :param path: path of the PDF file to open.
    :param save_dir: prefix used to build the output PNG paths.
    :return: ``[{page_no: png_path}]`` on success; ``[-3]`` when the file
        cannot be opened or a page raises an unexpected RuntimeError;
        ``[-7]`` when the document is encrypted; ``[-1]`` on any other error.
    """
    logging.info("into pdf2Image")
    try:
        try:
            doc = fitz.open(path)
        except Exception:
            logging.info("pdf format error!")
            return [-3]

        # Close the document on every exit path so the file handle is
        # released (the original never closed it).
        try:
            output_image_dict = {}
            page_count = doc.page_count
            for page_no in range(page_count):
                # Limit the number of pages: keep the first 10 and last 10.
                if page_count > 20 and 10 <= page_no < page_count - 10:
                    continue
                try:
                    page = doc.loadPage(page_no)
                    output = save_dir + "_page" + str(page_no) + ".png"
                    rotate = int(0)
                    # Zoom factor 3 on each axis enlarges the rendered image
                    # relative to the default page size.
                    zoom_x = 3.
                    zoom_y = 3.
                    mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate)
                    pix = page.getPixmap(matrix=mat, alpha=False)
                    pix.writePNG(output)
                    # Re-read the written file; an unreadable PNG raises here
                    # and is handled by the outer ``except Exception``.
                    pdf_image = cv2.imread(output)
                    print("pdf_image", page_no, pdf_image.shape)
                    output_image_dict[int(page_no)] = output
                except ValueError as e:
                    traceback.print_exc()
                    if str(e) == "page not in document":
                        logging.info("pdf2Image page not in document! continue..." + str(page_no))
                        continue
                    elif "encrypted" in str(e):
                        logging.info("pdf2Image document need password " + str(page_no))
                        return [-7]
                    # Any other ValueError: the page is skipped silently.
                except RuntimeError as e:
                    if "cannot find page" in str(e):
                        logging.info("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e))
                        continue
                    else:
                        traceback.print_exc()
                        return [-3]
            return [output_image_dict]
        finally:
            doc.close()
    except Exception:
        logging.info("pdf2Image error!")
        # print_exc() writes the traceback itself and returns None, so it
        # must not be wrapped in print().
        traceback.print_exc()
        return [-1]
@get_memory_info.memory_decorator
@timeout_decorator.timeout(300, timeout_exception=TimeoutError)
def pdf_analyze(interpreter, page, device):
    """Run the pdfminer interpreter on one page and return the device layout.

    The timeout decorator raises ``TimeoutError`` after 300 seconds.

    :param interpreter: object whose ``process_page(page)`` is called.
    :param page: the page passed to the interpreter.
    :param device: object whose ``get_result()`` supplies the layout.
    :return: the value returned by ``device.get_result()``.
    """
    logging.info("into pdf_analyze")
    started_at = time.time()

    # Parse the page, then collect the aggregated layout from the device.
    print("pdf_analyze interpreter process...")
    interpreter.process_page(page)
    print("pdf_analyze device get_result...")
    page_layout = device.get_result()

    elapsed = time.time() - started_at
    logging.info("pdf2text read time " + str(elapsed))
    return page_layout
@get_memory_info.memory_decorator
def pdf2text(path, unique_type_dir):
    """Convert a PDF file to a single text/HTML string.

    Steps visible in this function:
      1. Render pages to PNG with ``pdf2Image`` and run ``image_preprocess``
         on each image to get text, column info, outline points and a
         "has table" flag.
      2. Merge tables that continue across consecutive pages with
         ``page_table_connect``.
      3. Open the PDF with pdfminer. If that raises ``PSEOF`` the whole
         document is read from the rendered images (table text or OCR).
      4. Otherwise walk the pdfminer pages: table pages reuse the image
         text, other pages are parsed with pdfminer and fall back to OCR
         when needed.

    :param path: path of the PDF file.
    :param unique_type_dir: directory passed to ``from_office_interface``
        when a single page is converted through the office interface.
    :return: ``[text]`` on success, otherwise an error-code list
        (``[-3]``, ``[-4]``, ``[-1]``, or whatever a helper returned when
        ``judge_error_code`` flagged it).

    NOTE(review): the nesting of the blocks below was reconstructed from
    control flow; confirm it against version control before relying on it.
    """
    logging.info("into pdf2text")
    try:
        # pymupdf pdf to image
        save_dir = path.split(".")[-2] + "_" + path.split(".")[-1]
        output_image_dict = pdf2Image(path, save_dir)
        if judge_error_code(output_image_dict):
            return output_image_dict
        output_image_dict = output_image_dict[0]
        output_image_no_list = list(output_image_dict.keys())
        output_image_no_list.sort(key=lambda x: x)
        # For every page collect: extracted text, table column counts,
        # outline points, whether it contains a table, and the image size.
        # page_info_list = []
        page_info_dict = {}
        has_table_dict = {}
        no_table_dict = {}
        for page_no in output_image_no_list:
            img_path = output_image_dict.get(page_no)
            print("pdf page", page_no, "in total", output_image_no_list[-1])
            # Skip images that cannot be read.
            try:
                img = cv2.imread(img_path)
                img_size = img.shape
            except:
                logging.info("pdf2text read image in page fail! continue...")
                continue
            # Process each page image (OCR disabled at this stage).
            text, column_list, outline_points, is_table = image_preprocess(img, img_path,
                                                                           use_ocr=False)
            if judge_error_code(text):
                return text
            # page_info_list.append([text, column_list, outline_points, is_table,
            # page_no, img_size])
            page_info = [text, column_list, outline_points, is_table, img_size]
            page_info_dict[int(page_no)] = page_info
            # Split pages into those with a table and those without.
            if is_table:
                has_table_dict[int(page_no)] = page_info
            else:
                no_table_dict[int(page_no)] = page_info
        has_table_no_list = list(has_table_dict.keys())
        has_table_no_list.sort(key=lambda x: x)
        page_no_list = list(page_info_dict.keys())
        page_no_list.sort(key=lambda x: x)
        # Join tables that continue across pages.
        table_connect_list, connect_text_list = page_table_connect(has_table_dict)
        if judge_error_code(table_connect_list):
            return table_connect_list
        # Page-number groups of the joined tables.
        table_connect_page_no_list = []
        for area in connect_text_list:
            table_connect_page_no_list.append(area[1])
        print("pdf2text table_connect_list", table_connect_list)
        print("connect_text_list", connect_text_list)
        # pdfminer approach
        # NOTE(review): ``fp`` is never closed in this function.
        try:
            fp = open(path, 'rb')
            # Create a PDF parser from the file object.
            parser = PDFParser(fp)
            # Create the PDF document.
            doc = PDFDocument(parser)
            # Wire the resource manager, aggregator device and interpreter.
            rsrcmgr = PDFResourceManager()
            device = PDFPageAggregator(rsrcmgr, laparams=LAParams())
            interpreter = PDFPageInterpreter(rsrcmgr, device)
            # Probe whether the PDF pages can be read at all.
            for page in PDFPage.create_pages(doc):
                break
        except pdfminer.psparser.PSEOF as e:
            # pdfminer cannot read blank-page objects; fall back to the images
            # rendered by pymupdf and run OCR on them.
            logging.info("pdf2text " + str(e) + " use ocr read pdf!")
            text_list = []
            for page_no in page_no_list:
                logging.info("pdf2text ocr page_no " + str(page_no))
                page_info = page_info_dict.get(page_no)
                # Page with a table
                if page_info[3]:
                    # Check whether the table is joined across pages.
                    area_no = 0
                    jump_page = 0
                    for area in table_connect_list:
                        if page_no in area:
                            # Record the joined text only once (on the first page).
                            if page_no == area[0]:
                                image_text = connect_text_list[area_no][0]
                                text_list.append([image_text, page_no, 0])
                            jump_page = 1
                        area_no += 1
                    # Pages that belong to a joined table skip the steps below.
                    if jump_page:
                        continue
                    # Use the page text directly.
                    image_text = page_info_dict.get(page_no)[0]
                    text_list.append([image_text, page_no, 0])
                # Page without a table
                else:
                    with open(output_image_dict.get(page_no), "rb") as ff:
                        image_stream = ff.read()
                    image_text = from_ocr_interface(image_stream)
                    text_list.append([image_text, page_no, 0])
            text_list.sort(key=lambda z: z[1])
            text = ""
            for t in text_list:
                text += t[0]
            return [text]
        except Exception as e:
            logging.info("pdf format error!")
            traceback.print_exc()
            return [-3]
        text_list = []
        page_no = 0
        pages = PDFPage.create_pages(doc)
        pages = list(pages)
        page_count = len(pages)
        for page in pages:
            logging.info("pdf2text pymupdf page_no " + str(page_no))
            # Limit the number of pages: keep the first 10 and the last 10.
            # if page_no >= 70:
            # logging.info("pdf2text: pdf pages only get 70 pages")
            # break
            if page_count > 20:
                if 10 <= page_no < page_count - 10:
                    page_no += 1
                    continue
            # The page is among the table pages: reuse the text already generated.
            if page_no in has_table_no_list:
                # Check whether the table is joined across pages.
                area_no = 0
                jump_page = 0
                for area in table_connect_list:
                    if page_no in area:
                        # Record the joined text only once (on the first page).
                        if page_no == area[0]:
                            image_text = connect_text_list[area_no][0]
                            text_list.append([image_text, page_no, 0])
                        jump_page = 1
                    area_no += 1
                # Pages that belong to a joined table skip the steps below.
                if jump_page:
                    page_no += 1
                    continue
                # Use the page text directly.
                image_text = has_table_dict.get(page_no)[0]
                text_list.append([image_text, page_no, 0])
                page_no += 1
                continue
            # No table: parse the page with pdfminer.
            else:
                if get_platform() == "Windows":
                    try:
                        interpreter.process_page(page)
                        layout = device.get_result()
                    except Exception:
                        logging.info("pdf2text pdfminer read pdf page error! continue...")
                        # NOTE(review): ``page_no`` is not incremented before this
                        # ``continue``, so later pages appear to be numbered one
                        # too low — confirm whether that is intended.
                        continue
                else:
                    # Parse with a timeout.
                    try:
                        # Parse a page that has no table.
                        # NOTE(review): this inner Windows check sits inside the
                        # non-Windows branch and looks unreachable.
                        if get_platform() == "Windows":
                            origin_pdf_analyze = pdf_analyze.__wrapped__
                            layout = origin_pdf_analyze(interpreter, page, device)
                        else:
                            layout = pdf_analyze(interpreter, page, device)
                    except TimeoutError as e:
                        logging.info("pdf2text pdfminer read pdf page time out!")
                        return [-4]
                    except Exception:
                        logging.info("pdf2text pdfminer read pdf page error! continue...")
                        # NOTE(review): same missing ``page_no`` increment as above.
                        continue
                # Check whether the page has any text object; if not it may
                # only contain a watermark image.
                only_image = 1
                image_count = 0
                for x in layout:
                    if isinstance(x, LTTextBoxHorizontal):
                        only_image = 0
                    if isinstance(x, LTFigure):
                        image_count += 1
                # Too many figures on the page: OCR the whole page image.
                logging.info("pdf2text image_count " + str(image_count))
                if image_count >= 3:
                    image_text = page_info_dict.get(page_no)[0]
                    if image_text is None:
                        with open(output_image_dict.get(page_no), "rb") as ff:
                            image_stream = ff.read()
                        image_text = from_ocr_interface(image_stream)
                        if judge_error_code(image_text):
                            return image_text
                        page_info_dict[page_no][0] = image_text
                    text_list.append([image_text, page_no, 0])
                    page_no += 1
                    continue
                order_list = []
                for x in layout:
                    # Whether this object was recognised through OCR.
                    ocr_flag = 0
                    if get_platform() == "Windows":
                        # print("x", page_no, x)
                        print()
                    if isinstance(x, LTTextBoxHorizontal):
                        image_text = x.get_text()
                        # Encoding could not be decoded ("(cid:N)" markers): use OCR.
                        if re.search('[(]cid:[0-9]+[)]', image_text):
                            print(re.search('[(]cid:[0-9]+[)]', image_text))
                            image_text = page_info_dict.get(page_no)[0]
                            if image_text is None:
                                with open(output_image_dict.get(page_no), "rb") as ff:
                                    image_stream = ff.read()
                                image_text = from_ocr_interface(image_stream)
                                if judge_error_code(image_text):
                                    return image_text
                                page_info_dict[page_no][0] = image_text
                            image_text = add_div(image_text)
                            # order_list.append([image_text, page_no, x.bbox[1]])
                            # The whole-page text replaces everything collected so far.
                            order_list = [[image_text, page_no, x.bbox[1]]]
                            break
                        else:
                            image_text = add_div(image_text)
                            order_list.append([image_text, page_no, x.bbox[1]])
                            continue
                    if isinstance(x, LTFigure):
                        for image in x:
                            if isinstance(image, LTImage):
                                try:
                                    print("pdf2text LTImage size", page_no, image.width, image.height)
                                    image_stream = image.stream.get_data()
                                    # Ignore small images.
                                    if image.width <= 300 and image.height <= 300:
                                        continue
                                    # Some watermarks make splitting/reading the pdf fail.
                                    # if image.width <= 200 and image.height<=200:
                                    # continue
                                    # img_test = Image.open(io.BytesIO(image_stream))
                                    # img_test.save('temp/LTImage.jpg')
                                    # Check the extracted image size; if too large, raise
                                    # so the rendered page image is used for OCR instead.
                                    img_test = Image.open(io.BytesIO(image_stream))
                                    if img_test.size[1] > 2000 or img_test.size[0] > 1500:
                                        print("pdf2text LTImage stream output size", img_test.size)
                                        raise Exception
                                    # Smaller images are saved and sent to OCR directly.
                                    else:
                                        img_test.save('temp/LTImage.jpg')
                                        with open('temp/LTImage.jpg', "rb") as ff:
                                            image_stream = ff.read()
                                        image_text = from_ocr_interface(image_stream)
                                        if judge_error_code(image_text):
                                            return image_text
                                # except pdfminer.pdftypes.PDFNotImplementedError:
                                # with open(output_image_list[page_no], "rb") as ff:
                                # image_stream = ff.read()
                                except Exception:
                                    logging.info("pdf2text pdfminer read image in page " + str(page_no) +
                                                 " fail! use pymupdf read image...")
                                    # NOTE(review): print_exc() returns None, so this
                                    # also prints the literal "None".
                                    print(traceback.print_exc())
                                    image_text = page_info_dict.get(page_no)[0]
                                    if image_text is None:
                                        with open(output_image_dict.get(page_no), "rb") as ff:
                                            image_stream = ff.read()
                                        image_text = from_ocr_interface(image_stream)
                                        if judge_error_code(image_text):
                                            return image_text
                                        page_info_dict[page_no][0] = image_text
                                    ocr_flag = 1
                                # Guess that only a watermark image was found: no text
                                # output and the page holds only image objects.
                                if image_text == "" and only_image:
                                    # Split this page out of the pdf.
                                    try:
                                        logging.info("pdf2text guess pdf has watermark")
                                        split_path = get_single_pdf(path, page_no)
                                    except:
                                        # If splitting raises, it is most likely not a
                                        # watermark; OCR the page image instead.
                                        logging.info("pdf2text guess pdf has no watermark")
                                        image_text = page_info_dict.get(page_no)[0]
                                        if image_text is None:
                                            with open(output_image_dict.get(page_no), "rb") as ff:
                                                image_stream = ff.read()
                                            image_text = from_ocr_interface(image_stream)
                                        order_list.append([image_text, page_no, -1])
                                        page_info_dict[page_no][0] = image_text
                                        ocr_flag = 1
                                        continue
                                    if judge_error_code(split_path):
                                        return split_path
                                    # Convert the single page through the office interface.
                                    file_path = from_office_interface(split_path, unique_type_dir, 'html', 3)
                                    # if file_path == [-3]:
                                    # return [-3]
                                    if judge_error_code(file_path):
                                        return file_path
                                    # Read the text of the produced html.
                                    image_text = get_html_p(file_path)
                                    if judge_error_code(image_text):
                                        return image_text
                                if get_platform() == "Windows":
                                    print("image_text", page_no, x.bbox[1], image_text)
                                    with open("temp" + str(x.bbox[0]) + ".jpg", "wb") as ff:
                                        ff.write(image_stream)
                                image_text = add_div(image_text)
                                # OCR results are tagged with position -1.
                                if ocr_flag:
                                    order_list.append([image_text, page_no, -1])
                                else:
                                    order_list.append([image_text, page_no, x.bbox[1]])
                # Sort by vertical position, highest value first; OCR entries (-1) go last.
                order_list.sort(key=lambda z: z[2], reverse=True)
                # OCR took part in the recognition.
                # NOTE(review): ``order_list[-1]`` raises IndexError for a page that
                # produced no entries; that would surface as the generic [-1] below.
                if order_list[-1][2] == -1:
                    ocr_order_list = [order_list[-1]]
                    not_ocr_order_list = []
                    not_ocr_text = ""
                    # De-duplicate text obtained twice because of a failed read.
                    for order in order_list:
                        if order[2] != -1:
                            not_ocr_order_list.append(order)
                            not_ocr_text += order[0]
                    if string_similarity(ocr_order_list[0][0], not_ocr_text) >= 0.85:
                        order_list = not_ocr_order_list
                    else:
                        order_list = ocr_order_list
                for order in order_list:
                    text_list.append(order)
                page_no += 1
        text = ""
        for t in text_list:
            # text += add_div(t[0])
            if t[0] is not None:
                text += t[0]
        return [text]
    except UnicodeDecodeError as e:
        logging.info("pdf2text pdfminer create pages failed! " + str(e))
        return [-3]
    except Exception as e:
        logging.info("pdf2text error!")
        print("pdf2text", traceback.print_exc())
        return [-1]
def get_single_pdf(path, page_no):
    """Write one page of a PDF into a new single-page PDF file.

    The output is written next to the input as ``<path without extension>
    + "_split.pdf"``. The extension is removed with ``os.path.splitext`` so
    that dots elsewhere in the path (``./dir/a.pdf``, ``v1.2/a.pdf``) do not
    truncate the output name.

    :param path: path of the source PDF.
    :param page_no: zero-based index passed to ``PdfFileReader.getPage``.
    :return: path of the newly written single-page PDF.
    :raises PyPDF2.utils.PdfReadError: propagated unchanged from PyPDF2.
    :raises Exception: any other failure is logged and re-raised.
    """
    logging.info("into get_single_pdf")
    try:
        pdf_origin = PdfFileReader(path, strict=False)
        pdf_new = PdfFileWriter()
        pdf_new.addPage(pdf_origin.getPage(page_no))

        path_new = os.path.splitext(path)[0] + "_split.pdf"
        with open(path_new, "wb") as ff:
            pdf_new.write(ff)
        return path_new
    except PyPDF2.utils.PdfReadError:
        # Read errors are passed to the caller without logging.
        raise
    except Exception:
        logging.info("get_single_pdf error! page " + str(page_no))
        # print_exc() writes the traceback itself; it returns None.
        traceback.print_exc()
        raise
def page_table_connect(has_table_dict):
    """Find tables that continue across consecutive pages and join their text.

    :param has_table_dict: ``{page_no: page_info}`` for pages that contain a
        table. As indexed here, ``page_info[0]`` is the page text,
        ``page_info[1]`` the per-area column list, ``page_info[2]`` the
        outline points and ``page_info[4]`` the image size.
    :return: ``(table_connect_list, connect_text_list)`` where
        ``table_connect_list`` holds sorted lists of joined page numbers and
        ``connect_text_list`` holds ``[joined_text, page_numbers]`` entries;
        ``([], [])`` for empty input; ``([-1], [-1])`` on any exception.
    """
    logging.info("into page_table_connect")
    if not has_table_dict:
        return [], []
    try:
        # Determine which pages have tables that connect to each other.
        table_connect_list = []
        temp_list = []
        # Allowed distance from the top/bottom of the image: 1/7 of its height.
        threshold = 7
        page_no_list = list(has_table_dict.keys())
        page_no_list.sort(key=lambda x: x)
        for i in range(1, len(page_no_list)):
            page_info = has_table_dict.get(page_no_list[i])
            last_page_info = has_table_dict.get(page_no_list[i - 1])
            # Page numbers must be consecutive.
            if page_no_list[i] - page_no_list[i - 1] == 1:
                # The last area of the previous page and the first area of this
                # page must both have a falsy column value and be equal.
                if not last_page_info[1][-1] and not page_info[1][0] and \
                        last_page_info[1][-1] == page_info[1][0]:
                    # The previous page's outline must end near the bottom and
                    # this page's outline must start near the top.
                    if last_page_info[4][0] - last_page_info[2][-1][1][1] \
                            <= int(last_page_info[4][0] / threshold) \
                            and page_info[2][0][0][1] - 0 \
                            <= int(page_info[4][0] / threshold):
                        temp_list.append(page_no_list[i - 1])
                        temp_list.append(page_no_list[i])
                        continue
            # Conditions not met: store the run of connected pages collected so far.
            if len(temp_list) > 1:
                temp_list = list(set(temp_list))
                temp_list.sort(key=lambda x: x)
                table_connect_list.append(temp_list)
                temp_list = []
        # Flush a run that reaches the last page.
        if len(temp_list) > 1:
            temp_list = list(set(temp_list))
            temp_list.sort(key=lambda x: x)
            table_connect_list.append(temp_list)
            temp_list = []
        # Join the content of the connected pages.
        connect_text_list = []
        for area in table_connect_list:
            first_page_no = area[0]
            area_page_text = str(has_table_dict.get(first_page_no)[0])
            for i in range(1, len(area)):
                current_page_no = area[i]
                current_page_text = str(has_table_dict.get(current_page_no)[0])
                # Join the two tables: drop the first match of the prefix
                # pattern from the following page ...
                # NOTE(review): the pattern literal below is damaged (a quote,
                # a line break, a quote). Presumably it was a table-opening
                # tag; restore it from version control.
                table_prefix = re.finditer('
', current_page_text)
                index_list = []
                for t in table_prefix:
                    index_list.append(t.span())
                delete_index = index_list[0]
                current_page_text = current_page_text[:delete_index[0]] \
                    + current_page_text[delete_index[1]:]
                # ... and the last match of the suffix pattern from the text
                # accumulated so far.
                # NOTE(review): same damaged literal; presumably the
                # table-closing tag — confirm.
                table_suffix = re.finditer('
', area_page_text)
                index_list = []
                for t in table_suffix:
                    index_list.append(t.span())
                delete_index = index_list[-1]
                area_page_text = area_page_text[:delete_index[0]] \
                    + area_page_text[delete_index[1]:]
                area_page_text = area_page_text + current_page_text
            connect_text_list.append([area_page_text, area])
        return table_connect_list, connect_text_list
    except Exception as e:
        # print("page_table_connect", e)
        logging.info("page_table_connect error!")
        print("page_table_connect", traceback.print_exc())
        return [-1], [-1]
class PDFConvert:
    """Convert a PDF into a ``_Document`` tree and render it as HTML.

    The backends named in ``self.packages`` (pdfminer, PyMuPDF, PyPDF2,
    pdfplumber) are initialised lazily by ``init_pdf``; ``self.has_init_pdf``
    records which of them are ready. Errors are reported by storing an
    error-code list on ``self._doc.error_code`` / ``self._page.error_code``.
    """

    def __init__(self, path):
        """Remember the file path and create the empty document node."""
        self._doc = _Document(path)
        self.path = path
        self.packages = ["pdfminer", "PyMuPDF", "PyPDF2", "pdfplumber"]
        self.has_init_pdf = [0] * len(self.packages)

    def init_pdf(self, package_name):
        """Initialise one backend by name.

        :param package_name: one of the entries of ``self.packages``.
        On any failure ``self._doc.error_code`` is set to ``[-3]``.
        """
        try:
            if package_name == self.packages[0]:
                # The file object stays open: pdfminer reads from it lazily.
                fp = open(self.path, 'rb')
                parser = PDFParser(fp)
                self.doc_pdfminer = PDFDocument(parser)
                rsrcmgr = PDFResourceManager()
                self.laparams = LAParams(line_overlap=0.01,
                                         char_margin=0.05,
                                         line_margin=0.01,
                                         word_margin=0.01,
                                         boxes_flow=0.1,)
                self.device = PDFPageAggregator(rsrcmgr, laparams=self.laparams)
                self.interpreter = PDFPageInterpreter(rsrcmgr, self.device)
                self.has_init_pdf[0] = 1
            elif package_name == self.packages[1]:
                self.doc_pymupdf = fitz.open(self.path)
                self.has_init_pdf[1] = 1
            elif package_name == self.packages[2]:
                self.doc_pypdf2 = PdfFileReader(self.path, strict=False)
                self.doc_pypdf2_new = PdfFileWriter()
                self.has_init_pdf[2] = 1
            elif package_name == self.packages[3]:
                # Relies on self.laparams created by the pdfminer branch.
                self.fp = open(self.path, 'rb')
                self.lt = LineTable()
                self.doc_top = 0
                self.doc_pdfplumber = PDF(self.fp, laparams=self.laparams.__dict__)
                # Mark pdfplumber as ready. Without this flag the branch ran
                # again for every text page, reopening the file and resetting
                # doc_top to 0.
                self.has_init_pdf[3] = 1
            else:
                print("Only Suppport Packages", str(self.packages))
                raise Exception
        except Exception:
            logging.info(package_name + " cannot open pdf!")
            self._doc.error_code = [-3]

    def convert_pdf(self):
        """Parse the document page by page into ``self._doc``.

        For documents with more than 20 pages only the first 10 and the last
        10 pages are converted. Conversion stops at the first page that
        reports an error, and that code is copied to the document.
        """
        if self.has_init_pdf[0] == 0:
            self.init_pdf("pdfminer")
        if self._doc.error_code is not None:
            return

        # Probe whether the pdf can be read.
        try:
            for page in PDFPage.create_pages(self.doc_pdfminer):
                break
        except pdfminer.psparser.PSEOF as e:
            # pdfminer cannot read blank-page objects.
            # NOTE(review): only logged here; the create_pages call below is
            # not protected against the same error.
            logging.info("pdf2text " + str(e) + " use ocr read pdf!")

        # Process every page.
        pages = list(PDFPage.create_pages(self.doc_pdfminer))
        page_count = len(pages)
        for page_no, page in enumerate(pages):
            # Limit the number of pages: first 10 and last 10 only.
            if page_count > 20 and 10 <= page_no < page_count - 10:
                continue

            self._page = _Page(page, page_no)
            # Parse the single page.
            self.convert_page(page, page_no)
            if self._doc.error_code is None and self._page.error_code is not None:
                self._doc.error_code = self._page.error_code
                break
            self._doc.add_child(self._page)

    def convert_page(self, page, page_no):
        """Convert one pdfminer page into children of ``self._page``.

        Three strategies, chosen from the layout objects on the page:
          * text only, no images: extract tables and sentences;
          * three or more images, or no text at all: add the rendered page
            image so the whole page can be OCR'd;
          * otherwise: add each text box as a sentence and each large enough
            embedded image as an image node.
        """
        layout = self.get_layout(page)
        if judge_error_code(layout):
            self._page.error_code = layout
            return

        # Classify and store the objects found on the page.
        only_image = 1
        image_count = 0
        lt_text_list = []
        lt_image_list = []
        for x in layout:
            if isinstance(x, (LTTextBoxHorizontal, LTTextBoxVertical)):
                only_image = 0
                lt_text_list.append(x)
            if isinstance(x, LTFigure):
                for y in x:
                    if isinstance(y, LTImage):
                        lt_image_list.append(y)
                        image_count += 1

        # Text only and no images: extract text and tables directly.
        if only_image == 0 and image_count == 0:
            # PDFPlumber
            if self.has_init_pdf[3] == 0:
                self.init_pdf("pdfplumber")
            if self._doc.error_code is not None:
                return
            try:
                lt_line_list = []
                page_plumber = pdfPage(self.doc_pdfplumber, page, page_number=page_no,
                                       initial_doctop=self.doc_top)
                self.doc_top += page_plumber.height
                table_finder = TableFinder(page_plumber)
                for _edge in table_finder.get_edges():
                    lt_line_list.append(LTLine(1, (float(_edge["x0"]), float(_edge["y0"])),
                                               (float(_edge["x1"]), float(_edge["y1"]))))
                list_tables, filter_objs, _ = self.lt.recognize_table(lt_text_list, lt_line_list)
                self._page.in_table_objs = filter_objs
                for table in list_tables:
                    _table = _Table(table["table"], table["bbox"])
                    self._page.add_child(_table)
                list_sentences = ParseUtils.recognize_sentences(lt_text_list, filter_objs,
                                                                layout.bbox, page_no)
                for sentence in list_sentences:
                    _sen = _Sentence(sentence.text, sentence.bbox)
                    self._page.add_child(_sen)
            except Exception:
                traceback.print_exc()
                self._page.error_code = [-8]

        # Too many images, or no text: use the whole rendered page for OCR.
        elif image_count >= 3 or only_image == 1:
            page_image = self.get_page_image(page_no)
            if judge_error_code(page_image):
                self._page.error_code = page_image
            else:
                _image = _Image(page_image[1], page_image[0])
                self._page.add_child(_image)

        # Otherwise read the page objects one by one.
        else:
            # Text objects
            for x in lt_text_list:
                object_text = x.get_text()
                # Undecodable characters ("(cid:N)"): use the whole page image.
                if re.search('[(]cid:[0-9]+[)]', object_text):
                    page_image = self.get_page_image(page_no)
                    if judge_error_code(page_image):
                        self._page.error_code = page_image
                    else:
                        _image = _Image(page_image[1], page_image[0])
                        self._page.add_child(_image)
                    return
                else:
                    _sen = _Sentence(object_text, x.bbox)
                    self._page.add_child(_sen)

            # Image objects
            for image in lt_image_list:
                try:
                    print("pdf2text LTImage size", page_no, image.width, image.height)
                    image_stream = image.stream.get_data()
                    # Ignore small images.
                    if image.width <= 300 and image.height <= 300:
                        continue
                    # If the extracted image is too large, use the rendered
                    # page image instead.
                    img_test = Image.open(io.BytesIO(image_stream))
                    if img_test.size[1] > 2000 or img_test.size[0] > 1500:
                        print("pdf2text LTImage stream output size", img_test.size)
                        page_image = self.get_page_image(page_no)
                        if judge_error_code(page_image):
                            self._page.error_code = page_image
                        else:
                            _image = _Image(page_image[1], page_image[0])
                            self._page.add_child(_image)
                        return
                    # Smaller images are saved and attached directly.
                    else:
                        temp_path = 'temp/LTImage.jpg'
                        img_test.save(temp_path)
                        with open(temp_path, "rb") as ff:
                            image_stream = ff.read()
                        _image = _Image(image_stream, temp_path)
                        _image.x = image.bbox[0]
                        _image.y = image.bbox[1]
                        self._page.add_child(_image)
                except Exception:
                    logging.info("pdf2text pdfminer read image in page " + str(page_no) +
                                 " fail! use pymupdf read image...")
                    # print_exc() writes the traceback itself; it returns None.
                    traceback.print_exc()

    def get_layout(self, page):
        """Return the pdfminer layout of ``page`` or an error-code list.

        :return: the layout; ``[-4]`` on timeout; ``[-3]`` on any other
            error; ``None`` when the document already carries an error code.
        """
        if self.has_init_pdf[0] == 0:
            self.init_pdf("pdfminer")
        if self._doc.error_code is not None:
            return None

        try:
            if get_platform() == "Windows":
                # Processed directly, without the timeout wrapper.
                self.interpreter.process_page(page)
                layout = self.device.get_result()
            else:
                # pdf_analyze enforces the timeout.
                try:
                    layout = pdf_analyze(self.interpreter, page, self.device)
                except TimeoutError:
                    logging.info("pdf2text pdfminer read pdf page time out!")
                    layout = [-4]
        except Exception:
            logging.info("pdf2text pdfminer read pdf page error! continue...")
            layout = [-3]
        return layout

    def get_page_image(self, page_no):
        """Render one page to PNG with PyMuPDF at 2x zoom.

        :return: ``[png_path, png_bytes]`` on success; ``[0]`` when the page
            is not in the document; ``[-7]`` when the document is encrypted;
            ``[-3]`` on another RuntimeError; ``None`` when the document
            already carries an error code or for an unrecognised ValueError.
        """
        try:
            if self.has_init_pdf[1] == 0:
                self.init_pdf("PyMuPDF")
            if self._doc.error_code is not None:
                return None

            save_dir = self.path.split(".")[-2] + "_" + self.path.split(".")[-1]
            page = self.doc_pymupdf.loadPage(page_no)
            output = save_dir + "_page" + str(page_no) + ".png"
            rotate = int(0)
            zoom_x = 2.
            zoom_y = 2.
            mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate)
            pix = page.getPixmap(matrix=mat, alpha=False)
            pix.writePNG(output)
            with open(output, "rb") as f:
                pdf_image = f.read()
            return [output, pdf_image]
        except ValueError as e:
            traceback.print_exc()
            if str(e) == "page not in document":
                logging.info("pdf2Image page not in document! continue..." + str(page_no))
                return [0]
            elif "encrypted" in str(e):
                logging.info("pdf2Image document need password " + str(page_no))
                return [-7]
            # NOTE(review): other ValueErrors fall through and return None.
            return None
        except RuntimeError as e:
            if "cannot find page" in str(e):
                logging.info("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e))
                return [0]
            else:
                traceback.print_exc()
                return [-3]

    def get_html(self):
        """Convert the pdf and return its HTML, or the document error code."""
        self.convert_pdf()
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()
# 以下为现成pdf单页解析接口
class ParsePage:
    """One parsed pdf page: tables and sentences ordered from top to bottom."""

    def __init__(self,lt,_page,pdf_page,page_no):
        """Collect the tables and sentences of a page into ``self.childs``.

        :param lt: table recogniser exposing ``recognize_table``.
        :param _page: layout page providing ``bbox`` and ``_objs``.
        :param pdf_page: page object handed to ``TableFinder``.
        :param page_no: page number stored on the page and its sentences.
        """
        self.page_no = page_no
        self.childs = []
        self.linetable = lt
        self.bbox = _page.bbox

        # Gather text boxes (and rectangles, currently unused) from the layout.
        text_boxes = []
        rect_objs = []
        for layout_obj in _page._objs:
            if isinstance(layout_obj, (LTTextBoxHorizontal, LTTextBoxVertical)):
                text_boxes.append(layout_obj)
            if isinstance(layout_obj, LTRect):
                rect_objs.append(layout_obj)

        # Turn every edge reported by the table finder into an LTLine.
        edge_lines = [
            LTLine(1,
                   (float(edge["x0"]), float(edge["y0"])),
                   (float(edge["x1"]), float(edge["y1"])))
            for edge in TableFinder(pdf_page).get_edges()
        ]

        ParseUtils.getFontinfo(_page)
        found_tables, filter_objs, _ = self.linetable.recognize_table(text_boxes, edge_lines)
        self.childs.extend(ParseTable(item["bbox"], item["table"]) for item in found_tables)

        sentences = ParseUtils.recognize_sentences(text_boxes, filter_objs, _page.bbox, page_no)
        self.childs.extend(sentences)
        # Highest top edge first.
        self.childs.sort(key=lambda child: child.bbox[3], reverse=True)

    def fixSentences(self):
        '''
        Merge a sentence into an earlier one when the context suggests a wrap.

        A sentence that is neither an outline entry nor a title is appended to
        the nearest preceding child whose right edge lies within 100 units of
        the page's right edge; the search stops at a table.
        :return:
        '''
        merged_indexes = set()
        for current in range(1, len(self.childs)):
            sentence = self.childs[current]
            if not isinstance(sentence, ParseSentence):
                continue
            if sentence.is_outline or sentence.title:
                continue

            # Walk backwards looking for the sentence this one continues.
            target = current
            while True:
                target -= 1
                candidate = self.childs[target]
                if isinstance(candidate, ParseTable):
                    target = -1
                    break
                if target not in merged_indexes and abs(candidate.bbox[2] - self.bbox[2]) < 100:
                    break
                if target < 0:
                    break

            if target < 0:
                continue
            merged_indexes.add(current)
            host = self.childs[target]
            host.text += sentence.text
            host.bbox = (min(sentence.bbox[0], host.bbox[0]),
                         min(sentence.bbox[1], host.bbox[1]),
                         max(sentence.bbox[2], host.bbox[2]),
                         max(sentence.bbox[3], host.bbox[3]))

        # Remove merged sentences from the back so indexes stay valid.
        for index in sorted(merged_indexes, reverse=True):
            self.childs.pop(index)
class ParseTable:
    """A recognised table: its bounding box and its rows of cell dicts."""

    def __init__(self,bbox,_table):
        self.bbox = bbox
        self.table = _table

    def __repr__(self):
        # Header line, then one line per row; each cell shows at most the
        # first 10 characters of its text with line breaks removed.
        pieces = ["table>>>>>>>>>>>>>>>>>>>>>>>>>\n"]
        for row in self.table:
            for cell in row:
                preview = cell.get("text").replace("\n","")[:10]
                pieces.append("[%s]%s"%(preview,"\t\t"))
            pieces.append("\n")
        return "".join(pieces)

    def getSentence(self):
        # TODO: transform table to sentence
        return None
class ParseSentence:
    """A line of page text together with its layout and title metadata."""

    def __init__(self,bbox,fontname,fontsize,_text,_title,title_text,_pattern,title_degree,is_outline,outline_location,page_no):
        # bbox must unpack into exactly four coordinates.
        self.x0, self.y0, self.x1, self.y1 = bbox
        self.bbox = bbox

        # Font of the text.
        self.fontname = fontname
        self.fontsize = fontsize

        # Text and title information.
        self.text = _text
        self.title = _title
        self.title_text = title_text
        self.groups = _pattern
        self.title_degree = title_degree

        # Outline entry information.
        self.is_outline = is_outline
        self.outline_location = outline_location

        self.page_no = page_no

    def __repr__(self):
        fields = (self.text, self.title, self.is_outline, self.outline_location, str(self.bbox))
        return "%s,%s,%s,%d,%s" % fields
class ParseUtils:
@staticmethod
def getFontinfo(_page):
    """Annotate every text box of the page with the font of its first char.

    For each horizontal or vertical text box in ``_page._objs`` the first
    ``LTChar`` found (scanning text lines in order) supplies ``fontname``
    and ``fontsize`` attributes, which are set on the box itself. Boxes
    without any ``LTChar`` are left untouched.
    """
    for box in _page._objs:
        if not isinstance(box, (LTTextBoxHorizontal, LTTextBoxVertical)):
            continue
        for line in box._objs:
            first_char = next((item for item in line._objs if isinstance(item, LTChar)), None)
            if first_char is None:
                continue
            box.fontname = first_char.fontname
            box.fontsize = first_char.size
            break
@staticmethod
def recognize_sentences(list_textbox,filter_objs,page_bbox,page_no,remove_space=True):
    """Group text boxes into lines and build a ``ParseSentence`` per line.

    Text boxes whose bottom coordinates differ by less than 5 are treated
    as one line. Within a line the boxes are concatenated left to right;
    the marker ``"=,="`` is inserted where the left edges of neighbouring
    boxes are more than 30 apart. The marker later becomes ``","`` for
    ordinary lines and is dropped for titles.

    :param list_textbox: text boxes of the page; sorted in place.
    :param filter_objs: boxes to ignore (those already used by tables).
    :param page_bbox: bounding box of the page, used for centring checks.
    :param page_no: page number stored on every sentence.
    :param remove_space: unused; kept for interface compatibility.
    :return: list of ``ParseSentence`` ordered from top to bottom.
    """
    list_textbox.sort(key=lambda x:x.bbox[0])
    list_textbox.sort(key=lambda x:x.bbox[3],reverse=True)

    # Cluster boxes by their bottom coordinate.
    cluster_textbox = []
    for _textbox in list_textbox:
        if _textbox in filter_objs:
            continue
        _find = False
        for _ct in cluster_textbox:
            if abs(_ct["y"]-_textbox.bbox[1])<5:
                _find = True
                _ct["textbox"].append(_textbox)
        if not _find:
            cluster_textbox.append({"y":_textbox.bbox[1],"textbox":[_textbox]})
    cluster_textbox.sort(key=lambda x:x["y"],reverse=True)

    list_sentences = []
    for _line in cluster_textbox:
        _textboxs = _line["textbox"]
        _textboxs.sort(key=lambda x:x.bbox[0])
        _linetext = _textboxs[0].get_text()
        for _i in range(1,len(_textboxs)):
            if abs(_textboxs[_i].bbox[0]-_textboxs[_i-1].bbox[0])>30:
                # Guard against an empty accumulated text: there is
                # nothing to separate from, and [-1] would raise.
                if _linetext and _linetext[-1] not in (",",",","。",".","、",";"):
                    _linetext += "=,="
            _linetext += _textboxs[_i].get_text()
        _linetext = re.sub(r"[\s\r\n]","",_linetext)
        _bbox = (_textboxs[0].bbox[0],_textboxs[0].bbox[1],_textboxs[-1].bbox[2],_textboxs[-1].bbox[3])

        # Title detection: first box alone, then the whole line, then a
        # centred-line heuristic.
        _title = None
        _pattern_groups = None
        title_text = ""
        for _candidate in (_textboxs[0].get_text(), _linetext):
            if _title:
                break
            _groups = ParseUtils.find_title_by_pattern(_candidate)
            if _groups:
                _title = _groups[0][0]
                title_text = _groups[0][1]
                _pattern_groups = _groups
        if not _title:
            _title = ParseUtils.rec_incenter(_bbox,page_bbox)

        title_degree = 2
        if not _title:
            _linetext = _linetext.replace("=,=",",")
        else:
            _linetext = _linetext.replace("=,=","")
            # The title name carries its degree after the first underscore.
            title_degree = int(_title.split("_")[1])

        # A centred line made only of digits is a page number.
        if ParseUtils.rec_incenter(_bbox,page_bbox) and re.search(r"^\d+$",_linetext) is not None:
            continue
        if _linetext=="" or re.search("^,+$",_linetext) is not None:
            continue

        # Outline (table of contents) entry: text, a dot leader of five or
        # more dots, then the page number. The named groups are the ones
        # read back below through group("text") and group("nums").
        is_outline = False
        outline_location = -1
        _search = re.search(r"(?P<text>.+?)\.{5,}(?P<nums>\d+)$",_linetext)
        if _search is not None:
            is_outline = True
            _linetext = _search.group("text")
            outline_location = int(_search.group("nums"))

        list_sentences.append(ParseSentence(_bbox,_textboxs[-1].__dict__.get("fontname"),_textboxs[-1].__dict__.get("fontsize"),_linetext,_title,title_text,_pattern_groups,title_degree,is_outline,outline_location,page_no))
    return list_sentences
@staticmethod
def find_title_by_pattern(_text,_pattern="(?P(?P^第?)(?P[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P[、章]))|" \
"(?P^(?P[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+))|" \
"(?P^(?P第?)(?P[一二三四五六七八九十]+)(?P[节]))|" \
"(?P^(?P\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P\d{1,2})(?P[\..、\s\-]))|" \
"(?P^(?P\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P\d{1,2})(?P[\..、\s\-]))|" \
"(?P^(?P\d{1,2}[\..、\s\-])(?P\d{1,2})(?P[\..、\s\-]))|" \
"(?P^(?P\d{1,2})(?P[\..、\s\-]))|" \
"(?P^(?P(?)(?P\d{1,2})(?P)))|" \
"(?P^(?P(?)(?P[a-zA-Z]+)(?P)))|"
"(?P^(?P(?)(?P[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P)))|" \
):
    """Search ``_text`` with ``_pattern`` and return its matched named groups.

    :param _text: text to inspect.
    :param _pattern: regular expression with named groups.
    :return: list of ``(group_name, matched_text)`` tuples for every named
        group that took part in the match, sorted by group name; ``None``
        when nothing matched or no named group has a value.

    NOTE(review): the default pattern as written contains ``(?P`` without a
    ``<name>`` and ``(?)`` sequences, which the ``re`` module does not
    accept, so the original group names appear to have been lost. Restore
    the literal from version control. ``recognize_sentences`` reads the
    first returned name and parses ``int(name.split("_")[1])``, so the
    names presumably look like ``title_<degree>...`` — confirm.
    NOTE(review): the pattern also ends with ``|``, i.e. an empty last
    alternative — confirm that is intended.
    """
    _se = re.search(_pattern,_text)
    groups = []
    if _se is not None:
        _gd = _se.groupdict()
        # Keep only the named groups that actually matched.
        for k,v in _gd.items():
            if v is not None:
                groups.append((k,v))
    if len(groups):
        # Sort by group name so the caller's groups[0] is deterministic.
        groups.sort(key=lambda x:x[0])
        return groups
    return None
@staticmethod
def rec_incenter(o_bbox,p_bbox):
p_width = p_bbox[2]-p_bbox[0]
l_space = (o_bbox[0]-p_bbox[0])/p_width
r_space = (p_bbox[2]-o_bbox[2])/p_width
if abs((l_space-r_space))<0.1 and l_space>0.2:
return "title_2"
@staticmethod
def is_first_title(_title):
if _title is None:
return False
if re.search("^\d+$",_title) is not None:
if int(_title)==1:
return True
return False
if re.search("^[一二三四五六七八九十百]+$",_title) is not None:
if _title=="一":
return True
return False
if re.search("^[a-z]+$",_title) is not None:
if _title=="a":
return True
return False
if re.search("^[A-Z]+$",_title) is not None:
if _title=="A":
return True
return False
if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$",_title) is not None:
if _title=="Ⅰ":
return True
return False
return False
@staticmethod
def get_next_title(_title):
if re.search("^\d+$",_title) is not None:
return str(int(_title)+1)
if re.search("^[一二三四五六七八九十百]+$",_title) is not None:
_next_title = ParseUtils.make_increase(['一','二','三','四','五','六','七','八','九','十'],re.sub("[十百]",'',_title))
_next_title = list(_next_title)
_next_title.reverse()
if _next_title[-1]!="十":
if len(_next_title)>=2:
_next_title.insert(-1,'十')
if len(_next_title)>=4:
_next_title.insert(-3,'百')
if _title[0]=="十":
if _next_title=="十":
_next_title = ["二","十"]
_next_title.insert(0,"十")
_next_title = "".join(_next_title)
return _next_title
if re.search("^[a-z]+$",_title) is not None:
_next_title = ParseUtils.make_increase([chr(i+ord('a')) for i in range(26)],_title)
_next_title = list(_next_title)
_next_title.reverse()
return "".join(_next_title)
if re.search("^[A-Z]+$",_title) is not None:
_next_title = ParseUtils.make_increase([chr(i+ord('A')) for i in range(26)],_title)
_next_title = list(_next_title)
_next_title.reverse()
return "".join(_next_title)
if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$",_title) is not None:
_sort = ["Ⅰ","Ⅱ","Ⅲ","Ⅳ","Ⅴ","Ⅵ","Ⅶ","Ⅷ","Ⅸ","Ⅹ","Ⅺ","Ⅻ"]
_index = _sort.index(_title)
if _index^[一二三四五六七八九十]+[、])|" \
"(?P^\d+[\.、\s])|" \
"(?P^\d+\.\d+[\.、\s])|" \
"(?P^\d+\.\d+\.\d+[\.、\s])|" \
"(?P^\d+\.\d+\.\d+\.\d+[\.、\s])"):
#todo :recog the serial of the sentence
_se = re.search(_pattern,_text)
if _se is not None:
_gd = _se.groupdict()
for k,v in _gd.items():
if v is not None:
return k
return None