- import copy
- import inspect
- import io
- import logging
- import os
- import re
- import sys
- from bs4 import BeautifulSoup
- sys.path.append(os.path.dirname(__file__) + "/../")
- from pdfplumber import PDF
- from pdfplumber.table import TableFinder
- from pdfplumber.page import Page as pdfPage
- from format_convert.convert_tree import _Document, _Page, _Image, _Sentence, _Table
- import time
- import pdfminer
- import math
- from scipy.stats import linregress
- from matplotlib import pyplot as plt
- from shapely.geometry import LineString, Point
- from format_convert import timeout_decorator
- from PIL import Image
- from format_convert.convert_image import image_process
- from format_convert.convert_need_interface import from_ocr_interface, from_office_interface
- import traceback
- import cv2
- import PyPDF2
- from PyPDF2 import PdfFileReader, PdfFileWriter
- from pdfminer.pdfparser import PDFParser
- from pdfminer.pdfdocument import PDFDocument
- from pdfminer.pdfpage import PDFPage
- from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
- from pdfminer.converter import PDFPageAggregator
- from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar, LTRect, \
- LTTextBoxVertical, LTLine, LTTextContainer
- from format_convert.utils import judge_error_code, add_div, get_platform, get_html_p, string_similarity, LineTable, \
- get_logger, log, memory_decorator, draw_lines_plt, get_garble_code, line_is_cross
- import fitz
- from format_convert.wrapt_timeout_decorator import timeout
- @memory_decorator
- def pdf2Image(path, save_dir):
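- # Render every page of the PDF to a PNG via PyMuPDF and return [{page_no: image_path}], or an error code list on failure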
- log("into pdf2Image")
- try:
- try:
- doc = fitz.open(path)
- except Exception as e:
- log("pdf format error!")
- # print("pdf format error!", e)
- return [-3]
- # output_image_list = []
- output_image_dict = {}
- page_count = doc.page_count
- for page_no in range(page_count):
- # Limit the number of pdf pages: only take the first 10 and the last 10
- if page_count > 20:
- if 10 <= page_no < page_count - 10:
- # log("pdf2Image: pdf pages count " + str(doc.page_count)
- # + ", only get 70 pages")
- continue
- try:
- page = doc.loadPage(page_no)
- output = save_dir + "_page" + str(page_no) + ".png"
- rotate = int(0)
- # A zoom factor of 1.3 per dimension would generate an image with about 2.6x the resolution.
- # Without this setting the default image size is 792X612, dpi=96
- # (1.33333333 --> 1056x816) (2 --> 1584x1224)
- # (1.183, 2.28 --> 1920x1080)
- zoom_x = 3.
- zoom_y = 3.
- # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate)
- mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate)
- pix = page.getPixmap(matrix=mat, alpha=False)
- pix.writePNG(output)
- pdf_image = cv2.imread(output)
- print("pdf_image", page_no, pdf_image.shape)
- # output_image_list.append([page_no, output])
- output_image_dict[int(page_no)] = output
- except ValueError as e:
- traceback.print_exc()
- if str(e) == "page not in document":
- log("pdf2Image page not in document! continue..." + str(page_no))
- continue
- elif "encrypted" in str(e):
- log("pdf2Image document need password " + str(page_no))
- return [-7]
- except RuntimeError as e:
- if "cannot find page" in str(e):
- log("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e))
- continue
- else:
- traceback.print_exc()
- return [-3]
- return [output_image_dict]
- except Exception as e:
- log("pdf2Image error!")
- print("pdf2Image", traceback.print_exc())
- return [-1]
- @timeout(10, timeout_exception=TimeoutError)
- def pdf_analyze(interpreter, page, device, page_no):
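- # Run pdfminer layout analysis for a single page; the decorator aborts after 10 seconds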
- log("into pdf_analyze")
- pdf_time = time.time()
- # print("pdf_analyze interpreter process...")
- interpreter.process_page(page)
- # print("pdf_analyze device get_result...")
- layout = device.get_result()
- log("pdf2text page " + str(page_no) + " read time " + str(time.time() - pdf_time))
- return layout
- @memory_decorator
- def pdf2text(path, unique_type_dir):
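- # Full pipeline: render pages to images, detect tables per page, connect cross-page tables, then extract text with pdfminer or fall back to OCR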
- log("into pdf2text")
- try:
- # pymupdf pdf to image
- save_dir = path.split(".")[-2] + "_" + path.split(".")[-1]
- output_image_dict = pdf2Image(path, save_dir)
- if judge_error_code(output_image_dict):
- return output_image_dict
- output_image_dict = output_image_dict[0]
- output_image_no_list = list(output_image_dict.keys())
- output_image_no_list.sort(key=lambda x: x)
- # For every page collect the extracted text, table column counts, outline points, whether it contains a table, and the page number
- # page_info_list = []
- page_info_dict = {}
- has_table_dict = {}
- no_table_dict = {}
- for page_no in output_image_no_list:
- img_path = output_image_dict.get(page_no)
- print("pdf page", page_no, "in total", output_image_no_list[-1])
- # Skip pages whose image cannot be read
- try:
- img = cv2.imread(img_path)
- img_size = img.shape
- except:
- log("pdf2text read image in page fail! continue...")
- continue
- # Process each page image
- text, column_list, outline_points, is_table = image_process(img, img_path, use_ocr=False)
- if judge_error_code(text):
- return text
- # page_info_list.append([text, column_list, outline_points, is_table,
- # page_no, img_size])
- page_info = [text, column_list, outline_points, is_table, img_size]
- page_info_dict[int(page_no)] = page_info
- # Split pages into those with and without tables
- if is_table:
- has_table_dict[int(page_no)] = page_info
- else:
- no_table_dict[int(page_no)] = page_info
- has_table_no_list = list(has_table_dict.keys())
- has_table_no_list.sort(key=lambda x: x)
- page_no_list = list(page_info_dict.keys())
- page_no_list.sort(key=lambda x: x)
- # Connect tables that span consecutive pages
- table_connect_list, connect_text_list = page_table_connect(has_table_dict)
- if judge_error_code(table_connect_list):
- return table_connect_list
- # Page numbers that belong to connected tables
- table_connect_page_no_list = []
- for area in connect_text_list:
- table_connect_page_no_list.append(area[1])
- print("pdf2text table_connect_list", table_connect_list)
- print("connect_text_list", connect_text_list)
- # pdfminer approach
- try:
- fp = open(path, 'rb')
- # Create a PDF parser from the file object
- parser = PDFParser(fp)
- # Create a PDF document
- doc = PDFDocument(parser)
- # Connect the parser with the document object
- rsrcmgr = PDFResourceManager()
- device = PDFPageAggregator(rsrcmgr, laparams=LAParams())
- interpreter = PDFPageInterpreter(rsrcmgr, device)
- # Check whether the pdf can be read at all
- for page in PDFPage.create_pages(doc):
- break
- except pdfminer.psparser.PSEOF as e:
- # pdfminer cannot read blank-page objects; use OCR on the images rendered by pymupdf instead
- log("pdf2text " + str(e) + " use ocr read pdf!")
- text_list = []
- for page_no in page_no_list:
- log("pdf2text ocr page_no " + str(page_no))
- page_info = page_info_dict.get(page_no)
- # Page contains a table
- if page_info[3]:
- # Check whether the table continues across pages
- area_no = 0
- jump_page = 0
- for area in table_connect_list:
- if page_no in area:
- # Only record the text once (on the first page of the connected area)
- if page_no == area[0]:
- image_text = connect_text_list[area_no][0]
- text_list.append([image_text, page_no, 0])
- jump_page = 1
- area_no += 1
- # Pages that are part of a connection skip the remaining steps
- if jump_page:
- continue
- # Otherwise take the already-extracted text directly
- image_text = page_info_dict.get(page_no)[0]
- text_list.append([image_text, page_no, 0])
- # No table on this page
- else:
- with open(output_image_dict.get(page_no), "rb") as ff:
- image_stream = ff.read()
- image_text = from_ocr_interface(image_stream)
- text_list.append([image_text, page_no, 0])
- text_list.sort(key=lambda z: z[1])
- text = ""
- for t in text_list:
- text += t[0]
- return [text]
- except Exception as e:
- log("pdf format error!")
- traceback.print_exc()
- return [-3]
- text_list = []
- page_no = 0
- pages = PDFPage.create_pages(doc)
- pages = list(pages)
- page_count = len(pages)
- for page in pages:
- log("pdf2text pymupdf page_no " + str(page_no))
- # Limit the number of pdf pages, only take the first 100 pages
- # if page_no >= 70:
- # log("pdf2text: pdf pages only get 70 pages")
- # break
- if page_count > 20:
- if 10 <= page_no < page_count - 10:
- page_no += 1
- continue
- # If the page number is among the pages with tables, reuse the already-generated text
- if page_no in has_table_no_list:
- # Check whether the table continues across pages
- area_no = 0
- jump_page = 0
- for area in table_connect_list:
- if page_no in area:
- # Only record the text once
- if page_no == area[0]:
- image_text = connect_text_list[area_no][0]
- text_list.append([image_text, page_no, 0])
- jump_page = 1
- area_no += 1
- # Pages that are part of a connection skip the remaining steps
- if jump_page:
- page_no += 1
- continue
- # Otherwise take the already-extracted text directly
- image_text = has_table_dict.get(page_no)[0]
- text_list.append([image_text, page_no, 0])
- page_no += 1
- continue
- # Pages without tables are parsed from the pdf
- else:
- if get_platform() == "Windows":
- try:
- interpreter.process_page(page)
- layout = device.get_result()
- except Exception:
- log("pdf2text pdfminer read pdf page error! continue...")
- continue
- else:
- # Apply a timeout
- try:
- # Parse pages of the pdf that contain no tables
- if get_platform() == "Windows":
- # the signal-based timeout cannot be used on Windows, call the undecorated function
- origin_pdf_analyze = pdf_analyze.__wrapped__
- layout = origin_pdf_analyze(interpreter, page, device, page_no)
- else:
- layout = pdf_analyze(interpreter, page, device, page_no)
- except TimeoutError as e:
- log("pdf2text pdfminer read pdf page time out!")
- return [-4]
- except Exception:
- log("pdf2text pdfminer read pdf page error! continue...")
- continue
- # Check whether the page has any text objects; if not, it may only contain a watermark
- only_image = 1
- image_count = 0
- for x in layout:
- if isinstance(x, LTTextBoxHorizontal):
- only_image = 0
- if isinstance(x, LTFigure):
- image_count += 1
- # If the page has too many images, OCR the whole rendered page directly
- log("pdf2text image_count " + str(image_count))
- if image_count >= 3:
- image_text = page_info_dict.get(page_no)[0]
- if image_text is None:
- with open(output_image_dict.get(page_no), "rb") as ff:
- image_stream = ff.read()
- image_text = from_ocr_interface(image_stream)
- if judge_error_code(image_text):
- return image_text
- page_info_dict[page_no][0] = image_text
- text_list.append([image_text, page_no, 0])
- page_no += 1
- continue
- order_list = []
- for x in layout:
- # Whether this object was recognized via OCR
- ocr_flag = 0
- if get_platform() == "Windows":
- # print("x", page_no, x)
- print()
- if isinstance(x, LTTextBoxHorizontal):
- image_text = x.get_text()
- # Encoding cannot be recognized (cid), use OCR
- if re.search('[(]cid:[0-9]+[)]', image_text):
- print(re.search('[(]cid:[0-9]+[)]', image_text))
- image_text = page_info_dict.get(page_no)[0]
- if image_text is None:
- with open(output_image_dict.get(page_no), "rb") as ff:
- image_stream = ff.read()
- image_text = from_ocr_interface(image_stream)
- if judge_error_code(image_text):
- return image_text
- page_info_dict[page_no][0] = image_text
- image_text = add_div(image_text)
- # order_list.append([image_text, page_no, x.bbox[1]])
- order_list = [[image_text, page_no, x.bbox[1]]]
- break
- else:
- image_text = add_div(image_text)
- order_list.append([image_text, page_no, x.bbox[1]])
- continue
- if isinstance(x, LTFigure):
- for image in x:
- if isinstance(image, LTImage):
- try:
- print("pdf2text LTImage size", page_no, image.width, image.height)
- image_stream = image.stream.get_data()
- # Ignore small images
- if image.width <= 300 and image.height <= 300:
- continue
- # Some watermarks make pdf splitting and reading fail
- # if image.width <= 200 and image.height<=200:
- # continue
- # img_test = Image.open(io.BytesIO(image_stream))
- # img_test.save('temp/LTImage.jpg')
- # Check the extracted image size; if it is too large, raise and OCR the page image rendered from the pdf instead
- img_test = Image.open(io.BytesIO(image_stream))
- if img_test.size[1] > 2000 or img_test.size[0] > 1500:
- print("pdf2text LTImage stream output size", img_test.size)
- raise Exception
- # Smaller images are saved directly and recognized with OCR
- else:
- img_test.save('temp/LTImage.jpg')
- with open('temp/LTImage.jpg', "rb") as ff:
- image_stream = ff.read()
- image_text = from_ocr_interface(image_stream)
- if judge_error_code(image_text):
- return image_text
- # except pdfminer.pdftypes.PDFNotImplementedError:
- # with open(output_image_list[page_no], "rb") as ff:
- # image_stream = ff.read()
- except Exception:
- log("pdf2text pdfminer read image in page " + str(page_no) +
- " fail! use pymupdf read image...")
- # print(traceback.print_exc())
- image_text = page_info_dict.get(page_no)[0]
- if image_text is None:
- with open(output_image_dict.get(page_no), "rb") as ff:
- image_stream = ff.read()
- image_text = from_ocr_interface(image_stream)
- if judge_error_code(image_text):
- return image_text
- page_info_dict[page_no][0] = image_text
- ocr_flag = 1
- # Probably only a watermark image was obtained: no text output and only image objects
- if image_text == "" and only_image:
- # Split this page out of the pdf
- try:
- log("pdf2text guess pdf has watermark")
- split_path = get_single_pdf(path, page_no)
- except:
- # If splitting raises, it is most likely not a watermark; OCR the image instead
- log("pdf2text guess pdf has no watermark")
- image_text = page_info_dict.get(page_no)[0]
- if image_text is None:
- with open(output_image_dict.get(page_no), "rb") as ff:
- image_stream = ff.read()
- image_text = from_ocr_interface(image_stream)
- order_list.append([image_text, page_no, -1])
- page_info_dict[page_no][0] = image_text
- ocr_flag = 1
- continue
- if judge_error_code(split_path):
- return split_path
- # Call the office format conversion
- file_path = from_office_interface(split_path, unique_type_dir, 'html', 3)
- # if file_path == [-3]:
- # return [-3]
- if judge_error_code(file_path):
- return file_path
- # Extract the text from the html
- image_text = get_html_p(file_path)
- if judge_error_code(image_text):
- return image_text
- if get_platform() == "Windows":
- print("image_text", page_no, x.bbox[1], image_text)
- with open("temp" + str(x.bbox[0]) + ".jpg", "wb") as ff:
- ff.write(image_stream)
- image_text = add_div(image_text)
- if ocr_flag:
- order_list.append([image_text, page_no, -1])
- else:
- order_list.append([image_text, page_no, x.bbox[1]])
- order_list.sort(key=lambda z: z[2], reverse=True)
- # OCR took part in the recognition
- if order_list[-1][2] == -1:
- ocr_order_list = [order_list[-1]]
- not_ocr_order_list = []
- not_ocr_text = ""
- # Deduplicate content that was fetched twice because of read failures
- for order in order_list:
- if order[2] != -1:
- not_ocr_order_list.append(order)
- not_ocr_text += order[0]
- if string_similarity(ocr_order_list[0][0], not_ocr_text) >= 0.85:
- order_list = not_ocr_order_list
- else:
- order_list = ocr_order_list
- for order in order_list:
- text_list.append(order)
- page_no += 1
- text = ""
- for t in text_list:
- # text += add_div(t[0])
- if t[0] is not None:
- text += t[0]
- return [text]
- except UnicodeDecodeError as e:
- log("pdf2text pdfminer create pages failed! " + str(e))
- return [-3]
- except Exception as e:
- log("pdf2text error!")
- traceback.print_exc()
- return [-1]
- def get_single_pdf(path, page_no):
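- # Write the given page of the source PDF to a new single-page file "<name>_split.pdf"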
- log("into get_single_pdf")
- try:
- # print("path, ", path)
- pdf_origin = PdfFileReader(path, strict=False)
- pdf_new = PdfFileWriter()
- pdf_new.addPage(pdf_origin.getPage(page_no))
- path_new = path.split(".")[0] + "_split.pdf"
- with open(path_new, "wb") as ff:
- pdf_new.write(ff)
- return path_new
- except PyPDF2.utils.PdfReadError as e:
- raise e
- except Exception as e:
- log("get_single_pdf error! page " + str(page_no))
- traceback.print_exc()
- raise e
- def page_table_connect(has_table_dict):
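- # Detect tables that continue over consecutive pages (matching column counts, outlines close to the page edges) and merge their HTML text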
- log("into page_table_connect")
- if not has_table_dict:
- return [], []
- try:
- # Check whether tables on consecutive pages are connected
- table_connect_list = []
- temp_list = []
- # Distance from the top or bottom of the image: 1/7 of the page height
- threshold = 7
- page_no_list = list(has_table_dict.keys())
- page_no_list.sort(key=lambda x: x)
- for i in range(1, len(page_no_list)):
- page_info = has_table_dict.get(page_no_list[i])
- last_page_info = has_table_dict.get(page_no_list[i - 1])
- # The page numbers must be consecutive
- if page_no_list[i] - page_no_list[i - 1] == 1:
- # The column counts of the last area on the previous page and the first area on the next page are both 0 and equal
- if not last_page_info[1][-1] and not page_info[1][0] and \
- last_page_info[1][-1] == page_info[1][0]:
- # The previous page's outline point must be within a certain distance of the bottom, the next page's within a certain distance of the top
- if last_page_info[4][0] - last_page_info[2][-1][1][1] \
- <= int(last_page_info[4][0] / threshold) \
- and page_info[2][0][0][1] - 0 \
- <= int(page_info[4][0] / threshold):
- temp_list.append(page_no_list[i - 1])
- temp_list.append(page_no_list[i])
- continue
- # When the condition is not met, store the previously accumulated connected page numbers
- if len(temp_list) > 1:
- temp_list = list(set(temp_list))
- temp_list.sort(key=lambda x: x)
- table_connect_list.append(temp_list)
- temp_list = []
- if len(temp_list) > 1:
- temp_list = list(set(temp_list))
- temp_list.sort(key=lambda x: x)
- table_connect_list.append(temp_list)
- temp_list = []
- # Join the contents of the two pages
- connect_text_list = []
- for area in table_connect_list:
- first_page_no = area[0]
- area_page_text = str(has_table_dict.get(first_page_no)[0])
- for i in range(1, len(area)):
- current_page_no = area[i]
- current_page_text = str(has_table_dict.get(current_page_no)[0])
- # Join the two tables
- table_prefix = re.finditer('<table border="1">', current_page_text)
- index_list = []
- for t in table_prefix:
- index_list.append(t.span())
- delete_index = index_list[0]
- current_page_text = current_page_text[:delete_index[0]] \
- + current_page_text[delete_index[1]:]
- table_suffix = re.finditer('</table>', area_page_text)
- index_list = []
- for t in table_suffix:
- index_list.append(t.span())
- delete_index = index_list[-1]
- area_page_text = area_page_text[:delete_index[0]] \
- + area_page_text[delete_index[1]:]
- area_page_text = area_page_text + current_page_text
- connect_text_list.append([area_page_text, area])
- return table_connect_list, connect_text_list
- except Exception as e:
- # print("page_table_connect", e)
- log("page_table_connect error!")
- traceback.print_exc()
- return [-1], [-1]
- @timeout(30, timeout_exception=TimeoutError)
- def read_pdf(path, package_name, packages):
- log(package_name)
- laparams = LAParams(line_overlap=0.01,
- char_margin=0.3,
- line_margin=0.01,
- word_margin=0.01,
- boxes_flow=0.1, )
- if package_name == packages[0]:
- fp = open(path, 'rb')
- parser = PDFParser(fp)
- doc_pdfminer = PDFDocument(parser)
- rsrcmgr = PDFResourceManager()
- device = PDFPageAggregator(rsrcmgr, laparams=laparams)
- interpreter = PDFPageInterpreter(rsrcmgr, device)
- return doc_pdfminer, device, interpreter
- elif package_name == packages[1]:
- doc_pymupdf = fitz.open(path)
- return doc_pymupdf
- elif package_name == packages[2]:
- doc_pypdf2 = PdfFileReader(path, strict=False)
- doc_pypdf2_new = PdfFileWriter()
- return doc_pypdf2, doc_pypdf2_new
- elif package_name == packages[3]:
- fp = open(path, 'rb')
- lt = LineTable()
- doc_top = 0
- doc_pdfplumber = read_pdfplumber(fp, laparams)
- return lt, doc_top, doc_pdfplumber
- @timeout(25, timeout_exception=TimeoutError)
- def read_pdfminer(path, laparams):
- fp = open(path, 'rb')
- parser = PDFParser(fp)
- doc_pdfminer = PDFDocument(parser)
- rsrcmgr = PDFResourceManager()
- device = PDFPageAggregator(rsrcmgr, laparams=laparams)
- interpreter = PDFPageInterpreter(rsrcmgr, device)
- return doc_pdfminer, device, interpreter
- @timeout(15, timeout_exception=TimeoutError)
- def read_pymupdf(path):
- return fitz.open(path)
- @timeout(15, timeout_exception=TimeoutError)
- def read_pypdf2(path):
- doc_pypdf2 = PdfFileReader(path, strict=False)
- doc_pypdf2_new = PdfFileWriter()
- return doc_pypdf2, doc_pypdf2_new
- @timeout(25, timeout_exception=TimeoutError, use_signals=False)
- def read_pdfplumber(path, laparams):
- fp = open(path, 'rb')
- lt = LineTable()
- doc_top = 0
- doc_pdfplumber = PDF(fp, laparams=laparams.__dict__)
- return lt, doc_top, doc_pdfplumber
- class PDFConvert:
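- # Converts a PDF into the internal _Document/_Page tree, combining pdfminer, PyMuPDF, PyPDF2 and pdfplumber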
- def __init__(self, path, unique_type_dir, need_page_no):
- self._doc = _Document(path)
- self.path = path
- self.unique_type_dir = unique_type_dir
- if not os.path.exists(self.unique_type_dir):
- os.mkdir(self.unique_type_dir)
- # Page range to extract
- self.need_page_no = need_page_no
- self.start_page_no = None
- self.end_page_no = None
- # By default limit_page_cnt controls the range: first 10 and last 10 pages
- if self.need_page_no is None:
- self.limit_page_cnt = 20
- else:
- # Controlled by a start_page_no,end_page_no range, e.g. 2,5
- ss = self.need_page_no.split(',')
- if len(ss) != 2:
- self._doc.error_code = [-14]
- else:
- self.start_page_no = int(ss[0])
- self.end_page_no = int(ss[-1])
- if self.end_page_no == -1:
- self.end_page_no = 1000000
- self.start_page_no -= 1
- self.end_page_no -= 1
- if self.end_page_no <= self.start_page_no or self.start_page_no < 0 or self.end_page_no < -1:
- self._doc.error_code = [-14]
- self.packages = ["pdfminer", "PyMuPDF", "PyPDF2", "pdfplumber"]
- self.has_init_pdf = [0] * len(self.packages)
- @memory_decorator
- def init_package(self, package_name):
- # Initialize each package
- try:
- laparams = LAParams(line_overlap=0.01,
- char_margin=0.3,
- line_margin=0.01,
- word_margin=0.01,
- boxes_flow=0.1, )
- if package_name == self.packages[0]:
- # fp = open(self.path, 'rb')
- # parser = PDFParser(fp)
- # self.doc_pdfminer = PDFDocument(parser)
- # rsrcmgr = PDFResourceManager()
- # self.laparams = LAParams(line_overlap=0.01,
- # char_margin=0.3,
- # line_margin=0.01,
- # word_margin=0.01,
- # boxes_flow=0.1,)
- # self.device = PDFPageAggregator(rsrcmgr, laparams=self.laparams)
- # self.interpreter = PDFPageInterpreter(rsrcmgr, self.device)
- self.doc_pdfminer, self.device, self.interpreter = read_pdfminer(self.path, laparams)
- self.has_init_pdf[0] = 1
- elif package_name == self.packages[1]:
- self.doc_pymupdf = read_pymupdf(self.path)
- self.has_init_pdf[1] = 1
- elif package_name == self.packages[2]:
- # self.doc_pypdf2 = PdfFileReader(self.path, strict=False)
- # self.doc_pypdf2_new = PdfFileWriter()
- self.doc_pypdf2, self.doc_pypdf2_new = read_pypdf2(self.path)
- self.has_init_pdf[2] = 1
- elif package_name == self.packages[3]:
- # self.fp = open(self.path, 'rb')
- # self.lt = LineTable()
- # self.doc_top = 0
- # self.doc_pdfplumber = PDF(self.fp, laparams=self.laparams.__dict__)
- self.lt, self.doc_top, self.doc_pdfplumber = read_pdfplumber(self.path, laparams)
- self.has_init_pdf[3] = 1
- else:
- log("Only Support Packages " + str(self.packages))
- raise Exception
- except Exception as e:
- log(package_name + " cannot open pdf!")
- traceback.print_exc()
- self._doc.error_code = [-3]
- def convert(self, limit_page_cnt=20):
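- # Parse every page within the requested range or page limit into _Page children; fall back to page images + OCR when pdfminer cannot read the file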
- if self.has_init_pdf[0] == 0:
- self.init_package("pdfminer")
- if self._doc.error_code is not None:
- self._doc.error_code = None
- # If pdfminer cannot read it, convert pages to images for recognition directly
- self.get_all_page_image()
- return
- # Check whether the pdf can be read
- try:
- pages = PDFPage.create_pages(self.doc_pdfminer)
- for page in pages:
- break
- pages = list(pages)
- # except pdfminer.psparser.PSEOF as e:
- except:
- # pdfminer cannot read blank-page objects; use OCR on the images rendered by pymupdf instead
- log("pdf2text pdfminer read failed! read by pymupdf!")
- traceback.print_exc()
- try:
- self.get_all_page_image()
- return
- except:
- traceback.print_exc()
- log("pdf2text use pymupdf read failed!")
- self._doc.error_code = [-3]
- return
- # Process every page
- pages = PDFPage.create_pages(self.doc_pdfminer)
- pages = list(pages)
- page_count = len(pages)
- page_no = 0
- for page in pages:
- # A specific pdf page range was requested
- if self.start_page_no is not None and self.end_page_no is not None:
- if page_count < self.end_page_no:
- self.end_page_no = page_count
- if page_no < self.start_page_no or page_no >= self.end_page_no:
- page_no += 1
- continue
- # Limit the number of pdf pages: only take the first and last 10 pages
- else:
- if page_count > limit_page_cnt and int(limit_page_cnt/2) <= page_no < page_count - int(limit_page_cnt/2):
- page_no += 1
- continue
- # Parse a single page
- self._page = _Page(page, page_no)
- self.convert_page(page, page_no)
- if self._doc.error_code is None and self._page.error_code is not None:
- if self._page.error_code[0] in [-4, -3, 0]:
- page_no += 1
- continue
- else:
- self._doc.error_code = self._page.error_code
- break
- self._doc.add_child(self._page)
- page_no += 1
- def clean_text(self, _text):
- return re.sub("\s", "", _text)
- def get_text_lines(self, page, page_no):
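- # Collect table edges found by pdfplumber's TableFinder and return them as pdfminer LTLine objects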
- lt_line_list = []
- page_plumber = pdfPage(self.doc_pdfplumber, page, page_number=page_no, initial_doctop=self.doc_top)
- self.doc_top += page_plumber.height
- table_finder = TableFinder(page_plumber)
- all_width_zero = True
- for _edge in table_finder.get_edges():
- if _edge.get('linewidth') and _edge.get('linewidth') > 0:
- all_width_zero = False
- break
- for _edge in table_finder.get_edges():
- # print(_edge)
- if _edge.get('linewidth', 0.1) > 0 or all_width_zero:
- lt_line_list.append(LTLine(1, (float(_edge["x0"]), float(_edge["y0"])),
- (float(_edge["x1"]), float(_edge["y1"]))))
- log("pdf page %s has %s lines" % (str(page_no), str(len(lt_line_list))))
- return lt_line_list
- def get_page_lines(self, layout, page_no):
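- # Extract candidate table lines (LTRect/LTCurve/LTLine) from the layout: keep lines that cross others, straighten bias lines, merge segments, drop the outer frame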
- def _plot(_line_list, mode=1):
- for _line in _line_list:
- if mode == 1:
- x0, y0, x1, y1 = _line.__dict__.get("bbox")
- elif mode == 2:
- x0, y0, x1, y1 = _line
- plt.plot([x0, x1], [y0, y1])
- plt.show()
- return
- def is_cross(A, B, C, D):
- if A[0] == B[0] == C[0] == D[0]:
- if A[1] <= C[1] <= B[1] or A[1] <= D[1] <= B[1] \
- or C[1] <= A[1] <= D[1] or C[1] <= B[1] <= D[1]:
- return True
- if A[1] == B[1] == C[1] == D[1]:
- if A[0] <= C[0] <= B[0] or A[0] <= D[0] <= B[0] \
- or C[0] <= A[0] <= D[0] or C[0] <= B[0] <= D[0]:
- return True
- line1 = LineString([A, B])
- line2 = LineString([C, D])
- int_pt = line1.intersection(line2)
- try:
- point_of_intersection = int_pt.x, int_pt.y
- return True
- except:
- return False
- def calculate_k(bbox):
- x = [bbox[0], bbox[2]]
- y = [bbox[1], bbox[3]]
- slope, intercept, r_value, p_value, std_err = linregress(x, y)
- # print('k', slope)
- if math.isnan(slope):
- slope = 0
- return slope
- def line_iou(line1, line2, axis=0):
- inter = min(line1[1][axis], line2[1][axis]) - max(line1[0][axis], line2[0][axis])
- # union = max(line1[1][axis], line2[1][axis]) - min(line1[0][axis], line2[0][axis])
- union = min(abs(line1[0][axis] - line1[1][axis]), abs(line2[0][axis] - line2[1][axis]))
- if union in [0, 0.]:
- iou = 0.
- else:
- iou = inter / union
- return iou
- def get_cross_line(_line_list, threshold=1, cross_times=0):
- # Decide table lines by whether they intersect other lines
- _cross_line_list = []
- for line1 in _line_list:
- if line1 in _cross_line_list:
- continue
- if abs(line1[2] - line1[0]) > abs(line1[3] - line1[1]):
- p1 = [max(0, line1[0] - threshold), line1[1]]
- p2 = [min(line1[2] + threshold, page_w), line1[3]]
- else:
- p1 = [line1[0], max(0, line1[1] - threshold)]
- p2 = [line1[2], min(line1[3] + threshold, page_h)]
- line1 = [p1[0], p1[1], p2[0], p2[1]]
- _times = 0
- for line2 in _line_list:
- if abs(line2[2] - line2[0]) > abs(line2[3] - line2[1]):
- p3 = [max(0, line2[0] - threshold), line2[1]]
- p4 = [min(line2[2] + threshold, page_w), line2[3]]
- else:
- p3 = [line2[0], max(0, line2[1] - threshold)]
- p4 = [line2[2], min(line2[3] + threshold, page_h)]
- line2 = [p3[0], p3[1], p4[0], p4[1]]
- if line1 == line2:
- continue
- if is_cross(p1, p2, p3, p4):
- _times += 1
- if _times >= cross_times:
- _cross_line_list += [line1]
- break
- return _cross_line_list
- def repair_bias_line(_line_list):
- temp_list = []
- for line in _line_list:
- x0, y0, x1, y1 = line
- _y = min(y0, y1)
- _x = min(x0, x1)
- if abs(x0 - x1) > abs(y0 - y1):
- temp_list.append([x0, _y, x1, _y])
- else:
- temp_list.append([_x, y0, _x, y1])
- _line_list = temp_list
- return _line_list
- def repair_col_line(_straight_list, _bias_list, threshold=2, min_width=7):
- if not _straight_list or not _bias_list:
- print('add_col_bias_line empty', len(_straight_list), len(_bias_list))
- return []
- # Split into columns
- _straight_list.sort(key=lambda x: (x[0], x[1]))
- cols = []
- col = []
- current_w = _straight_list[0][0]
- for line in _straight_list:
- if abs(line[0] - line[2]) > abs(line[1] - line[3]):
- continue
- if min(line[0], line[2]) - threshold <= current_w <= max(line[0], line[2]) + threshold:
- col.append(line)
- else:
- if col:
- cols.append(col)
- col = [line]
- current_w = line[0]
- if col:
- cols.append(col)
- # Supplement columns
- new_list = []
- for line in bias_line_list:
- if abs(line[0] - line[2]) > abs(line[1] - line[3]):
- continue
- for col in cols:
- w = col[0][0]
- if w - threshold <= line[0] <= w + threshold or w - threshold <= line[2] <= w + threshold:
- new_list.append([w, line[1] - 3, w, line[3] + 3])
- new_list += _straight_list
- # Deduplicate
- new_list = [str(x) for x in new_list]
- new_list = list(set(new_list))
- new_list = [eval(x) for x in new_list]
- # Split into columns
- new_list.sort(key=lambda x: (x[0], x[1]))
- cols = []
- col = []
- current_w = new_list[0][0]
- for line in new_list:
- if abs(line[0] - line[2]) > abs(line[1] - line[3]):
- continue
- if min(line[0], line[2]) - threshold <= current_w <= max(line[0], line[2]) + threshold:
- col.append(line)
- else:
- if col:
- cols.append(col)
- col = [line]
- current_w = line[0]
- if col:
- cols.append(col)
- # Remove redundant columns
- for col1 in cols:
- for col2 in cols:
- if col1 == col2 or abs(col1[0][0] - col2[0][0]) > min_width:
- continue
- col1_len, col2_len = 0, 0
- for c in col1:
- col1_len += abs(c[1] - c[3])
- for c in col2:
- col2_len += abs(c[1] - c[3])
- if col1_len > col2_len * 3:
- for c in col2:
- if c in new_list:
- new_list.remove(c)
- if col2_len > col1_len * 3:
- for c in col1:
- if c in new_list:
- new_list.remove(c)
- return new_list
- def merge_line(_line_list, threshold=2):
- new_line_list = []
- # Split into columns
- _line_list.sort(key=lambda x: (x[0], x[1]))
- cols = []
- col = [_line_list[0]]
- current_w = _line_list[0][0]
- for line in _line_list:
- if abs(line[0] - line[2]) > abs(line[1] - line[3]):
- continue
- if min(line[0], line[2]) - threshold <= current_w <= max(line[0], line[2]) + threshold \
- and is_cross(line[0:2], line[2:4], col[-1][0:2], col[-1][2:4]):
- col.append(line)
- else:
- if col:
- cols.append(col)
- col = [line]
- current_w = line[0]
- if col:
- cols.append(col)
- for col in cols:
- temp_c = col[0]
- col_w = col[0][0]
- for i in range(len(col) - 1):
- c = col[i]
- next_c = col[i + 1]
- if is_cross(c[0:2], c[2:4], next_c[0:2], next_c[2:4]):
- temp_c = [col_w, min(temp_c[1], c[1], c[3], next_c[1], next_c[3]), col_w,
- max(temp_c[3], c[1], c[3], next_c[1], next_c[3])]
- else:
- new_line_list.append(temp_c)
- temp_c = next_c
- if not new_line_list or (new_line_list and new_line_list[-1] != temp_c):
- new_line_list.append(temp_c)
- # Split into rows
- _line_list.sort(key=lambda x: (x[1], x[0]))
- rows = []
- row = []
- current_h = _line_list[0][1]
- for line in _line_list:
- if abs(line[0] - line[2]) < abs(line[1] - line[3]):
- continue
- if min(line[1], line[3]) - threshold <= current_h <= max(line[1], line[3]) + threshold:
- row.append(line)
- else:
- if row:
- rows.append(row)
- row = [line]
- current_h = line[1]
- if row:
- rows.append(row)
- for row in rows:
- temp_r = row[0]
- row_h = row[0][1]
- for i in range(len(row) - 1):
- r = row[i]
- next_r = row[i + 1]
- # if is_cross(r[0:2], r[2:4], next_r[0:2], next_r[2:4]):
- if line_iou([r[0:2], r[2:4]], [next_r[0:2], next_r[2:4]], axis=0):
- temp_r = [min(temp_r[0], r[0], r[2], next_r[0], next_r[2]), row_h,
- max(temp_r[2], r[0], r[2], next_r[0], next_r[2]), row_h]
- else:
- new_line_list.append(temp_r)
- temp_r = next_r
- if not new_line_list or (new_line_list and new_line_list[-1] != temp_r):
- new_line_list.append(temp_r)
- return new_line_list
- def remove_outline_no_cross(_line_list):
- row_list = []
- col_list = []
- for line in _line_list:
- # Collect all rows (horizontal lines)
- if abs(line[0] - line[2]) > abs(line[1] - line[3]):
- row_list.append(line)
- # Collect all columns (vertical lines)
- if abs(line[0] - line[2]) < abs(line[1] - line[3]):
- col_list.append(line)
- if not col_list:
- return _line_list
- # Left and right border columns
- col_list.sort(key=lambda x: (x[0], x[1]))
- left_col = col_list[0]
- right_col = col_list[-1]
- # Lines that have intersections but none in the central area
- compare_list = []
- for col in [left_col, right_col]:
- add_h = abs(col[1]-col[3]) / 8
- center_area = [col[1]+add_h, col[3]-add_h]
- cross_cnt = 0
- center_cross_cnt = 0
- center_row_cnt = 0
- for row in row_list:
- if is_cross(row[0:2], row[2:4], col[0:2], col[2:4]):
- if center_area[0] <= row[1] <= center_area[1]:
- center_cross_cnt += 1
- else:
- cross_cnt += 1
- else:
- if center_area[0] <= row[1] <= center_area[1]:
- center_row_cnt += 1
- compare_list.append([cross_cnt, center_cross_cnt, center_row_cnt])
- _flag = True
- for c in compare_list:
- if c[0] >= 2 and c[1] == 0 and c[2] >= 2:
- continue
- _flag = False
- print('compare_list', compare_list)
- if _flag and compare_list[0][1] == compare_list[1][1] \
- and compare_list[0][2] == compare_list[1][2]:
- for col in [left_col, right_col]:
- if col in _line_list:
- _line_list.remove(col)
- return _line_list
- log('into get_page_lines')
- page_h = layout.height
- page_w = layout.width
- element_list = []
- line_list = []
- bias_line_list = []
- text_bbox_list = []
- for element in layout:
- if isinstance(element, LTTextContainer):
- text_bbox_list.append(element.bbox)
- # Only take bboxes of these three types
- if isinstance(element, (LTRect, LTCurve, LTLine)):
- element_list.append(element)
- if element.height > 0.5 and element.width > 0.5:
- # print('element.height, element.width', element.height, element.width)
- k = calculate_k(element.bbox)
- if 1.73 / 3 < abs(k) < 1.73:
- continue
- else:
- bias_line_list.append(element.bbox)
- continue
- line_list.append(element.bbox)
- if not line_list and not bias_line_list:
- return []
- # Whether to use bias (slanted) lines to build the table
- if len(line_list) < 6 and len(bias_line_list) > len(line_list) * 2:
- # print('use bias line')
- # bias_line_list += add_col_bias_line(line_list, bias_line_list)
- line_list = bias_line_list
- # Deduplicate
- line_list = [str(x) for x in line_list]
- line_list = list(set(line_list))
- line_list = [eval(x) for x in line_list]
- # Decide table lines by whether they intersect other lines
- cross_line_list = get_cross_line(line_list, threshold=2, cross_times=1)
- if not cross_line_list:
- return []
- # Straighten bias lines
- if cross_line_list:
- cross_line_list = repair_bias_line(cross_line_list)
- # Repair vertical lines
- if bias_line_list:
- cross_line_list = repair_col_line(cross_line_list, bias_line_list)
- # Decide table lines by whether they intersect other lines
- cross_line_list = get_cross_line(cross_line_list, threshold=1, cross_times=1)
- # Merge line segments
- if not cross_line_list:
- return []
- cross_line_list = merge_line(cross_line_list)
- # Remove the outermost nested border
- cross_line_list = remove_outline_no_cross(cross_line_list)
- # show
- # print('len(cross_line_list)', len(cross_line_list))
- # _plot(line_list, mode=2)
- # _plot(cross_line_list, mode=2)
- lt_line_list = []
- for line in cross_line_list:
- lt_line_list.append(LTLine(1, (float(line[0]), float(line[1])),
- (float(line[2]), float(line[3]))))
- log("pdf page %s has %s lines" % (str(page_no), str(len(lt_line_list))))
- return lt_line_list
- def recognize_text(self, layout, page_no, lt_text_list, lt_line_list):
- list_tables, filter_objs, _, connect_textbox_list = self.lt.recognize_table(lt_text_list, lt_line_list)
- self._page.in_table_objs = filter_objs
- # print("=======text_len:%d:filter_len:%d"%(len(lt_text_list),len(filter_objs)))
- for table in list_tables:
- _table = _Table(table["table"], table["bbox"])
- # self._page.children.append(_table)
- self._page.add_child(_table)
- list_sentences = ParseUtils.recognize_sentences(lt_text_list, filter_objs,
- layout.bbox, page_no)
- for sentence in list_sentences:
- _sen = _Sentence(sentence.text, sentence.bbox)
- self._page.add_child(_sen)
- # PDF objects need to be sorted in reverse
- self._page.is_reverse = True
- def is_text_legal(self, lt_text_list, page_no):
- # The pdf character encoding cannot be recognized, OCR the whole page
- text_temp = ""
- for _t in lt_text_list:
- text_temp += _t.get_text()
- if re.search('[(]cid:[0-9]+[)]', text_temp):
- log("text has cid! try pymupdf...")
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- self._page.add_child(_image)
- return False
- match1 = re.findall(get_garble_code(), text_temp)
- # match2 = re.search('[\u4e00-\u9fa5]', text_temp)
- if len(match1) > 3 and len(text_temp) > 10:
- log("pdf garbled code! try pymupdf... " + text_temp[:20])
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- self._page.add_child(_image)
- return False
- return True
- def judge_b_table(self, lt_text_list):
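- # Heuristic for borderless tables: group text objects into rows and look for enough rows that either hold several objects or contain wide gaps between phrases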
- # Group text objects into rows first
- lt_text_list.sort(key=lambda x: (x.bbox[1], x.bbox[0]))
- lt_text_row_list = []
- current_h = lt_text_list[0].bbox[1]
- row = []
- threshold = 2
- for lt_text in lt_text_list:
- bbox = lt_text.bbox
- if current_h - threshold <= bbox[1] <= current_h + threshold:
- row.append(lt_text)
- else:
- if row:
- lt_text_row_list.append(row)
- row = [lt_text]
- current_h = lt_text.bbox[1]
- if row:
- lt_text_row_list.append(row)
- # print('lt_text_row_list')
- # for r in lt_text_row_list:
- # print('r', [x.get_text() for x in r])
- # Check whether the text has wide whitespace gaps in the middle, or a row contains several text objects
- is_b_table_flag = False
- is_b_table_cnt = 3
- tolerate_cnt = 2
- t_cnt = 0
- row_cnt = 0
- for row in lt_text_row_list:
- # Skip watermark rows
- if len(row) == 1 and len(row[0].get_text()[:-1]) == 1:
- continue
- if len(row) == 1:
- text = row[0].get_text()
- bbox = row[0].bbox
- match = re.search('[ ]{3,}', text)
- if match and re.search('[\u4e00-\u9fff]{2,}', text[:match.span()[0]]) \
- and re.search('[\u4e00-\u9fff]{2,}', text[match.span()[1]:]):
- row_cnt += 1
- t_cnt = 0
- else:
- # Tolerance
- if t_cnt < tolerate_cnt:
- t_cnt += 1
- continue
- row_cnt = 0
- else:
- row_cnt += 1
- t_cnt = 0
- if row_cnt >= is_b_table_cnt:
- is_b_table_flag = True
- break
- log('pdf is_b_table_flag ' + str(is_b_table_flag))
- return is_b_table_flag
- def convert_page(self, page, page_no):
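- # Parse one page: OCR image-heavy or text-free pages; otherwise collect images, validate the text, check for borderless tables and recognize text/tables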
- layout = self.get_layout(page, page_no)
- if self._doc.error_code is not None:
- return
- if judge_error_code(layout):
- self._page.error_code = layout
- return
- # Classify the objects on this page and store them
- lt_text_list = []
- lt_image_list = []
- for x in layout:
- if isinstance(x, (LTTextBoxHorizontal, LTTextBoxVertical)):
- lt_text_list.append(x)
- if isinstance(x, LTFigure):
- for y in x:
- if isinstance(y, LTImage):
- # Ignore small images
- if y.width <= 300 and y.height <= 300:
- continue
- # If the image width exceeds the layout width, it is very likely a watermark
- if y.width > layout.width + 20:
- continue
- lt_image_list.append(y)
- lt_text_list = self.delete_water_mark(lt_text_list, layout.bbox, 15)
- log("convert_pdf page " + str(page_no))
- log("len(lt_image_list), len(lt_text_list) " + str(len(lt_image_list)) + " " + str(len(lt_text_list)))
- log('layout.width, layout.height' + str(layout.width) + str(layout.height))
- # If there is only text and the image count is 0, extract text and tables directly
- # if only_image == 0 and image_count == 0:
- # if len(lt_image_list) == 0 and len(lt_text_list) > 0:
- # # PDFPlumber
- # if self.has_init_pdf[3] == 0:
- # self.init_package("pdfplumber")
- # if self._doc.error_code is not None:
- # self._doc.error_code = None
- # log("init pdfplumber failed! try pymupdf...")
- # # if calling pdfplumber for the pdf images fails, use pypdf2 to convert the pdf to html
- # page_image = self.get_page_image(page_no)
- # if judge_error_code(page_image):
- # self._page.error_code = page_image
- # else:
- # _image = _Image(page_image[1], page_image[0])
- # self._page.add_child(_image)
- # return
- #
- # if not self.is_text_legal(lt_text_list, page_no):
- # return
- #
- # # use text layout patterns to judge whether this page may contain a borderless table
- # start_time = time.time()
- # if self.judge_b_table(lt_text_list):
- # page_image = self.get_page_image(page_no)
- # if judge_error_code(page_image):
- # self._page.error_code = page_image
- # else:
- # _image = _Image(page_image[1], page_image[0])
- # _image.is_from_pdf = True
- # _image.b_table_from_text = True
- # _image.b_table_text_obj_list = lt_text_list
- # _image.b_table_layout_size = (layout.width, layout.height)
- # self._page.add_child(_image)
- # log('convert_pdf judge_b_table set image cost: ' + str(time.time()-start_time))
- #
- # try:
- # lt_line_list = self.get_page_lines(layout, page_no)
- # except:
- # traceback.print_exc()
- # lt_line_list = []
- # self._page.error_code = [-13]
- # try:
- # # lt_line_list = self.get_text_lines(page,page_no)
- # self.recognize_text(layout, page_no, lt_text_list, lt_line_list)
- # except:
- # traceback.print_exc()
- # self._page.error_code = [-8]
- # If the page has too many images or no text, OCR the whole rendered page directly
- # elif image_count > 3 or only_image == 1:
- if len(lt_image_list) > 3 or len(lt_text_list) == 0:
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- _image.is_from_pdf = True
- self._page.add_child(_image)
- # Otherwise read the page objects normally
- else:
- # Image objects
- for image in lt_image_list:
- try:
- print("pdf2text LTImage size", page_no, image.width, image.height)
- image_stream = image.stream.get_data()
- # Ignore small images
- if image.width <= 300 and image.height <= 300:
- continue
- # Check the extracted image size; if it is too large, OCR the page image rendered from the pdf instead
- img_test = Image.open(io.BytesIO(image_stream))
- if image.height >= 1000 and image.width >= 1000:
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- _image.is_from_pdf = True
- self._page.add_child(_image)
- return
- # Smaller images are saved directly and recognized with OCR
- else:
- temp_path = self.unique_type_dir + 'page' + str(page_no) \
- + '_lt' + str(lt_image_list.index(image)) + '.jpg'
- img_test.save(temp_path)
- with open(temp_path, "rb") as ff:
- image_stream = ff.read()
- _image = _Image(image_stream, temp_path, image.bbox)
- self._page.add_child(_image)
- except Exception:
- log("pdf2text pdfminer read image in page " + str(page_no) +
- " fail! use pymupdf read image...")
- traceback.print_exc()
- # PDF objects need to be sorted in reverse
- self._page.is_reverse = True
- self.init_package("pdfplumber")
- if not self.is_text_legal(lt_text_list, page_no):
- return
- # Use text layout patterns to judge whether this page may contain a borderless table
- if self.judge_b_table(lt_text_list):
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- _image.is_from_pdf = True
- _image.b_table_from_text = True
- _image.b_table_text_obj_list = lt_text_list
- _image.b_table_layout_size = (layout.width, layout.height)
- self._page.add_child(_image)
- try:
- lt_line_list = self.get_page_lines(layout, page_no)
- except:
- traceback.print_exc()
- lt_line_list = []
- self._page.error_code = [-13]
- self.recognize_text(layout, page_no, lt_text_list, lt_line_list)
- def get_layout(self, page, page_no):
- log("get_layout")
- if self.has_init_pdf[0] == 0:
- self.init_package("pdfminer")
- if self._doc.error_code is not None:
- return
- # Get the layout of this page
- start_time = time.time()
- try:
- if get_platform() == "Windows":
- # origin_pdf_analyze = pdf_analyze.__wrapped__
- # layout = origin_pdf_analyze(self.interpreter, page, self.device)
- layout = pdf_analyze(self.interpreter, page, self.device, page_no)
- else:
- layout = pdf_analyze(self.interpreter, page, self.device, page_no)
- except TimeoutError as e:
- log("pdf2text pdfminer read pdf page " + str(page_no) + " time out! " + str(time.time() - start_time))
- layout = [-4]
- except Exception:
- traceback.print_exc()
- log("pdf2text pdfminer read pdf page " + str(page_no) + " error! continue...")
- layout = [-3]
- return layout
- def get_page_image(self, page_no):
- log("")
- try:
- if self.has_init_pdf[1] == 0:
- self.init_package("PyMuPDF")
- if self._doc.error_code is not None:
- return
- # save_dir = self.path.split(".")[-2] + "_" + self.path.split(".")[-1]
- output = self.unique_type_dir + "page" + str(page_no) + ".png"
- page = self.doc_pymupdf.loadPage(page_no)
- rotate = int(0)
- zoom_x = 2.
- zoom_y = 2.
- mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate)
- pix = page.getPixmap(matrix=mat, alpha=False)
- pix.writePNG(output)
- # Resize the output image
- self.resize_image(output)
- with open(output, "rb") as f:
- pdf_image = f.read()
- return [output, pdf_image]
- except ValueError as e:
- traceback.print_exc()
- if str(e) == "page not in document":
- log("pdf2Image page not in document! continue... page " + str(page_no))
- return [0]
- elif "encrypted" in str(e):
- log("pdf2Image document need password " + str(page_no))
- return [-7]
- except RuntimeError as e:
- if "cannot find page" in str(e):
- log("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e))
- return [0]
- else:
- traceback.print_exc()
- return [-3]
- def get_all_page_image(self):
- log("")
- if self.has_init_pdf[1] == 0:
- self.init_package("PyMuPDF")
- if self._doc.error_code is not None:
- return
- page_count = self.doc_pymupdf.page_count
- for page_no in range(page_count):
- # Limit the number of pdf pages: only take the first 10 and the last 10
- if page_count > 20:
- if 10 <= page_no < page_count - 10:
- continue
- self._page = _Page(None, page_no)
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- self._page.add_child(_image)
- # On errors keep reading the following pages
- if self._doc.error_code is None and self._page.error_code is not None:
- continue
- self._doc.add_child(self._page)
- def connect_table(self, html_list):
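- # Merge tables that continue across the per-page HTML fragments when the last table of one page lines up with the first table of the next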
- if not html_list:
- return html_list
- # Condition 1: is there non-page-number text after the last table / any text before the first table
- connect_flag_list = []
- soup_list = []
- for i, h in enumerate(html_list):
- soup_list.append(BeautifulSoup(h, 'lxml'))
- # Find the last table
- table_start1, table_end1 = None, None
- # print('h', h)
- match = re.finditer('<table', h)
- for m in match:
- table_start1 = m.span()[0]
- if table_start1 is not None:
- match = re.finditer('</table>', h[table_start1:])
- for m in match:
- table_end1 = m.span()[1] + table_start1
- # Is there anything other than page numbers after the last table
- connect_flag1 = False
- if table_end1 is not None:
- match = re.search('[^-/第页0-9]*', re.sub('<div>|</div>', '', h[table_end1:]))
- # print('match1', match.group())
- if not match or match.group() == '':
- connect_flag1 = True
- # Find the first table
- table_start2, table_end2 = None, None
- match = re.finditer('<table', h)
- for m in match:
- table_start2 = m.span()[0]
- break
- # Is there any content before the first table
- connect_flag2 = False
- if table_start2 is not None and table_start2 == 0:
- connect_flag2 = True
- connect_flag_list.append([i, connect_flag2, connect_flag1])
- # print('connect_flag_list', connect_flag_list)
- # Merge page numbers that need connecting according to condition 1, forming groups
- connect_pages_list = []
- temp_list = []
- for i, c in enumerate(connect_flag_list):
- if temp_list and c[1]:
- temp_list.append(c)
- elif not temp_list and c[2]:
- temp_list.append(c)
- else:
- if temp_list:
- connect_pages_list.append(temp_list)
- temp_list = []
- connect_pages_list.append([c])
- if temp_list:
- connect_pages_list.append(temp_list)
- # print('connect_pages_list', connect_pages_list)
- # Condition 2: check whether the column counts within a group match
- connect_pages_list2 = []
- for c_list in connect_pages_list:
- if len(c_list) == 1:
- connect_pages_list2.append(c_list)
- else:
- col_cnt_list = []
- for c in c_list:
- soup = soup_list[c[0]]
- table1 = soup.findAll('table')[-1]
- table2 = soup.findAll('table')[0]
- tr1 = table1.findAll('tr')
- tr2 = table2.findAll('tr')
- td1 = tr1[-1].findAll('td')
- td2 = tr2[0].findAll('td')
- col_cnt_list.append([len(td2), len(td1)])
- new_c_list = [c_list[0]]
- # print('col_cnt_list', col_cnt_list)
- for i in range(len(col_cnt_list) - 1):
- if col_cnt_list[i][1] != col_cnt_list[i + 1][0]:
- connect_pages_list2.append(new_c_list)
- new_c_list = [c_list[i + 1]]
- else:
- new_c_list.append(c_list[i + 1])
- if new_c_list:
- connect_pages_list2.append(new_c_list)
- # print('connect_pages_list2', connect_pages_list2)
- # Concatenate tables that meet the connection conditions
- new_html_list = []
- for c_list in connect_pages_list2:
- if len(c_list) == 1:
- new_html_list.append(html_list[c_list[0][0]])
- continue
- new_html = ''
- for c in c_list:
- new_html += html_list[c[0]]
- new_html = re.sub('</table>([-/第页0-9]|<div>|</div>)*<table border="1">', '<tr><td>#@#@#</td></tr>',
- new_html)
- soup = BeautifulSoup(new_html, 'lxml')
- trs = soup.findAll('tr')
- for i in range(len(trs)):
- if trs[i].get_text() == '#@#@#':
- td1 = trs[i - 1].findAll('td')
- td2 = trs[i + 1].findAll('td')
- if td2[0].get_text() == '':
- for j in range(len(td1)):
- td1[j].string = td1[j].get_text() + td2[j].get_text()
- trs[i + 1].decompose()
- trs[i].decompose()
- new_html = str(soup)
- new_html_list.append(new_html)
- html_str = ''
- for h in new_html_list:
- html_str += h
- return [html_str]
- def get_html(self):
- if self._doc.error_code is not None:
- return self._doc.error_code
- self.convert()
- if self._doc.error_code is not None:
- return self._doc.error_code
- html = self._doc.get_html(return_list=True)
- # Connect tables
- try:
- html = self.connect_table(html)
- except:
- traceback.print_exc()
- return [-12]
- return html
- def delete_water_mark(self, lt_text_list, page_bbox, times=5):
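- # Returns lt_text_list with suspected watermark text objects removed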
- # Phrases repeated too many times are watermarks; delete them
- duplicate_dict = {}
- for _obj in lt_text_list:
- t = _obj.get_text()
- if t in duplicate_dict.keys():
- duplicate_dict[t][0] += 1
- duplicate_dict[t][1].append(_obj)
- else:
- duplicate_dict[t] = [1, [_obj]]
- delete_text = []
- for t in duplicate_dict.keys():
- if duplicate_dict[t][0] >= times:
- obj_list = duplicate_dict[t][1]
- obj_list.sort(key=lambda x: x.bbox[3])
- obj_distance_h = abs(obj_list[-1].bbox[3] - obj_list[0].bbox[1])
- obj_list.sort(key=lambda x: x.bbox[2])
- obj_distance_w = abs(obj_list[-1].bbox[2] - obj_list[0].bbox[0])
- if obj_distance_h >= abs(page_bbox[1] - page_bbox[3]) * 0.7 \
- and obj_distance_w >= abs(page_bbox[0] - page_bbox[2]) * 0.7:
- delete_text.append(t)
- temp_text_list = []
- for _obj in lt_text_list:
- t = _obj.get_text()
- if t not in delete_text:
- temp_text_list.append(_obj)
- return temp_text_list
- def resize_image(self, img_path, max_size=2000):
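- # Downscale the image along its longer axis to max_size, keeping the aspect ratio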
- _img = cv2.imread(img_path)
- if _img.shape[0] <= max_size and _img.shape[1] <= max_size:
- return
- else:
- resize_axis = 0 if _img.shape[0] >= _img.shape[1] else 1
- ratio = max_size / _img.shape[resize_axis]
- new_shape = [0, 0]
- new_shape[resize_axis] = max_size
- new_shape[1 - resize_axis] = int(_img.shape[1 - resize_axis] * ratio)
- _img = cv2.resize(_img, (new_shape[1], new_shape[0]))
- cv2.imwrite(img_path, _img)
- def get_single_pdf(self, path, page_no):
- log("into get_single_pdf")
- try:
- pdf_origin = copy.deepcopy(self.doc_pypdf2)
- pdf_new = copy.deepcopy(self.doc_pypdf2_new)
- pdf_new.addPage(pdf_origin.getPage(page_no))
- path_new = path.split(".")[0] + "_split.pdf"
- with open(path_new, "wb") as ff:
- pdf_new.write(ff)
- return path_new
- except PyPDF2.utils.PdfReadError as e:
- return [-3]
- except Exception as e:
- log("get_single_pdf error! page " + str(page_no))
- return [-3]
- def get_text_font():
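- # Debug helper: print the font properties of every text span on a sample page and list underlined text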
- def flags_decomposer(flags):
- """Make font flags human readable."""
- l = []
- if flags & 2 ** 0:
- l.append("superscript")
- if flags & 2 ** 1:
- l.append("italic")
- if flags & 2 ** 2:
- l.append("serifed")
- else:
- l.append("sans")
- if flags & 2 ** 3:
- l.append("monospaced")
- else:
- l.append("proportional")
- if flags & 2 ** 4:
- l.append("bold")
- return ", ".join(l)
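- # e.g. flags_decomposer(2 ** 2 | 2 ** 4) -> "serifed, proportional, bold"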
- def get_underlined_textLines(page):
- """
- Get all underlined text on one PDF page.
- :param page: a fitz (PyMuPDF) page
- :return: list of tuples, one per complete underlined span: [(underlined text, block_no, line_no), ...]
- """
- paths = page.get_drawings() # get drawings on the current page
- # collect every very flat bbox on this page; underlines are mostly such thin rectangles
- # subselect things we may regard as lines
- lines = []
- for p in paths:
- for item in p["items"]:
- if item[0] == "l": # an actual line
- p1, p2 = item[1:]
- if p1.y == p2.y:
- lines.append((p1, p2))
- elif item[0] == "re": # a rectangle: check if height is small
- r = item[1]
- if r.width > r.height and r.height <= 2:
- lines.append((r.tl, r.tr)) # take top left / right points
- # get the page's max_lineheight, used as the distance threshold below
- blocks = page.get_text("dict", flags=11)["blocks"]
- max_lineheight = 0
- for b in blocks:
- for l in b["lines"]:
- bbox = fitz.Rect(l["bbox"])
- if bbox.height > max_lineheight:
- max_lineheight = bbox.height
- underlined_res = []
- # start matching words against the underline segments
- # make a list of words
- words = page.get_text("words")
- # if underlined, the bottom left / right of a word
- # should not be too far away from left / right end of some line:
- for wdx, w in enumerate(words): # w[4] is the actual word string
- r = fitz.Rect(w[:4]) # first 4 items are the word bbox
- for p1, p2 in lines: # check distances for start / end points
- if abs(r.bl - p1) <= max_lineheight: # the word's bottom-left is close to the underline's left end
- if abs(r.br - p2) <= max_lineheight: # the word's bottom-right is close to the underline's right end (single word, no spaces)
- print(f"Word '{w[4]}' is underlined! Its block-line number is {w[-3], w[-2]}")
- underlined_res.append((w[4], w[-3], w[-2])) # (underlined word, block_no, line_no)
- break # don't check more lines
- else: # keep scanning words to the right on the same line, since one underline can cover several space-separated words
- curr_line_num = w[-2] # line number
- for right_wdx in range(wdx + 1, len(words), 1):
- _next_w = words[right_wdx]
- if _next_w[-2] != curr_line_num: # the word to the right is no longer on the current line (crossing lines is not allowed)
- break
- _r_right = fitz.Rect(_next_w[:4]) # bbox of a word further right on the same line
- if abs(_r_right.br - p2) <= max_lineheight: # its bottom-right must be within max_lineheight of p2 (the underline's right end)
- print(
- f"Word '{' '.join([_one_word[4] for _one_word in words[wdx:right_wdx + 1]])}' is underlined! " +
- f"Its block-line number is {w[-3], w[-2]}")
- underlined_res.append(
- (' '.join([_one_word[4] for _one_word in words[wdx:right_wdx + 1]]),
- w[-3], w[-2])
- ) # (underlined phrase, block_no, line_no)
- break # don't check more lines
- return underlined_res
- _p = r'C:\Users\Administrator\Desktop\test_pdf\error2-2.pdf'
- doc_pymupdf = read_pymupdf(_p)
- page = doc_pymupdf[0]
- blocks = page.get_text("dict", flags=11)["blocks"]
- for b in blocks: # iterate through the text blocks
- for l in b["lines"]: # iterate through the text lines
- for s in l["spans"]: # iterate through the text spans
- print("")
- font_properties = "Font: '%s' (%s), size %g, color #%06x" % (
- s["font"], # font name
- flags_decomposer(s["flags"]), # readable font flags
- s["size"], # font size
- s["color"], # font color
- )
- print(s)
- print("Text: '%s'" % s["text"]) # simple print of text
- print(font_properties)
- get_underlined_textLines(page)
- # The following is a ready-made single-page PDF parsing interface
- class ParseSentence:
- def __init__(self, bbox, fontname, fontsize, _text, _title, title_text, _pattern, title_degree, is_outline,
- outline_location, page_no):
- (x0, y0, x1, y1) = bbox
- self.x0 = x0
- self.y0 = y0
- self.x1 = x1
- self.y1 = y1
- self.bbox = bbox
- self.fontname = fontname
- self.fontsize = fontsize
- self.text = _text
- self.title = _title
- self.title_text = title_text
- self.groups = _pattern
- self.title_degree = title_degree
- self.is_outline = is_outline
- self.outline_location = outline_location
- self.page_no = page_no
- def __repr__(self):
- return "%s,%s,%s,%d,%s" % (self.text, self.title, self.is_outline, self.outline_location, str(self.bbox))
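- # Illustrative only (font name and bbox are made up): a line "第一章总则" recognized as a
- # chapter title on page 0 could be stored as
- # ParseSentence((90, 700, 200, 715), "SimSun", 12.0, "第一章总则", "title_1", "第一章",
- #               groups, 1, False, -1, 0)
- # where groups is the list returned by ParseUtils.find_title_by_pattern.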
- class ParseUtils:
- @staticmethod
- def getFontinfo(_page):
- for _obj in _page._objs:
- if isinstance(_obj, (LTTextBoxHorizontal, LTTextBoxVertical)):
- for textline in _obj._objs:
- done = False
- for lchar in textline._objs:
- if isinstance(lchar, (LTChar)):
- _obj.fontname = lchar.fontname
- _obj.fontsize = lchar.size
- done = True
- break
- if done:
- break
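- # After getFontinfo, each horizontal/vertical text box carries the fontname and fontsize of its
- # first character, which recognize_sentences later reads via __dict__.get.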
- @staticmethod
- def recognize_sentences(list_textbox, filter_objs, page_bbox, page_no,
- remove_space=True, sourceP_LB=True):
- list_textbox.sort(key=lambda x: x.bbox[0])
- list_textbox.sort(key=lambda x: x.bbox[3], reverse=sourceP_LB)
- cluster_textbox = []
- for _textbox in list_textbox:
- if _textbox in filter_objs:
- continue
- _find = False
- for _ct in cluster_textbox:
- if abs(_ct["y"] - _textbox.bbox[1]) < 5:
- _find = True
- _ct["textbox"].append(_textbox)
- if not _find:
- cluster_textbox.append({"y": _textbox.bbox[1], "textbox": [_textbox]})
- cluster_textbox.sort(key=lambda x: x["y"], reverse=sourceP_LB)
- list_sentences = []
- for _line in cluster_textbox:
- _textboxs = _line["textbox"]
- _textboxs.sort(key=lambda x: x.bbox[0])
- _linetext = _textboxs[0].get_text()
- for _i in range(1, len(_textboxs)):
- if abs(_textboxs[_i].bbox[0] - _textboxs[_i - 1].bbox[2]) > 60:
- if _linetext[-1] not in (",", ",", "。", ".", "、", ";"):
- _linetext += "=,="
- _linetext += _textboxs[_i].get_text()
- _linetext = re.sub("[\s\r\n]", "", _linetext)
- _bbox = (_textboxs[0].bbox[0], _textboxs[0].bbox[1],
- _textboxs[-1].bbox[2], _textboxs[-1].bbox[3])
- _title = None
- _pattern_groups = None
- title_text = ""
- if not _title:
- _groups = ParseUtils.find_title_by_pattern(_textboxs[0].get_text())
- if _groups:
- _title = _groups[0][0]
- title_text = _groups[0][1]
- _pattern_groups = _groups
- if not _title:
- _groups = ParseUtils.find_title_by_pattern(_linetext)
- if _groups:
- _title = _groups[0][0]
- title_text = _groups[0][1]
- _pattern_groups = _groups
- if not _title:
- _title = ParseUtils.rec_incenter(_bbox, page_bbox)
- title_degree = 2
- if not _title:
- _linetext = _linetext.replace("=,=", ",")
- else:
- _linetext = _linetext.replace("=,=", "")
- title_degree = int(_title.split("_")[1])
- # skip page numbers
- if ParseUtils.rec_incenter(_bbox, page_bbox) and re.search("^\d+$", _linetext) is not None:
- continue
- if _linetext == "" or re.search("^,+$", _linetext) is not None:
- continue
- is_outline = False
- outline_location = -1
- _search = re.search("(?P<text>.+?)\.{5,}(?P<nums>\d+)$", _linetext)
- if _search is not None:
- is_outline = True
- _linetext = _search.group("text")
- outline_location = int(_search.group("nums"))
- list_sentences.append(
- ParseSentence(_bbox, _textboxs[-1].__dict__.get("fontname"), _textboxs[-1].__dict__.get("fontsize"),
- _linetext, _title, title_text, _pattern_groups, title_degree, is_outline,
- outline_location, page_no))
- # for _sen in list_sentences:
- # print(_sen.__dict__)
- return list_sentences
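- # Rough illustration (coordinates hypothetical): text boxes whose bottoms differ by less than
- # 5 pt form one line; boxes on that line more than 60 pt apart are first joined with "=,=",
- # which later becomes "," for ordinary text and is dropped inside titles; a centered line
- # consisting only of digits is skipped as a page number.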
- @staticmethod
- def find_title_by_pattern(_text,
- _pattern="(?P<title_1>(?P<title_1_index_0_0>^第?)(?P<title_1_index_1_1>[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P<title_1_index_2_0>[、章]))|" \
- "(?P<title_3>^(?P<title_3_index_0_1>[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+))|" \
- "(?P<title_4>^(?P<title_4_index_0_0>第?)(?P<title_4_index_1_1>[一二三四五六七八九十]+)(?P<title_4_index_2_0>[节]))|" \
- "(?P<title_11>^(?P<title_11_index_0_0>\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P<title_11_index_1_1>\d{1,2})(?P<title_11_index_2_0>[\..、\s\-]))|" \
- "(?P<title_10>^(?P<title_10_index_0_0>\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P<title_10_index_1_1>\d{1,2})(?P<title_10_index_2_0>[\..、\s\-]))|" \
- "(?P<title_7>^(?P<title_7_index_0_0>\d{1,2}[\..、\s\-])(?P<title_7_index_1_1>\d{1,2})(?P<title_7_index_2_0>[\..、\s\-]))|" \
- "(?P<title_6>^(?P<title_6_index_0_1>\d{1,2})(?P<title_6_index_1_0>[\..、\s\-]))|" \
- "(?P<title_15>^(?P<title_15_index_0_0>()(?P<title_15_index_1_1>\d{1,2})(?P<title_15_index_2_0>)))|" \
- "(?P<title_17>^(?P<title_17_index_0_0>()(?P<title_17_index_1_1>[a-zA-Z]+)(?P<title_17_index_2_0>)))|" \
- "(?P<title_19>^(?P<title_19_index_0_0>()(?P<title_19_index_1_1>[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P<title_19_index_2_0>)))" \
- ):
- _se = re.search(_pattern, _text)
- groups = []
- if _se is not None:
- _gd = _se.groupdict()
- for k, v in _gd.items():
- if v is not None:
- groups.append((k, v))
- if len(groups):
- groups.sort(key=lambda x: x[0])
- return groups
- return None
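- # e.g. find_title_by_pattern("1.2 服务范围") matches the title_7 branch and returns
- # [('title_7', '1.2 '), ('title_7_index_0_0', '1.'), ('title_7_index_1_1', '2'), ('title_7_index_2_0', ' ')]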
- @staticmethod
- def rec_incenter(o_bbox, p_bbox):
- p_width = p_bbox[2] - p_bbox[0]
- l_space = (o_bbox[0] - p_bbox[0]) / p_width
- r_space = (p_bbox[2] - o_bbox[2]) / p_width
- if abs((l_space - r_space)) < 0.1 and l_space > 0.2:
- return "title_2"
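- # e.g. a sentence spanning x = 200..400 on a page spanning x = 0..595 has l_space ≈ 0.34 and
- # r_space ≈ 0.33, so it is treated as centered and tagged "title_2".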
- @staticmethod
- def is_first_title(_title):
- if _title is None:
- return False
- if re.search("^\d+$", _title) is not None:
- if int(_title) == 1:
- return True
- return False
- if re.search("^[一二三四五六七八九十百]+$", _title) is not None:
- if _title == "一":
- return True
- return False
- if re.search("^[a-z]+$", _title) is not None:
- if _title == "a":
- return True
- return False
- if re.search("^[A-Z]+$", _title) is not None:
- if _title == "A":
- return True
- return False
- if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$", _title) is not None:
- if _title == "Ⅰ":
- return True
- return False
- return False
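- # e.g. is_first_title("1"), is_first_title("一"), is_first_title("a"), is_first_title("Ⅰ") -> True;
- # is_first_title("2") or is_first_title(None) -> False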
- @staticmethod
- def get_next_title(_title):
- if re.search("^\d+$", _title) is not None:
- return str(int(_title) + 1)
- if re.search("^[一二三四五六七八九十百]+$", _title) is not None:
- _next_title = ParseUtils.make_increase(['一', '二', '三', '四', '五', '六', '七', '八', '九', '十'],
- re.sub("[十百]", '', _title))
- _next_title = list(_next_title)
- _next_title.reverse()
- if _next_title[-1] != "十":
- if len(_next_title) >= 2:
- _next_title.insert(-1, '十')
- if len(_next_title) >= 4:
- _next_title.insert(-3, '百')
- if _title[0] == "十":
- # carry out of the tens: 十九 -> 二十; otherwise keep the 十 prefix, e.g. 十一 -> 十二
- if "".join(_next_title) == "十":
- _next_title = ["二", "十"]
- else:
- _next_title.insert(0, "十")
- _next_title = "".join(_next_title)
- return _next_title
- if re.search("^[a-z]+$", _title) is not None:
- _next_title = ParseUtils.make_increase([chr(i + ord('a')) for i in range(26)], _title)
- _next_title = list(_next_title)
- _next_title.reverse()
- return "".join(_next_title)
- if re.search("^[A-Z]+$", _title) is not None:
- _next_title = ParseUtils.make_increase([chr(i + ord('A')) for i in range(26)], _title)
- _next_title = list(_next_title)
- _next_title.reverse()
- return "".join(_next_title)
- if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$", _title) is not None:
- _sort = ["Ⅰ", "Ⅱ", "Ⅲ", "Ⅳ", "Ⅴ", "Ⅵ", "Ⅶ", "Ⅷ", "Ⅸ", "Ⅹ", "Ⅺ", "Ⅻ"]
- _index = _sort.index(_title)
- if _index < len(_sort) - 1:
- return _sort[_index + 1]
- return None
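- # e.g. get_next_title("2") -> "3", get_next_title("一") -> "二", get_next_title("b") -> "c",
- # get_next_title("Ⅱ") -> "Ⅲ"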
- @staticmethod
- def make_increase(_sort, _title, _add=1):
- if len(_title) == 0 and _add == 0:
- return ""
- if len(_title) == 0 and _add == 1:
- return _sort[0]
- _index = _sort.index(_title[-1])
- next_index = (_index + _add) % len(_sort)
- next_chr = _sort[next_index]
- if _index == len(_sort) - 1:
- _add = 1
- else:
- _add = 0
- return next_chr + ParseUtils.make_increase(_sort, _title[:-1], _add)
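- # e.g. make_increase([chr(i + ord('a')) for i in range(26)], "az") returns "ab" (digits in
- # reverse order, with the carry applied); the caller reverses it, so get_next_title("az") -> "ba"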
- @staticmethod
- def rec_serial(_text, o_bbox, p_bbox, fontname, _pattern="(?P<title_1>^[一二三四五六七八九十]+[、])|" \
- "(?P<title_2>^\d+[\.、\s])|" \
- "(?P<title_3>^\d+\.\d+[\.、\s])|" \
- "(?P<title_4>^\d+\.\d+\.\d+[\.、\s])|" \
- "(?P<title_5>^\d+\.\d+\.\d+\.\d+[\.、\s])"):
- # TODO: recognize the serial-number prefix of the sentence
- _se = re.search(_pattern, _text)
- if _se is not None:
- _gd = _se.groupdict()
- for k, v in _gd.items():
- if v is not None:
- return k
- return None
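- # e.g. rec_serial("二、评标办法", None, None, None) -> "title_1"; because alternatives are tried
- # in order, "3.2.1 条款" returns "title_2" (the ^\d+[\.、\s] branch matches first).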
- if __name__ == '__main__':
- # get_text_font()
- PDFConvert(r"C:/Users/Administrator/Downloads/1651896704621.pdf", "C:/Users/Administrator/Downloads/1").get_html()
- # print(b'\x10')