12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303 |
- import shutil
- import zlib
- from glob import glob
- import copy
- import io
- import os
- import re
- import sys
- from bs4 import BeautifulSoup
- sys.path.append(os.path.dirname(__file__) + "/../")
- from pdfplumber import PDF
- from pdfplumber.table import TableFinder
- from pdfplumber.page import Page as pdfPage
- from format_convert.convert_tree import _Document, _Page, _Image, _Sentence, _Table, TextBox
- import time
- from PIL import Image
- import traceback
- import cv2
- import PyPDF2
- from PyPDF2 import PdfFileReader, PdfFileWriter
- from pdfminer.pdfparser import PDFParser
- from pdfminer.pdfdocument import PDFDocument
- from pdfminer.pdfpage import PDFPage
- from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
- from pdfminer.converter import PDFPageAggregator
- from pdfminer.layout import LTTextBoxHorizontal, LAParams, LTFigure, LTImage, LTCurve, LTText, LTChar, LTRect, \
- LTTextBoxVertical, LTLine, LTTextContainer, LTTextLine
- from format_convert.utils import judge_error_code, get_platform, LineTable, log, \
- memory_decorator, get_garble_code, get_md5_from_bytes, bytes2np, bbox_iou, get_garble_code2, \
- get_traditional_chinese, ascii85_decode
- import fitz
- from format_convert.wrapt_timeout_decorator import timeout
- from otr.table_line_pdf import table_line_pdf
- from botr.extract_table import get_b_table_by_blank_colon
@memory_decorator
def pdf2text(path, unique_type_dir):
    # Stub entry point kept for interface compatibility with the other
    # format converters; the real conversion is driven by PDFConvert below.
    # Always returns None.
    return
@timeout(10, timeout_exception=TimeoutError)
def pdf_analyze(interpreter, page, device, page_no):
    """Run pdfminer's interpreter on one page and return the aggregated layout.

    Aborts with TimeoutError if a single page takes more than 10 seconds.

    :param interpreter: PDFPageInterpreter bound to *device*.
    :param page: pdfminer PDFPage to process.
    :param device: PDFPageAggregator collecting the layout.
    :param page_no: page index, used only for logging.
    :return: the LTPage layout produced by the aggregator.
    """
    started_at = time.time()
    interpreter.process_page(page)
    layout = device.get_result()
    log("page_no: " + str(page_no) + " pdf_analyze cost: " + str(time.time() - started_at))
    return layout
@timeout(25, timeout_exception=TimeoutError)
def read_pdfminer(path, laparams):
    """Build the pdfminer document/device/interpreter triple for *path*.

    :param path: filesystem path of the pdf file.
    :param laparams: LAParams controlling pdfminer's layout analysis.
    :return: (PDFDocument, PDFPageAggregator, PDFPageInterpreter)
    """
    # NOTE: the file handle is deliberately not closed here — PDFParser reads
    # from it lazily for the whole lifetime of the returned document.
    fp = open(path, 'rb')
    parser = PDFParser(fp)
    doc_pdfminer = PDFDocument(parser)
    rsrcmgr = PDFResourceManager()
    device = PDFPageAggregator(rsrcmgr, laparams=laparams)
    interpreter = PDFPageInterpreter(rsrcmgr, device)
    return doc_pdfminer, device, interpreter
@timeout(15, timeout_exception=TimeoutError)
def read_pymupdf(path):
    """Open *path* with PyMuPDF (fitz), aborting after 15 seconds."""
    document = fitz.open(path)
    return document
@timeout(15, timeout_exception=TimeoutError)
def read_pypdf2(path):
    """Open *path* with PyPDF2 in non-strict mode plus a fresh writer.

    :return: (PdfFileReader, PdfFileWriter)
    """
    reader = PdfFileReader(path, strict=False)
    writer = PdfFileWriter()
    return reader, writer
@timeout(25, timeout_exception=TimeoutError, use_signals=False)
def read_pdfplumber(path, laparams):
    """Open *path* with pdfplumber and prepare table-extraction helpers.

    :param path: filesystem path of the pdf file.
    :param laparams: LAParams whose __dict__ is forwarded to pdfplumber.
    :return: (LineTable helper, initial doc_top offset 0, pdfplumber PDF)
    """
    # NOTE: the file handle stays open on purpose — pdfplumber's PDF object
    # reads from it lazily while pages are accessed.
    fp = open(path, 'rb')
    lt = LineTable()
    doc_top = 0
    doc_pdfplumber = PDF(fp, laparams=laparams.__dict__)
    return lt, doc_top, doc_pdfplumber
- class PDFConvert:
- def __init__(self, path, unique_type_dir, need_page_no, page_need_to_image_dict=None):
- self._doc = _Document(path)
- self.path = path
- self.unique_type_dir = unique_type_dir
- if not os.path.exists(self.unique_type_dir):
- os.mkdir(self.unique_type_dir)
- # 指定提取的页码范围
- self.need_page_no = need_page_no
- self.start_page_no = None
- self.end_page_no = None
- # 默认使用limit_page_cnt控制,前10页后10页
- if self.need_page_no is None:
- self.limit_page_cnt = 50
- else:
- # 使用start_page_no,end_page_no范围控制,例如2,5
- ss = self.need_page_no.split(',')
- if len(ss) != 2:
- self._doc.error_code = [-14]
- else:
- self.start_page_no = int(ss[0])
- self.end_page_no = int(ss[-1])
- if self.end_page_no == -1:
- self.end_page_no = 1000000
- self.start_page_no -= 1
- self.end_page_no -= 1
- if self.end_page_no <= self.start_page_no or self.start_page_no < 0 or self.end_page_no < -1:
- self._doc.error_code = [-14]
- self.packages = ["pdfminer", "PyMuPDF", "PyPDF2", "pdfplumber"]
- self.has_init_pdf = [0] * len(self.packages)
- # 记录图片对象的md5,用于去除大量重复图片
- self.md5_image_obj_list = []
- # 记录该页是不是纯文本
- self.only_text_list = []
- # 是否提取特殊页
- self.convert_specific_page = 1
- # 初始化_page
- self._page = _Page(None, 0)
- # 需要直接转成image来识别的页面
- if type(page_need_to_image_dict) is not dict:
- self.page_need_to_image_dict = {}
- else:
- self.page_need_to_image_dict = page_need_to_image_dict
- @memory_decorator
- def init_package(self, package_name):
- # 各个包初始化
- try:
- laparams = LAParams(line_overlap=0.01,
- char_margin=0.3,
- line_margin=0.01,
- word_margin=0.01,
- # boxes_flow=0.1,
- boxes_flow=None,
- )
- if package_name == self.packages[0]:
- self.doc_pdfminer, self.device, self.interpreter = read_pdfminer(self.path, laparams)
- self.has_init_pdf[0] = 1
- elif package_name == self.packages[1]:
- self.doc_pymupdf = read_pymupdf(self.path)
- self.has_init_pdf[1] = 1
- elif package_name == self.packages[2]:
- self.doc_pypdf2, self.doc_pypdf2_new = read_pypdf2(self.path)
- self.has_init_pdf[2] = 1
- elif package_name == self.packages[3]:
- self.lt, self.doc_top, self.doc_pdfplumber = read_pdfplumber(self.path, laparams)
- self.has_init_pdf[3] = 1
- else:
- log("Only Support Packages " + str(self.packages))
- raise Exception
- except Exception as e:
- log(package_name + " cannot open pdf!")
- traceback.print_exc()
- self._doc.error_code = [-3]
    @memory_decorator
    def convert(self, limit_page_cnt=50):
        """Parse the whole pdf into the _Document tree.

        Pipeline: init pdfminer -> (fall back to page-image OCR if unreadable)
        -> read each page's layout -> detect cross-page watermarks -> build a
        _Page per layout -> strip repeated headers/footers -> optionally pull
        in "special" keyword+table pages beyond the page limit -> drop heavily
        repeated images.

        :param limit_page_cnt: max number of pages parsed when no explicit
            page range was requested.
        """
        if self.has_init_pdf[0] == 0:
            self.init_package("pdfminer")
        if self._doc.error_code is not None:
            self._doc.error_code = None
            # pdfminer cannot open the file: render pages to images for OCR.
            self.get_all_page_image()
            return
        # Probe whether pdfminer can actually iterate the pages.
        try:
            pages = PDFPage.create_pages(self.doc_pdfminer)
            for page in pages:
                break
            pages = list(pages)
        except:
            # pdfminer cannot read blank-page objects; OCR pymupdf renders instead.
            log("pdfminer read failed! read by pymupdf!")
            traceback.print_exc()
            try:
                self.get_all_page_image()
                return
            except:
                traceback.print_exc()
                log("use pymupdf read failed!")
                self._doc.error_code = [-3]
                return
        # Re-create the page generator: the probe above consumed it.
        pages = PDFPage.create_pages(self.doc_pdfminer)
        pages = list(pages)
        page_count = len(pages)
        self.only_text_list = [-1] * len(pages)
        page_no = 0
        layout_list = []
        all_text_box_list = []
        for page in pages:
            # Explicit page range requested.
            if self.start_page_no is not None and self.end_page_no is not None:
                if page_count < self.end_page_no:
                    self.end_page_no = page_count
                if page_no < self.start_page_no or page_no >= self.end_page_no:
                    page_no += 1
                    continue
            # Otherwise keep only the first limit_page_cnt pages.
            else:
                if page_count > limit_page_cnt and page_no >= limit_page_cnt:
                    page_no += 1
                    continue
            # Read and pre-filter the pdfminer layout for this page.
            start_time = time.time()
            layout, layout_obj_list, max_y = self.read_layout(page, page_no)
            layout_obj_list = self.get_need_objs(layout_obj_list, max_y)
            all_text_box_list += layout_obj_list[1]
            layout_list.append([layout, layout_obj_list, max_y, page_no])
            log('read_layout page_no: ' + str(page_no) + ' cost: ' + str(time.time() - start_time))
            page_no += 1
        # Detect text repeated across pages (watermarks).
        # NOTE(review): `layout` is the last processed page's layout here and
        # would be unbound if every page was skipped — confirm with callers.
        _, delete_water_mark_list = self.delete_water_mark(all_text_box_list, layout.bbox, times=int(len(layout_list)/2))
        # NOTE(review): the result is discarded — cross-page watermark removal
        # is effectively disabled at this point.
        delete_water_mark_list = []
        for layout, layout_obj_list, max_y, page_no in layout_list:
            # Convert one buffered layout into a _Page with y-sorted children.
            start_time = time.time()
            self._page = _Page(None, page_no)
            self._page.is_pdf = 1
            self.convert_page(layout, layout_obj_list, max_y, page_no, delete_water_mark_list)
            self._page.children.sort(key=lambda x: x.y)
            log('convert_page page_no: ' + str(page_no) + ' cost: ' + str(time.time() - start_time))
            if self._doc.error_code is None and self._page.error_code is not None:
                # -4/-3/0 are per-page soft failures: drop the page, keep going.
                if self._page.error_code[0] in [-4, -3, 0]:
                    continue
                else:
                    self._doc.error_code = self._page.error_code
                    break
            self._doc.add_child(self._page)
        self._doc.children, detete_header_footer_list = self.delete_header_footer(self._doc.children)
        if self.convert_specific_page and self.need_page_no is None:
            # Supplement: when the document is long and mostly pure text, scan
            # pages beyond the limit for procurement-list style tables.
            if self.only_text_list.count(0) == 0:
                ratio = 0
            else:
                ratio = self.only_text_list.count(0) / (page_count - self.only_text_list.count(-1))
            if page_count > limit_page_cnt and ratio <= 0.2:
                page_no = 0
                find_flag = 0
                add_page_list = []
                for page in pages:
                    # Only pages past the limit are candidates here.
                    if not (page_no >= limit_page_cnt):
                        page_no += 1
                        continue
                    # Parse this extra page (images skipped for speed).
                    start_time = time.time()
                    self._page = _Page(page, page_no)
                    layout, layout_obj_list, max_y = self.read_layout(page, page_no)
                    layout_obj_list = self.get_need_objs(layout_obj_list, max_y)
                    self.convert_page(layout, layout_obj_list, max_y, page_no, delete_water_mark_list, skip_image=1)
                    self._page.children.sort(key=lambda x: x.y)
                    log('convert_page add page_no: ' + str(page_no) + ' cost: ' + str(time.time() - start_time))
                    # Strip the headers/footers detected on the main pages.
                    # NOTE(review): this rebinds `pages` mid-iteration; the loop
                    # keeps running on the original iterator, but the name no
                    # longer refers to the full page list afterwards.
                    pages, _ = self.delete_header_footer([self._page], detete_header_footer_list)
                    self._page = pages[0]
                    # Keep pages containing these procurement keywords followed
                    # by a table (keyword + table extraction).
                    re_str = '采购清单|采购需求|需求概况'
                    if find_flag and len(self._page.children) > 0 and type(self._page.children[0]) == _Table:
                        log('add page1 ' + str(page_no))
                        add_page_list.append(self._page)
                        if len(self._page.children) - 1 > 3:
                            find_flag = 0
                    for index in range(len(self._page.children)):
                        obj = self._page.children[index]
                        if not (type(obj) == _Sentence and re.search(re_str, obj.content)):
                            continue
                        # Look ahead a few siblings for the table that follows
                        # the keyword sentence.
                        next_obj = None
                        if index + 1 < len(self._page.children):
                            for j in range(index + 1, min(len(self._page.children), index + 5)):
                                if type(self._page.children[j]) == _Table:
                                    next_obj = self._page.children[j]
                                    break
                        if next_obj:
                            if self._page not in add_page_list:
                                add_page_list.append(self._page)
                                log('add page2 ' + str(page_no))
                            # find_flag=1 carries the table over to the next
                            # page when it probably continues there.
                            if len(self._page.children) - index - 1 > 3:
                                find_flag = 0
                            else:
                                find_flag = 1
                    page_no += 1
                if add_page_list:
                    self._doc.children = self._doc.children[:limit_page_cnt] \
                                         + add_page_list
        self.delete_same_image()
- def delete_same_image(self, show=0):
- # 剔除大量重复图片
- md5_dict = {}
- for _md5, image_obj in self.md5_image_obj_list:
- if _md5 in md5_dict.keys():
- md5_dict[_md5] += [image_obj]
- else:
- md5_dict[_md5] = [image_obj]
- cnt_threshold = 10
- delete_obj_list = []
- for _md5 in md5_dict.keys():
- img_list = md5_dict.get(_md5)
- # print('len(md5_dict.get(_md5))', _md5, len(img_list))
- if len(img_list) >= cnt_threshold:
- if show:
- img_np = bytes2np(img_list[0].content)
- cv2.namedWindow('delete same img_np', cv2.WINDOW_NORMAL)
- cv2.imshow('delete same img_np', img_np)
- cv2.waitKey(0)
- delete_obj_list += img_list
- for page in self._doc.children:
- for obj in delete_obj_list:
- if obj in page.children:
- page.children.remove(obj)
- if show:
- for page in self._doc.children:
- for obj in page.children:
- if isinstance(obj, _Image):
- img_np = bytes2np(obj.content)
- cv2.imshow('page img_np', img_np)
- cv2.waitKey(0)
- def delete_header_footer(self, pages, delete_list=[]):
- sen_dict = {}
- for page in pages:
- for obj in page.children:
- if isinstance(obj, _Sentence):
- key = str(obj.content) + ' ' + str(int(obj.y))
- # print('key', key)
- if key in sen_dict.keys():
- sen_dict[key] += [obj]
- else:
- sen_dict[key] = [obj]
- # 把需删除的加上
- # print('delete_list', delete_list)
- for key in delete_list:
- if key in sen_dict:
- sen_dict[key] = sen_dict.get(key) * 10
- # print('sen_dict', sen_dict)
- delete_footer_header_list = []
- for key in sen_dict.keys():
- l = sen_dict.get(key)
- if len(l) >= 1 / 3 * max(10, len(pages)):
- delete_footer_header_list.append(key)
- for page in pages:
- new_children = []
- for obj in page.children:
- if isinstance(obj, _Sentence):
- if obj not in l:
- new_children.append(obj)
- else:
- new_children.append(obj)
- page.children = new_children
- # print('len(l)', len(l), len(pages))
- # print('delete_header_footer l[0]', l[0].content, l[0].y)
- return pages, delete_footer_header_list
    @memory_decorator
    def delete_bold_text_duplicate(self, lt_text_box_list):
        """Collapse doubled glyphs produced by fake-bold rendering.

        Some pdf generators emulate bold by painting the same character twice
        with a tiny offset; pdfminer then reports duplicated LTChars. Equal
        characters whose bboxes overlap (IoU >= 0.3) are collapsed to one
        instance and the text boxes are rebuilt without the duplicates.

        :param lt_text_box_list: pdfminer text boxes to dedupe.
        :return: new list of LTTextBoxHorizontal objects.
        """
        # Flatten all LTChars; skip table-of-contents style boxes ("......."),
        # which legitimately repeat dots.
        lt_char_list = []
        for lt_text_box in lt_text_box_list:
            if '.......' in lt_text_box.get_text():
                continue
            for lt_text_line in lt_text_box:
                for lt_char in lt_text_line:
                    if isinstance(lt_char, LTChar):
                        lt_char_list.append(lt_char)
        # Pairwise O(n^2) scan for overlapping identical characters.
        lt_char_list.sort(key=lambda x: (int(x.bbox[1]), x.bbox[0]))
        delete_list = []
        for i in range(len(lt_char_list)):
            lt_char1 = lt_char_list[i]
            bbox1 = lt_char1.bbox
            if lt_char1 in delete_list:
                continue
            for j in range(i + 1, len(lt_char_list)):
                lt_char2 = lt_char_list[j]
                bbox2 = lt_char2.bbox
                if lt_char2 in delete_list:
                    continue
                # Only CJK characters and common fullwidth punctuation are deduped.
                if lt_char1.get_text() == lt_char2.get_text() and bbox_iou(bbox1, bbox2) >= 0.3 \
                        and re.search('[\u4e00-\u9fff():、,。]', lt_char1.get_text()):
                    delete_list.append(lt_char2)
        # Rebuild the text boxes without the duplicated characters.
        new_lt_text_box_list = []
        for lt_text_box in lt_text_box_list:
            new_lt_text_box = LTTextBoxHorizontal()
            for lt_text_line in lt_text_box:
                new_lt_text_line = LTTextLine(0.01)
                for lt_char in lt_text_line:
                    if lt_char in delete_list:
                        continue
                    if isinstance(lt_char, LTChar):
                        new_lt_text_line.add(lt_char)
                new_lt_text_box.add(new_lt_text_line)
            new_lt_text_box_list.append(new_lt_text_box)
        return new_lt_text_box_list
- def clean_text(self, _text):
- return re.sub("\s", "", _text)
- def get_text_lines(self, page, page_no):
- lt_line_list = []
- page_plumber = pdfPage(self.doc_pdfplumber, page, page_number=page_no, initial_doctop=self.doc_top)
- self.doc_top += page_plumber.height
- table_finder = TableFinder(page_plumber)
- all_width_zero = True
- for _edge in table_finder.get_edges():
- if _edge.get('linewidth') and _edge.get('linewidth') > 0:
- all_width_zero = False
- break
- for _edge in table_finder.get_edges():
- # print(_edge)
- if _edge.get('linewidth', 0.1) > 0 or all_width_zero:
- lt_line_list.append(LTLine(1, (float(_edge["x0"]), float(_edge["y0"])),
- (float(_edge["x1"]), float(_edge["y1"]))))
- log("pdf page_no %s has %s lines" % (str(page_no), str(len(lt_line_list))))
- return lt_line_list
- @memory_decorator
- def get_page_lines(self, lt_line_list, layout, page_no):
- lt_line_list = table_line_pdf(lt_line_list, layout, page_no)
- return lt_line_list
- @memory_decorator
- def recognize_text(self, layout, page_no, lt_text_list, lt_line_list):
- list_tables, filter_objs, _, connect_textbox_list = self.lt.recognize_table(lt_text_list, lt_line_list,
- from_pdf=True, is_reverse=False)
- # self._page.in_table_objs = filter_objs
- # print("=======text_len:%d:filter_len:%d"%(len(lt_text_list),len(filter_objs)))
- table_list = []
- for table in list_tables:
- _table = _Table(table["table"], table["bbox"])
- # self._page.children.append(_table)
- self._page.add_child(_table)
- table_list.append(_table)
- list_sentences = ParseUtils.recognize_sentences(lt_text_list, filter_objs,
- layout.bbox, page_no)
- for sentence in list_sentences:
- # print('sentence.text', sentence.text)
- _sen = _Sentence(sentence.text, sentence.bbox)
- self._page.add_child(_sen)
- # pdf对象需反向排序
- # self._page.is_reverse = True
- return table_list
- def is_text_legal(self, lt_text_list, page_no):
- # 无法识别pdf字符编码,整页用ocr
- text_temp = ""
- for _t in lt_text_list:
- text_temp += _t.get_text()
- if re.search('[(]cid:[0-9]+[)]', text_temp):
- log("page_no: " + str(page_no) + " text has cid! try pymupdf...")
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- self._page.add_child(_image)
- return False
- match1 = re.findall(get_garble_code(), text_temp)
- # match2 = re.search('[\u4e00-\u9fa5]', text_temp)
- if len(match1) > 8 and len(text_temp) > 10:
- log("page_no: " + str(page_no) + " garbled code! try pymupdf... " + text_temp[:20])
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- self._page.add_child(_image)
- return False
- return True
    @memory_decorator
    def judge_b_table(self, lt_text_list, table_list, page_no):
        """Heuristically decide whether the page holds a borderless ("b") table.

        Text objects are grouped into visual rows; a row looks table-like when
        it has multiple cells or a single cell whose text contains a wide gap
        between two CJK runs. A run of >= 3 such rows (with up to 2 tolerated
        misses) is a candidate table; candidates overlapping a bordered table
        or dominated by two-column rows are rejected.

        :param lt_text_list: pdfminer text objects of the page (re-sorted here).
        :param table_list: already-recognised bordered _Table objects.
        :param page_no: page index, for logging.
        :return: True if a borderless table is likely present.
        """
        # Vertical spans of the bordered tables, for the overlap test below.
        table_h_list = []
        for table in table_list:
            table_h_list.append([table.bbox[1], table.bbox[3]])
        # Group text objects into rows by (almost) equal baseline height.
        lt_text_list.sort(key=lambda x: (x.bbox[1], x.bbox[0]))
        lt_text_row_list = []
        current_h = lt_text_list[0].bbox[1]
        row = []
        threshold = 2
        for lt_text in lt_text_list:
            bbox = lt_text.bbox
            if current_h - threshold <= bbox[1] <= current_h + threshold:
                row.append(lt_text)
            else:
                if row:
                    lt_text_row_list.append(row)
                row = [lt_text]
                current_h = lt_text.bbox[1]
        if row:
            lt_text_row_list.append(row)
        # Detect runs of table-like rows (gap inside text, or several cells per row).
        is_b_table_cnt = 3
        tolerate_cnt = 2
        t_cnt = 0
        row_cnt = 0
        b_table_row_list = []
        all_b_table = []
        row_col_list = []
        all_row_col_list = []
        for row in lt_text_row_list:
            # Skip single-character rows (watermarks).
            if len(row) == 1 and len(row[0].get_text()[:-1]) == 1:
                continue
            # Skip table-of-contents rows (runs of dots).
            continue_flag = False
            for r in row:
                if re.search('[.·]{7,}', r.get_text()):
                    continue_flag = True
                    all_row_col_list = []
                    break
            if continue_flag:
                continue
            if len(row) == 1:
                text = row[0].get_text()
                bbox = row[0].bbox
                # A wide space between two CJK runs suggests two table cells.
                match = re.search('[ ]{3,}', text)
                if match and re.search('[\u4e00-\u9fff]{2,}', text[:match.span()[0]]) \
                        and re.search('[\u4e00-\u9fff]{2,}', text[match.span()[1]:]):
                    row_cnt += 1
                    t_cnt = 0
                    b_table_row_list += row
                    row_col_list += [row]
                else:
                    # Tolerate up to tolerate_cnt non-matching rows inside a run.
                    if t_cnt < tolerate_cnt:
                        t_cnt += 1
                        continue
                    # Run broken: flush it if it was long enough.
                    if b_table_row_list and row_cnt >= is_b_table_cnt:
                        all_b_table.append(b_table_row_list)
                        all_row_col_list.append(row_col_list)
                    row_cnt = 0
                    b_table_row_list = []
                    row_col_list = []
            else:
                row_cnt += 1
                t_cnt = 0
                b_table_row_list += row
                row_col_list += [row]
        # Flush the trailing run.
        if b_table_row_list and row_cnt >= is_b_table_cnt:
            all_b_table.append(b_table_row_list)
            all_row_col_list.append(row_col_list)
        # Reject when most candidate rows have exactly two columns — two-column
        # borderless tables are recognised separately upstream.
        row_cnt = 0
        col_2_cnt = 0
        for row_col_list in all_row_col_list:
            for col_list in row_col_list:
                row_cnt += 1
                if len(col_list) == 2:
                    col_2_cnt += 1
        if row_cnt == 0 or col_2_cnt / row_cnt >= 0.5:
            log("page_no: " + str(page_no) + ' is_b_table_flag False')
            return False
        # Accept the first candidate that does not overlap a bordered table.
        is_b_table_flag = False
        for b_table in all_b_table:
            in_flag = False
            for table_h in table_h_list:
                for b in b_table:
                    if min(table_h) <= b.bbox[1] <= max(table_h) or min(table_h) <= b.bbox[3] <= max(table_h):
                        in_flag = True
                        break
                if in_flag:
                    break
            if in_flag:
                is_b_table_flag = False
            else:
                is_b_table_flag = True
                break
        log("page_no: " + str(page_no) + ' is_b_table_flag ' + str(is_b_table_flag))
        # Sample collection of positive pdfs (disabled):
        # if is_b_table_flag:
        #     self.save_b_table_pdf(page_no)
        return is_b_table_flag
- def save_b_table_pdf(self, page_no):
- # save_dir = r"D:\Project\format_conversion_maxcompute\save_b_table_pdf"
- save_dir = '/data/fangjiasheng/format_conversion_maxcompute/save_b_table_pdf'
- max_index = 200
- if os.path.exists(save_dir):
- file_list = glob(save_dir + '/*')
- if file_list:
- file_index_list = [int(re.split('[/.\\\\-]', x)[-3]) for x in file_list]
- file_index_list.sort(key=lambda x: x)
- index = file_index_list[-1] + 1
- else:
- index = 0
- if index > max_index:
- return
- else:
- return
- save_path = f'{save_dir}/{index}-{page_no}.pdf'
- try:
- shutil.copy(self.path, save_path)
- print("文件复制成功!")
- except Exception as e:
- print(f"文件复制失败:{e}")
    def char_to_text_box(self, char_list):
        """Group loose LTChars into LTTextLine/LTTextBox objects.

        Characters are first split into visual rows (y-difference >= 1), then
        each row is split into column segments wherever the horizontal gap
        exceeds the average character width, and each segment becomes one
        LTTextBoxHorizontal with a single LTTextLine.

        :param char_list: LTChars pre-sorted by (y, x).
        :return: (list of LTTextBoxHorizontal,
                  dict "line text + str(bbox)" -> chars of that line)
        """
        lt_text_box_list = []
        # Split chars into rows (2024-10-28).
        new_text_line_list = []
        new_text_line = []
        lt_char_y = None
        for y in char_list:
            if lt_char_y is None:
                lt_char_y = y.bbox[1]
            if abs(lt_char_y - y.bbox[1]) >= 1:
                # Row break: flush the current row and start a new one.
                new_text_line_list.append(new_text_line)
                new_text_line = [y]
                lt_char_y = y.bbox[1]
            else:
                new_text_line.append(y)
        if new_text_line:
            new_text_line_list.append(new_text_line)
        # Split each row into column segments at wide horizontal gaps.
        temp_list = []
        for new_text_line in new_text_line_list:
            new_text_line.sort(key=lambda x: x.bbox[0])
            lt_char_x = new_text_line[0].bbox[2]
            split_text_line = []
            for y in new_text_line:
                # Gap wider than this char's average glyph width -> new segment.
                if abs(lt_char_x - y.bbox[0]) > abs(y.bbox[2] - y.bbox[0]) / len(y.get_text()):
                    temp_list.append(split_text_line)
                    split_text_line = [y]
                else:
                    split_text_line.append(y)
                lt_char_x = y.bbox[2]
            if split_text_line:
                temp_list.append(split_text_line)
        new_text_line_list = temp_list
        # LTFigure also carries LTChars; wrap each segment into an
        # LTTextLine/LTTextBox so downstream code sees uniform objects (2024-10-12).
        text_box_char_dict = {}
        for new_text_line in new_text_line_list:
            new_text_line.sort(key=lambda x: x.bbox[0])
            new_lt_text_box = LTTextBoxHorizontal()
            new_lt_text_line = LTTextLine(0.01)
            for y in new_text_line:
                new_lt_text_line.add(y)
            new_lt_text_box.add(new_lt_text_line)
            lt_text_box_list.append(new_lt_text_box)
            key = new_lt_text_line.get_text() + str(new_lt_text_line.bbox)
            text_box_char_dict[key] = new_text_line
        return lt_text_box_list, text_box_char_dict
- @memory_decorator
- def get_need_objs(self, obj_list, max_y):
- # 文字
- lt_char_list = []
- lt_text_box_list = []
- # 图像
- lt_image_list = []
- # 嵌套图表
- lt_figure_list = []
- # 线
- lt_line_list = []
- # text_box中单个字符映射
- text_box_char_dict = {}
- for x in obj_list:
- # 重置bbox
- x.set_bbox((x.x0, round(max_y - max(x.y0, x.y1), 1), x.x1, round(max_y - min(x.y0, x.y1), 1)))
- # 需重置内部LTChar
- if isinstance(x, (LTTextBoxHorizontal, LTTextBoxVertical)):
- for lt_text_line in x:
- new_lt_char_list = []
- new_lt_char_text = ''
- for lt_char in lt_text_line:
- if isinstance(lt_char, LTChar):
- lt_char.set_bbox((lt_char.x0, round(max_y - max(lt_char.y0, lt_char.y1), 1), lt_char.x1,
- round(max_y - min(lt_char.y0, lt_char.y1), 1)))
- # 填充颜色、描边颜色都为白色
- if lt_char.graphicstate.scolor == [1, 1, 1] and lt_char.graphicstate.ncolor == [1, 1, 1]:
- continue
- new_lt_char_list.append(lt_char)
- new_lt_char_text += lt_char.get_text()
- text_box_char_dict[new_lt_char_text + str(x.bbox)] = new_lt_char_list
- lt_text_line._objs = new_lt_char_list
- # 分类
- if isinstance(x, LTChar):
- lt_char_list.append(x)
- elif isinstance(x, (LTTextBoxHorizontal, LTTextBoxVertical)):
- lt_text_box_list.append(x)
- elif isinstance(x, LTImage):
- lt_image_list.append(x)
- elif isinstance(x, LTFigure):
- lt_figure_list.append(x)
- elif isinstance(x, (LTTextContainer, LTRect, LTLine, LTCurve)):
- lt_line_list.append(x)
- # print('len(obj_list)', len(obj_list))
- # print('len(lt_char_list)', len(lt_char_list))
- # print('len(lt_text_box_list)', len(lt_text_box_list))
- # if len(lt_text_box_list) >= 200:
- # for lt_text in lt_text_box_list:
- # print('>= 200 lt_text', lt_text.get_text())
- # print('len(lt_image_list)', len(lt_image_list))
- if lt_figure_list:
- temp_figure_list = []
- for sub_figure in lt_figure_list:
- sub_lt_char_list, sub_lt_text_box_list, sub_lt_image_list, sub_lt_figure_list, \
- sub_lt_line_list, sub_text_box_char_dict = self.get_need_objs(sub_figure, max_y)
- lt_char_list += sub_lt_char_list
- lt_text_box_list += sub_lt_text_box_list
- lt_image_list += sub_lt_image_list
- temp_figure_list += sub_lt_figure_list
- lt_line_list += sub_lt_line_list
- text_box_char_dict = {**text_box_char_dict, **sub_text_box_char_dict}
- lt_figure_list = temp_figure_list
- # LTChar拼成LTTextBox
- lt_char_list.sort(key=lambda z: (z.bbox[1], z.bbox[0]))
- add_lt_text_box_list, add_text_box_char_dict = self.char_to_text_box(lt_char_list)
- for lt_text_box in add_lt_text_box_list:
- if lt_text_box in lt_text_box_list:
- continue
- lt_text_box_list += add_lt_text_box_list
- lt_char_list = []
- text_box_char_dict = {**text_box_char_dict, **add_text_box_char_dict}
- lt_text_box_list = self.delete_water_mark_by_location(lt_text_box_list)
- # 分行后过滤
- temp_list = []
- for lt_text_box in lt_text_box_list:
- if lt_text_box.get_text() in ['', ' ', '\t', '\n', '\r']:
- continue
- temp_list.append(lt_text_box)
- if len(lt_text_box_list) != len(temp_list):
- log('filter lt_text_box_list ' + str(len(lt_text_box_list)) + ' -> ' + str(len(temp_list)))
- lt_text_box_list = temp_list
- return lt_char_list, lt_text_box_list, lt_image_list, lt_figure_list, lt_line_list, text_box_char_dict
- @memory_decorator
- def read_layout(self, page, page_no):
- layout = self.get_layout(page, page_no)
- if self._doc.error_code is not None:
- return
- if judge_error_code(layout):
- self._page.error_code = layout
- return
- # 翻转pdf中所有对象的y坐标
- # max_y, min_y = 0, 10000
- layout_obj_list = []
- max_y = layout.height
- for x in layout:
- layout_obj_list.append(x)
- return layout, layout_obj_list, max_y
    def split_text_box_by_lines(self, lt_line_list, lt_text_box_list):
        """
        Split text boxes at vertical table-line positions when no per-char
        position information is available. The cut point inside the string is
        estimated from the box width and a width-weighted character count
        (a CJK character is assumed to be twice as wide as a Latin one).

        :param lt_line_list: line/curve objects of the page
        :param lt_text_box_list: text boxes of the page
        :return: text box list with crossed boxes replaced by their parts
        """
        # Keep only (near-)vertical lines: taller than wide.
        col_lines = []
        for line in lt_line_list:
            bbox = line.bbox
            if abs(bbox[1] - bbox[3]) > abs(bbox[0] - bbox[2]):
                col_lines.append(line)
        # Matches CJK punctuation and common Chinese characters.
        reg = '[\u3000-\u303f]|[\u4e00-\u9fa5]'
        delete_text_box_list = []
        add_text_box_list = []
        for text_box in lt_text_box_list:
            text = text_box.get_text()
            bbox = text_box.bbox
            if len(text) == 0:
                continue
            # x positions of vertical lines that cut through this box.
            split_x_list = []
            for line in col_lines:
                if bbox[0] < line.bbox[0] < bbox[2] and line.bbox[1] <= bbox[1] <= line.bbox[3]:
                    split_x_list.append(line.bbox[0])
            if not split_x_list:
                continue
            # Average char width; a Chinese char counts double.
            chinese_text = ''.join(re.findall(reg, text))
            chinese_len = len(chinese_text)
            other_len = len(text) - chinese_len
            one_char_len = abs(bbox[2] - bbox[0]) / (chinese_len * 2 + other_len)
            # Normalize: replace every Chinese char with one fixed double-width
            # char so the width accounting below is uniform.
            temp_text = re.sub(reg, '我', text)
            start_x = bbox[0]
            start_cnt = 0
            split_text_box_list = []
            # The right edge of the box is the final cut.
            split_x_list.append(bbox[2])
            split_x_list.sort(key=lambda z: z)
            for x in split_x_list:
                # How many width-weighted chars fit before this cut.
                distance = x - start_x
                char_cnt = int(distance / one_char_len)
                add_char_cnt = 0
                real_char_cnt = 0
                # NOTE(review): this scan restarts from the beginning of
                # temp_text for every cut; for the 2nd+ segment the width
                # accounting ignores characters already consumed — confirm
                # this is intended.
                for c in temp_text:
                    if add_char_cnt >= char_cnt:
                        break
                    if c == '我':
                        add_char_cnt += 2
                    else:
                        add_char_cnt += 1
                    real_char_cnt += 1
                real_text = text[start_cnt:start_cnt+real_char_cnt]
                split_text_box_list.append([real_text, start_x, x])
                start_x = x
                start_cnt = start_cnt+real_char_cnt
            # Rebuild the split fragments as pdfminer objects.
            if split_text_box_list:
                delete_text_box_list.append(text_box)
                lt_chars = [y for x in text_box for y in x]
                # The first char is cloned as a template carrying font state.
                lt_char = lt_chars[0]
                for text, start_x, x in split_text_box_list:
                    new_lt_char = copy.deepcopy(lt_char)
                    new_lt_char._text = text
                    new_lt_char.set_bbox([start_x, lt_char.bbox[1], x, lt_char.bbox[3]])
                    new_lt_text_box = LTTextBoxHorizontal()
                    new_lt_text_line = LTTextLine(0.01)
                    new_lt_text_line.add(new_lt_char)
                    new_lt_text_box.add(new_lt_text_line)
                    add_text_box_list.append(new_lt_text_box)
        # Replace each original box by its split parts.
        temp_list = []
        for text_box in lt_text_box_list:
            if text_box in delete_text_box_list:
                continue
            temp_list.append(text_box)
        lt_text_box_list = temp_list
        lt_text_box_list += add_text_box_list
        return lt_text_box_list
    @memory_decorator
    def split_text_box_by_lines2(self, lt_line_list, lt_text_box_list, text_box_char_dict):
        """
        Split text boxes at vertical table-line positions using per-char
        position information (looked up in text_box_char_dict).

        :param lt_line_list: line/curve objects of the page
        :param lt_text_box_list: text boxes of the page
        :param text_box_char_dict: maps text + str(bbox) -> list of LTChar
        :return: text box list with crossed boxes replaced by their parts
        """
        # Keep only (near-)vertical lines: taller than wide.
        col_lines = []
        for line in lt_line_list:
            bbox = line.bbox
            if abs(bbox[1] - bbox[3]) > abs(bbox[0] - bbox[2]):
                col_lines.append(line)
        delete_text_box_list = []
        add_text_box_list = []
        for text_box in lt_text_box_list:
            text = text_box.get_text()
            bbox = text_box.bbox
            if len(text) == 0:
                continue
            # x positions of vertical lines that cut through this box.
            split_x_list = []
            for line in col_lines:
                if bbox[0] < line.bbox[0] < bbox[2] and line.bbox[1] <= bbox[1] <= line.bbox[3]:
                    split_x_list.append(line.bbox[0])
            if not split_x_list:
                continue
            # The right edge of the box is the final cut.
            split_x_list.append(bbox[2])
            # Fetch the per-char positions recorded by get_need_objs.
            key = text + str(bbox)
            char_list = text_box_char_dict.get(key)
            if not char_list or len(char_list) <= 1:
                continue
            # Merge char left edges with cut positions and sort once; each
            # cut's index then tells how many chars lie before it.
            char_x_list = [x.bbox[0] for x in char_list]
            char_x_list += split_x_list
            char_x_list.sort(key=lambda x: x)
            split_text_box_list = []
            start_index = 0
            start_x = char_x_list[0]
            for x in split_x_list:
                # NOTE(review): index() finds the first occurrence in the
                # MERGED list, and text[start_index:index] treats merged-list
                # positions as string indices — for the 2nd+ cut the slice
                # boundary is shifted by the number of earlier cut markers.
                # Confirm this offset is intended/compensated downstream.
                index = char_x_list.index(x)
                sub_text = text[start_index:index]
                if len(sub_text) == 0:
                    continue
                end_x = char_x_list[index-1]
                # Avoid zero-width boxes.
                if start_x == end_x:
                    end_x += 1
                split_text_box_list.append([sub_text, start_x, end_x])
                start_index = index
                if index + 1 >= len(char_x_list):
                    break
                start_x = char_x_list[index+1]
            # Rebuild the split fragments as pdfminer objects.
            if split_text_box_list:
                delete_text_box_list.append(text_box)
                lt_chars = [y for x in text_box for y in x]
                # The first char is cloned as a template carrying font state.
                lt_char = lt_chars[0]
                for text, start_x, x in split_text_box_list:
                    new_lt_char = copy.deepcopy(lt_char)
                    new_lt_char._text = text
                    new_lt_char.set_bbox([start_x, lt_char.bbox[1], x, lt_char.bbox[3]])
                    new_lt_text_box = LTTextBoxHorizontal()
                    new_lt_text_line = LTTextLine(0.01)
                    new_lt_text_line.add(new_lt_char)
                    new_lt_text_box.add(new_lt_text_line)
                    add_text_box_list.append(new_lt_text_box)
        # Replace each original box by its split parts.
        temp_list = []
        for text_box in lt_text_box_list:
            if text_box in delete_text_box_list:
                continue
            temp_list.append(text_box)
        lt_text_box_list = temp_list
        lt_text_box_list += add_text_box_list
        return lt_text_box_list
    @memory_decorator
    def convert_page(self, layout, layout_obj_list, max_y, page_no, delete_water_mark_list, skip_image=0):
        """Turn one parsed pdf page into _Image/_Table children of self._page.

        :param layout: pdfminer LTPage of this page
        :param layout_obj_list: 6-tuple (chars, text boxes, images, figures,
            lines, text-box -> char-list dict) produced by get_need_objs
        :param max_y: page height used for y-coordinate flipping
        :param page_no: page index
        :param delete_water_mark_list: texts already identified as watermarks
        :param skip_image: when truthy, embedded images are ignored
        """
        # If the page yielded no usable objects, leave it for whole-page OCR. 20240820
        # NOTE(review): the condition reads "max_y == 0 AND there ARE objects";
        # verify it matches the stated intent.
        if max_y == 0 and len(layout_obj_list) > 0:
            return
        # Pages pre-flagged for OCR are rendered and attached as one image.
        if self.page_need_to_image_dict.get(page_no) is True:
            page_image = self.get_page_image(page_no)
            if judge_error_code(page_image):
                self._page.error_code = page_image
            else:
                _image = _Image(page_image[1], page_image[0])
                _image.is_from_pdf = True
                _image.is_reverse = False
                self._page.add_child(_image)
            return
        lt_char_list, lt_text_box_list, lt_image_list, lt_figure_list, \
            lt_line_list, text_box_char_dict = layout_obj_list
        # Drop text boxes whose text is a known watermark.
        for water_mark in delete_water_mark_list:
            temp_list = []
            for lt_text_box in lt_text_box_list:
                if water_mark == lt_text_box.get_text():
                    continue
                temp_list.append(lt_text_box)
            lt_text_box_list = temp_list
        # Record whether this page is text-only (1) or not (0).
        if len(lt_image_list) == 0 and len(lt_text_box_list) == 0:
            self.only_text_list[page_no] = 0
        elif len(lt_image_list) == 0:
            self.only_text_list[page_no] = 1
        else:
            self.only_text_list[page_no] = 0
        # Optionally ignore embedded images.
        if skip_image:
            lt_image_list = []
        # If the embedded text looks garbled but images exist, drop the text
        # and rely on OCR of the images instead.
        all_text = ''.join([x.get_text() for x in lt_text_box_list])
        all_text = re.sub('[\s\d]', '', all_text)
        if len(re.findall(get_garble_code2(), all_text)) >= 3 and len(lt_image_list) >= 1:
            log('嵌入的文字是乱码1: ' + str(all_text[:10]))
            lt_text_box_list = []
        # Same for a suspicious proportion of traditional-Chinese chars.
        if 3 <= len(re.findall(get_traditional_chinese(), all_text)) <= len(all_text) / 2 and len(lt_image_list) >= 1:
            log('嵌入的文字是乱码2: ' + str(all_text[:10]))
            lt_text_box_list = []
        # Collapse duplicated bold glyphs.
        lt_text_box_list = self.delete_bold_text_duplicate(lt_text_box_list)
        # Remove per-page watermark texts (threshold 15 repetitions).
        lt_text_box_list, _ = self.delete_water_mark(lt_text_box_list, layout.bbox, 15)
        log("page_no: " + str(page_no) + " len(lt_image_list), len(lt_text_box_list) " +
            str(len(lt_image_list)) + " " + str(len(lt_text_box_list)))
        # Watermark detection by font color is currently disabled:
        # lt_text_box_list = self.delete_water_mark_by_color(lt_text_box_list)
        # Watermark detection by glyph rotation angle.
        lt_text_box_list = self.delete_water_mark_by_rotate(lt_text_box_list)
        # Too many images, or no text at all: OCR the whole page instead.
        if len(lt_image_list) > 4 or len(lt_text_box_list) == 0:
            page_image = self.get_page_image(page_no)
            if judge_error_code(page_image):
                self._page.error_code = page_image
            else:
                _image = _Image(page_image[1], page_image[0])
                _image.is_from_pdf = True
                _image.is_reverse = False
                self._page.add_child(_image)
        # Normal path: read the objects of the page.
        else:
            # (dead commented-out per-image extraction code removed here; the
            # live implementation is pdfminer_read_page_images below)
            # Try pdfminer first, then PyMuPDF per-image extraction, finally
            # rendering the whole page as one image.
            status = self.pdfminer_read_page_images(lt_image_list, page_no)
            if not status:
                log('pymupdf 提取页面中图片 page_no: ' + str(page_no))
                status = self.pymupdf_read_page_images(page_no)
            if not status:
                log('pymupdf 整页转化为图片 page_no: ' + str(page_no))
                status = self.pymupdf_get_whole_page_image(page_no)
            if self.has_init_pdf[3] == 0:
                self.init_package("pdfplumber")
            if not self.is_text_legal(lt_text_box_list, page_no):
                return
            try:
                # lt_line_list = self.get_page_lines(layout, page_no)
                lt_line_list = self.get_page_lines(lt_line_list, layout, page_no)
            except:
                traceback.print_exc()
                lt_line_list = []
                self._page.error_code = [-13]
            # Split text boxes that are crossed by table column lines.
            lt_text_box_list = self.split_text_box_by_lines2(lt_line_list, lt_text_box_list, text_box_char_dict)
            # Fill the text into recognized tables.
            table_list = self.recognize_text(layout, page_no, lt_text_box_list, lt_line_list)
            # Heuristic: infer borderless tables from text patterns.
            try:
                b_table_list, _ = get_b_table_by_blank_colon(lt_text_box_list, table_list, layout.bbox, None)
            except:
                traceback.print_exc()
                b_table_list = []
                self._page.error_code = [-23]
            if b_table_list:
                for table in b_table_list:
                    _table = _Table(table[0], table[1])
                    table_list += [_table]
                    self._page.add_child(_table)
            for t in table_list:
                self._page.table_bbox_list.append(t.bbox)
            # Possible borderless table missed above: render the page for OCR.
            if self.judge_b_table(lt_text_box_list, table_list, page_no):
                page_image = self.get_page_image(page_no)
                if judge_error_code(page_image):
                    self._page.error_code = page_image
                else:
                    # The far-off-page bbox keeps this OCR image from clashing
                    # with real page coordinates.
                    _image = _Image(page_image[1], page_image[0],
                                    bbox=(10000, 10000, 10001, 10001))
                    _image.is_from_pdf = True
                    _image.b_table_from_text = True
                    _image.b_table_text_obj_list = lt_text_box_list
                    _image.b_table_layout_size = (layout.width, layout.height)
                    self._page.add_child(_image)
- @memory_decorator
- def get_layout(self, page, page_no):
- if self.has_init_pdf[0] == 0:
- self.init_package("pdfminer")
- if self._doc.error_code is not None:
- return
- # 获取该页layout
- start_time = time.time()
- try:
- if get_platform() == "Windows":
- layout = pdf_analyze(self.interpreter, page, self.device, page_no)
- else:
- layout = pdf_analyze(self.interpreter, page, self.device, page_no)
- except TimeoutError as e:
- log("page_no: " + str(page_no) + " pdfminer read page time out! " + str(time.time() - start_time))
- layout = [-4]
- except Exception:
- traceback.print_exc()
- log("page_no: " + str(page_no) + " pdfminer read page error! continue...")
- layout = [-3]
- log("page_no: " + str(page_no) + " get_layout cost: " + str(time.time() - start_time))
- return layout
- @memory_decorator
- def get_page_image(self, page_no):
- start_time = time.time()
- try:
- if self.has_init_pdf[1] == 0:
- self.init_package("PyMuPDF")
- if self._doc.error_code is not None:
- return
- # save_dir = self.path.split(".")[-2] + "_" + self.path.split(".")[-1]
- output = self.unique_type_dir + "page" + str(page_no) + ".png"
- page = self.doc_pymupdf.loadPage(page_no)
- rotate = int(0)
- zoom_x = 2.
- zoom_y = 2.
- mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate)
- pix = page.getPixmap(matrix=mat, alpha=False)
- pix.writePNG(output)
- # 输出图片resize
- self.resize_image(output)
- with open(output, "rb") as f:
- pdf_image = f.read()
- log("page_no: " + str(page_no) + ' get_page_image cost: ' + str(time.time() - start_time))
- return [output, pdf_image]
- except ValueError as e:
- traceback.print_exc()
- if str(e) == "page not in document":
- log("page_no: " + str(page_no) + " page not in document! continue...")
- return [0]
- elif "encrypted" in str(e):
- log("page_no: " + str(page_no) + " document need password")
- return [-7]
- except RuntimeError as e:
- if "cannot find page" in str(e):
- log("page_no: " + str(page_no) + " page cannot find in document! continue...")
- return [0]
- else:
- traceback.print_exc()
- return [-3]
- def get_all_page_image(self):
- start_time = time.time()
- if self.has_init_pdf[1] == 0:
- self.init_package("PyMuPDF")
- if self._doc.error_code is not None:
- return
- page_count = self.doc_pymupdf.page_count
- for page_no in range(page_count):
- # 限制pdf页数,只取前10页后10页
- if page_count > 20:
- if 10 <= page_no < page_count - 10:
- continue
- self._page = _Page(None, page_no)
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- else:
- _image = _Image(page_image[1], page_image[0])
- self._page.add_child(_image)
- # 报错继续读后面页面
- if self._doc.error_code is None and self._page.error_code is not None:
- continue
- self._doc.add_child(self._page)
- log('get_all_page_image cost: ' + str(time.time() - start_time))
    @memory_decorator
    def connect_table(self, html_list, show=0):
        """Join tables that continue across page boundaries.

        :param html_list: per-page HTML strings
        :param show: when truthy, print intermediate decisions for debugging
        :return: a single-element list containing the merged HTML

        Initial conditions (A = last table of a page, B = first table of the
        next page); A and B are candidates for joining when:
          1.1 nothing but a page number follows A and nothing precedes B;
          1.2 text before B (likely a header, <60 chars), B's first row starts
              with empty cells and some cell holds a long (>=30 char) text;
          1.3 text before B, B's first cell is empty and at least half of the
              first-row cells are empty;
          1.4 text before B, at most one text line before it and B's first
              cell is a pure numeric index;
          1.5 text after A (page number plus header), at most one line.
        """
        if not html_list:
            return html_list
        connect_flag_list = []
        soup_list = []
        connect_rule_dict = {}
        for i, h in enumerate(html_list):
            soup = BeautifulSoup(h, 'lxml')
            soup_list.append(soup)
            # Locate the LAST table of the page.
            last_table_start, last_table_end = None, None
            match = re.finditer('<table', h)
            for m in match:
                last_table_start = m.span()[0]
            if last_table_start is not None:
                match = re.finditer('</table>', h[last_table_start:])
                for m in match:
                    last_table_end = m.span()[1] + last_table_start
            # Rule A carries the raw HTML of that table for later cell fixes.
            rule_a = [0, h[last_table_start:last_table_end]]
            # Is there anything after the last table besides a page number?
            connect_flag1 = False
            if last_table_end is not None:
                match = re.findall('[^-/第页0-9,,]', re.sub('<div>|</div>', '', h[last_table_end:]))
                if len(match) == 0:
                    connect_flag1 = True
            # Tolerate a short footer (one div, <=60 CJK chars).
            if not connect_flag1:
                if len(re.findall('<div>', h[last_table_end:])) <= 1 \
                        and len(re.findall('[\u4e00-\u9fff]', h[last_table_end:])) <= 60:
                    connect_flag1 = True
            # Locate the FIRST table of the page.
            first_table_start, first_table_end = None, None
            match = re.finditer('<table', h)
            for m in match:
                first_table_start = m.span()[0]
                break
            if first_table_start is not None:
                match = re.finditer('</table>', h[first_table_start:])
                for m in match:
                    first_table_end = m.span()[1] + first_table_start
            # Rule B carries the raw HTML of that table for later cell fixes.
            rule_b = [0, h[first_table_start:first_table_end]]
            # Is there anything before the first table?
            connect_flag2 = False
            if first_table_start is not None and first_table_start == 0:
                connect_flag2 = True
            # Content before the table may just be a page header.
            if not connect_flag2:
                tables = soup.findAll('table')
                if tables:
                    first_table = tables[0]
                    rows = first_table.findAll('tr')
                    if rows:
                        first_row = rows[0]
                        col_text_len_list = [len(x.text) for x in first_row]
                        col_text_list = [x.text for x in first_row]
                        # <=60 chars before table, first cell empty, and some
                        # cell holds a long text: likely a continuation row.
                        if not connect_flag2 and len(h[:first_table_start]) <= 60 and col_text_len_list[0] == 0 and max(
                                col_text_len_list) >= 30:
                            connect_flag2 = True
                            rule_b[0] = 1
                        # At least half the cells are empty and the first one
                        # is empty.
                        if not connect_flag2 and col_text_len_list.count(0) >= len(col_text_len_list) / 2 and \
                                col_text_len_list[0] == 0:
                            connect_flag2 = True
                        # Nothing before the table and the first cell is a
                        # pure numeric index.
                        if not connect_flag2 and len(col_text_list) > 0 and \
                                len(re.findall('<div>', h[:first_table_start])) <= 0 and \
                                len(re.findall('\d', col_text_list[0])) == len(col_text_list[0]):
                            connect_flag2 = True
            connect_flag_list.append([i, connect_flag2, connect_flag1])
            connect_rule_dict[i] = [rule_b, rule_a]
        if show:
            print('connect_flag_list', connect_flag_list)
            print('connect_rule_dict', connect_rule_dict)
        # Group consecutive pages whose flags chain together (B of a page
        # follows A of the previous page).
        connect_pages_list = []
        if connect_flag_list:
            temp_list = [connect_flag_list[0]]
            for i in range(1, len(connect_flag_list)):
                c = connect_flag_list[i]
                if c[1] and temp_list[-1][2]:
                    temp_list.append(c)
                else:
                    if temp_list:
                        connect_pages_list.append(temp_list)
                    temp_list = [c]
            if temp_list:
                connect_pages_list.append(temp_list)
        if show:
            print('connect_pages_list', connect_pages_list)
        # Secondary condition: column counts must match across the junction.
        connect_pages_list2 = []
        for c_list in connect_pages_list:
            if len(c_list) == 1:
                connect_pages_list2.append(c_list)
            else:
                col_cnt_list = []
                # Cells may have been duplicated; identical ones could be
                # merged into one column (currently disabled).
                merge_col_cnt_list = []
                for c in c_list:
                    soup = soup_list[c[0]]
                    table1 = soup.findAll('table')[-1]
                    table2 = soup.findAll('table')[0]
                    tr1 = table1.findAll('tr')
                    tr2 = table2.findAll('tr')
                    td1 = tr1[-1].findAll('td')
                    td2 = tr2[0].findAll('td')
                    # [first-row cols of first table, last-row cols of last table]
                    col_cnt_list.append([len(td2), len(td1)])
                # Split a group wherever the column counts disagree.
                new_c_list = [c_list[0]]
                for i in range(len(col_cnt_list) - 1):
                    if col_cnt_list[i][1] != col_cnt_list[i + 1][0]:
                        connect_pages_list2.append(new_c_list)
                        new_c_list = [c_list[i + 1]]
                    else:
                        new_c_list.append(c_list[i + 1])
                if new_c_list:
                    connect_pages_list2.append(new_c_list)
        if show:
            print('connect_pages_list2', connect_pages_list2)
        # Fill empty leading cells of B's rows with values from A's last row.
        for c_list in connect_pages_list2:
            for i in range(len(c_list) - 1):
                page_index1 = c_list[i][0]
                page_index2 = c_list[i + 1][0]
                html2 = html_list[page_index2]
                soup2 = soup_list[page_index2]
                rule1 = connect_rule_dict.get(page_index1)[1]
                rule2 = connect_rule_dict.get(page_index2)[0]
                table1 = BeautifulSoup(rule1[1], 'lxml').findAll('table')[0]
                table2 = BeautifulSoup(rule2[1], 'lxml').findAll('table')[0]
                add_td_value = []
                # Texts of the last row of table A.
                for tr in table1.findAll('tr')[::-1]:
                    temp_list = []
                    for td in tr.findAll('td'):
                        temp_list.append(td.get_text())
                    add_td_value = temp_list
                    break
                tr_index = 0
                for tr in table2.findAll('tr'):
                    # 0/1 mask: which cells of this row carry text.
                    temp_list = []
                    for td in tr.findAll('td'):
                        if len(td.get_text()) < 1:
                            temp_list.append(0)
                        else:
                            temp_list.append(1)
                    # Same width, leading cells empty: copy A's values in.
                    if temp_list and add_td_value and len(temp_list) == len(add_td_value) \
                            and 1 in temp_list and temp_list[0] != 1 \
                            and 1 not in temp_list[:temp_list.index(1)]:
                        for j in range(len(temp_list)):
                            if temp_list[j] == 0:
                                tr.findAll('td')[j].string = add_td_value[j]
                    tr_index += 1
                soup2.findAll('table')[0].replace_with(table2)
                html_list[page_index2] = str(soup2)
        # Concatenate the pages of each group, replacing the table seam with
        # a '#@#@#' marker row that is resolved below.
        new_html_list = []
        for c_list in connect_pages_list2:
            if len(c_list) == 1:
                new_html_list.append(html_list[c_list[0][0]])
                continue
            new_html = ''
            for c in c_list:
                match = re.finditer('</table>', new_html)
                last_table_index = None
                for m in match:
                    last_table_index = m.span()[0]
                new_html += html_list[c[0]]
                if last_table_index is None:
                    continue
                match = re.finditer('<table border="1">', new_html[last_table_index:])
                first_table_index = None
                for m in match:
                    first_table_index = last_table_index + m.span()[1]
                    break
                if first_table_index is None:
                    continue
                # Non-greedy replacement of the seam between the two tables.
                new_html_sub = re.sub('</table>.*?<table border="1">',
                                      '<tr><td>#@#@#</td></tr>',
                                      new_html[last_table_index:first_table_index])
                new_html = new_html[:last_table_index] + new_html_sub + new_html[first_table_index:]
            soup = BeautifulSoup(new_html, 'lxml')
            trs = soup.findAll('tr')
            # Pairs of [row-to-append-into, row-to-delete]; '' marks a plain
            # marker-row deletion with no merge target.
            decompose_trs = []
            for i in range(len(trs)):
                if trs[i].get_text() == '#@#@#':
                    td1 = trs[i - 1].findAll('td')
                    td2 = trs[i + 1].findAll('td')
                    if td2[0].get_text() == '':
                        # When one logical row spans several pages, keep
                        # appending into the original (father) row instead of
                        # the already-deleted intermediate one.
                        find_father = False
                        for father, son in decompose_trs:
                            if father != '' and son == trs[i - 1]:
                                td_father = father.findAll('td')
                                for j in range(len(td_father)):
                                    td_father[j].string = td_father[j].get_text() + td2[j].get_text()
                                find_father = True
                                decompose_trs.append([father, trs[i + 1]])
                                break
                        if not find_father:
                            for j in range(len(td1)):
                                td1[j].string = td1[j].get_text() + td2[j].get_text()
                            decompose_trs.append([trs[i - 1], trs[i + 1]])
                    # The marker row itself is always removed.
                    decompose_trs.append(['', trs[i]])
            for father, son in decompose_trs:
                for tr in trs:
                    if tr == son:
                        tr.decompose()
                        break
            new_html = str(soup)
            new_html_list.append(new_html)
        html_str = ''
        for h in new_html_list:
            html_str += h
        return [html_str]
- def get_html(self):
- if self._doc.error_code is not None:
- return self._doc.error_code
- self.convert()
- if self._doc.error_code is not None:
- return self._doc.error_code
- html = self._doc.get_html(return_list=True)
- if self._doc.error_code is not None:
- return self._doc.error_code
- # 表格连接
- try:
- html = self.connect_table(html)
- except:
- traceback.print_exc()
- return [-12]
- return html
- @memory_decorator
- def delete_water_mark(self, lt_text_list, page_bbox, times=5):
- # 删除过多重复字句,为水印
- duplicate_dict = {}
- for _obj in lt_text_list:
- t = _obj.get_text()
- if t in duplicate_dict.keys():
- duplicate_dict[t][0] += 1
- duplicate_dict[t][1].append(_obj)
- else:
- duplicate_dict[t] = [1, [_obj]]
- delete_text = []
- for t in duplicate_dict.keys():
- if duplicate_dict[t][0] >= times:
- obj_list = duplicate_dict[t][1]
- obj_list.sort(key=lambda x: x.bbox[3])
- obj_distance_h = abs(obj_list[-1].bbox[3] - obj_list[0].bbox[1])
- obj_list.sort(key=lambda x: x.bbox[2])
- obj_distance_w = abs(obj_list[-1].bbox[2] - obj_list[0].bbox[0])
- if obj_distance_h >= abs(page_bbox[1] - page_bbox[3]) * 0.7 \
- and obj_distance_w >= abs(page_bbox[0] - page_bbox[2]) * 0.7:
- delete_text.append(t)
- # 排除字符少的
- temp_list = []
- for t in delete_text:
- if len(t) <= 5:
- continue
- temp_list.append(t)
- delete_text = temp_list
- temp_text_list = []
- for _obj in lt_text_list:
- t = _obj.get_text()
- if t not in delete_text:
- temp_text_list.append(_obj)
- return temp_text_list, delete_text
- @memory_decorator
- def delete_water_mark_by_location(self, lt_text_box_list):
- x_text_box_dict = {}
- # 水印,x坐标相同,且长度为1
- for lt_text_box in lt_text_box_list:
- x1, y1, x2, y2 = lt_text_box.bbox
- text = lt_text_box.get_text()
- if len(text) != 1:
- continue
- key = f'{x1}-{x2}-{text}'
- if key in x_text_box_dict:
- x_text_box_dict[key] += [lt_text_box]
- else:
- x_text_box_dict[key] = [lt_text_box]
- len1 = len(lt_text_box_list)
- for key, box_list in x_text_box_dict.items():
- if len(box_list) >= 3:
- for box in box_list:
- if box in lt_text_box_list:
- lt_text_box_list.remove(box)
- len2 = len(lt_text_box_list)
- if len1 != len2:
- log('delete_water_mark_by_location box num ' + str(len1) + ' -> ' + str(len2))
- return lt_text_box_list
    def delete_water_mark_by_color(self, lt_text_list):
        """Blank out light-colored characters, which are likely watermarks.

        Rules:
          1. a single char with lightness >= 0.8 is cleared immediately;
          2. if EVERY char of a line has lightness >= 0.4, the whole line
             is cleared.

        NOTE(review): this method is currently disabled at its call site in
        convert_page; the print() calls below look like leftover debug output.

        :param lt_text_list: text boxes (chars are mutated in place)
        :return: the same list
        """
        water_mark_text_box_list = []
        threshold1 = 0.8   # per-char lightness threshold (rule 1)
        threshold2 = 0.4   # whole-line lightness threshold (rule 2)
        for lt_text_box in lt_text_list:
            for lt_text_line in lt_text_box:
                water_mark_char_cnt = 0
                lt_text_line_len = 0
                for lt_char in lt_text_line:
                    lt_text_line_len += 1
                    # Non-stroking (fill) color of the glyph.
                    color = lt_char.graphicstate.ncolor
                    print('color', lt_char.get_text(), color, lt_char.matrix)
                    find_flag = 0
                    if color is None:
                        continue
                    elif isinstance(color, (tuple, list)):
                        # NOTE(review): assumes a 3-component RGB color; a
                        # CMYK 4-tuple would raise ValueError here — verify
                        # the inputs this is called with.
                        r, g, b = color
                        if r >= threshold2 and g >= threshold2 and b >= threshold2:
                            water_mark_char_cnt += 1
                        if r >= threshold1 and g >= threshold1 and b >= threshold1:
                            find_flag = 1
                    else:
                        # Grayscale color: a single number.
                        if color >= threshold2:
                            water_mark_char_cnt += 1
                        if color >= threshold1:
                            find_flag = 1
                    if find_flag:
                        print('water mark char', lt_char.get_text(), color)
                        water_mark_text_box_list.append(lt_text_box)
                        lt_char._text = ''
                # Rule 2: every char of the line is light.
                if lt_text_line_len == water_mark_char_cnt:
                    for lt_char in lt_text_line:
                        lt_char._text = ''
                    water_mark_text_box_list.append(lt_text_box)
        return lt_text_list
- def delete_water_mark_by_rotate(self, lt_text_list):
- water_mark_text_box_list = []
- sin_range = [0.3, 0.94]
- for lt_text_box in lt_text_list:
- if '.......' in lt_text_box.get_text():
- # print('....... lt_text_box continue')
- continue
- for lt_text_line in lt_text_box:
- for lt_char in lt_text_line:
- matrix = lt_char.matrix
- # print('matrix', lt_char.get_text(), matrix)
- if matrix is None:
- continue
- _, b, c, _, _, _ = matrix
- if abs(b) == abs(c) and b != c \
- and sin_range[0] <= abs(b) <= sin_range[1] \
- and sin_range[0] <= abs(c) <= sin_range[1]:
- # print('water mark char', lt_char.get_text(), matrix)
- water_mark_text_box_list.append(lt_text_box)
- lt_char._text = ''
- return lt_text_list
- def resize_image(self, img_path, max_size=2000):
- _img = cv2.imread(img_path)
- if _img.shape[0] <= max_size or _img.shape[1] <= max_size:
- return
- else:
- resize_axis = 0 if _img.shape[0] >= _img.shape[1] else 1
- ratio = max_size / _img.shape[resize_axis]
- new_shape = [0, 0]
- new_shape[resize_axis] = max_size
- new_shape[1 - resize_axis] = int(_img.shape[1 - resize_axis] * ratio)
- _img = cv2.resize(_img, (new_shape[1], new_shape[0]))
- cv2.imwrite(img_path, _img)
- def get_single_pdf(self, path, page_no):
- start_time = time.time()
- try:
- pdf_origin = copy.deepcopy(self.doc_pypdf2)
- pdf_new = copy.deepcopy(self.doc_pypdf2_new)
- pdf_new.addPage(pdf_origin.getPage(page_no))
- path_new = path.split(".")[0] + "_split.pdf"
- with open(path_new, "wb") as ff:
- pdf_new.write(ff)
- log("page_no: " + str(page_no) + " get_single_pdf cost: " + str(time.time() - start_time))
- return path_new
- except PyPDF2.utils.PdfReadError as e:
- return [-3]
- except Exception as e:
- log("page_no: " + str(page_no) + " get_single_pdf error!")
- return [-3]
    def pymupdf_read_page_images(self, page_no):
        """Extract all embedded images of one page via PyMuPDF.

        Successfully extracted images are written to disk and attached to the
        current page. Returns True on success, False on any failure (the
        caller then falls back to whole-page rendering).

        :param page_no: page index
        :return: bool
        """
        try:
            self.init_package("PyMuPDF")
            # Load the requested page.
            page = self.doc_pymupdf.load_page(page_no)
            # Info tuples from get_images(full=True):
            # (xref, smask, width, height, bpc, colorspace, ...).
            image_list = page.get_images(full=True)
            # Collected image descriptors.
            extracted_images = []
            for img_index, img_info in enumerate(image_list):
                xref = img_info[0]  # xref number of the image object
                base_image = self.doc_pymupdf.extract_image(xref)
                image_bytes = base_image["image"]  # raw image bytes
                image_ext = base_image["ext"]  # image file extension
                # NOTE(review): get_images() does NOT return coordinates; this
                # slice is (xref, smask, width, height), not a bbox, and the
                # width/height arithmetic below subtracts xref/smask. The real
                # size is img_info[2] / img_info[3]; a true bbox would need
                # page.get_image_bbox(). Verify how downstream consumers use
                # these values before changing them.
                bbox = img_info[0:4]  # x0, y0, x1, y1
                width = img_info[2] - img_info[0]  # computed width
                height = img_info[3] - img_info[1]  # computed height
                # Descriptor for this image.
                img_data = {
                    "xref": xref,
                    "width": width,
                    "height": height,
                    "image": image_bytes,
                    "ext": image_ext,
                    "bbox": bbox
                }
                extracted_images.append(img_data)
            image_obj_list = []
            # Persist each image and wrap it in an _Image object.
            for index, d in enumerate(extracted_images):
                temp_path = self.unique_type_dir + 'page' + str(page_no) \
                            + '_lt2_' + str(index) + '.jpg'
                image_bytes = d.get("image")
                bbox = d.get('bbox')
                with open(temp_path, 'wb') as f:
                    f.write(image_bytes)
                _image = _Image(image_bytes, temp_path, bbox)
                image_md5 = get_md5_from_bytes(image_bytes)
                image_obj_list.append([_image, image_md5])
        except:
            traceback.print_exc()
            return False
        # Attach only after everything succeeded, so a failure leaves the
        # page unchanged for the fallback path.
        for _image, image_md5 in image_obj_list:
            self._page.add_child(_image)
            self.md5_image_obj_list.append([image_md5, _image])
        return True
- def pymupdf_get_whole_page_image(self, page_no):
- image_obj_list = []
- page_image = self.get_page_image(page_no)
- if judge_error_code(page_image):
- self._page.error_code = page_image
- return False
- else:
- _image = _Image(page_image[1], page_image[0])
- _image.is_from_pdf = True
- _image.is_reverse = False
- image_md5 = get_md5_from_bytes(page_image[1])
- image_obj_list.append([_image, image_md5])
- for _image, image_md5 in image_obj_list:
- self._page.add_child(_image)
- self.md5_image_obj_list.append([image_md5, _image])
- return True
def pdfminer_read_page_images(self, lt_image_list, page_no):
    """Extract pdfminer ``LTImage`` objects of one page, save each as a JPEG
    and attach them to the current page object.

    Fixes over the previous version:
    - images are collected first and committed to ``self._page`` /
      ``self.md5_image_obj_list`` only after the whole loop succeeds, so a
      failure no longer leaves a partially-populated page while returning
      False (this also revives the previously dead ``image_obj_list``
      accumulator and matches the sibling pymupdf methods);
    - the loop index comes from ``enumerate`` instead of
      ``lt_image_list.index(image)``, which was O(n^2) and produced colliding
      file names for duplicate images.

    :param lt_image_list: pdfminer LTImage objects of this page
    :param page_no: page number, used in log messages and temp file names
    :return: True on success, False when any image failed to extract
    """
    image_obj_list = []
    for img_index, lt_image in enumerate(lt_image_list):
        try:
            image_stream = lt_image.stream.get_data()
            # Ignore small images (icons, bullets, decorations).
            if lt_image.width <= 300 and lt_image.height <= 300:
                continue
            # Round-trip through PIL to normalize the stream into a JPEG file.
            img_test = Image.open(io.BytesIO(image_stream))
            temp_path = self.unique_type_dir + 'page' + str(page_no) \
                        + '_lt_' + str(img_index) + '.jpg'
            img_test.save(temp_path)
            with open(temp_path, "rb") as ff:
                image_stream = ff.read()
            _image = _Image(image_stream, temp_path, lt_image.bbox)
            image_md5 = get_md5_from_bytes(image_stream)
            image_obj_list.append([_image, image_md5])
        except Exception:
            log("page_no: " + str(page_no) + " pdfminer read image fail!")
            traceback.print_exc()
            return False
    # Commit only after every image extracted cleanly.
    for _image, image_md5 in image_obj_list:
        self._page.add_child(_image)
        self.md5_image_obj_list.append([image_md5, _image])
    return True
def get_text_font():
    # NOTE(review): takes no arguments and reads a hard-coded local file path
    # below -- this looks like a standalone debugging helper, not production code.
    def flags_decomposer(flags):
        """Make font flags human readable."""
        l = []
        if flags & 2 ** 0:
            l.append("superscript")
        if flags & 2 ** 1:
            l.append("italic")
        if flags & 2 ** 2:
            l.append("serifed")
        else:
            l.append("sans")
        if flags & 2 ** 3:
            l.append("monospaced")
        else:
            l.append("proportional")
        if flags & 2 ** 4:
            l.append("bold")
        return ", ".join(l)

    def get_underlined_textLines(page):
        """
        Collect all underlined text on one PDF page.
        :param page: a page from fitz (PyMuPDF)
        :return: list of tuples, one per complete underlined span:
                 [(underlined text, block_no, line_no), ...]
        """
        paths = page.get_drawings()  # get drawings on the current page
        # Collect every drawing that could be an underline: true horizontal
        # lines, plus very flat rectangles (underlines are often drawn as such).
        # subselect things we may regard as lines
        lines = []
        for p in paths:
            for item in p["items"]:
                if item[0] == "l":  # an actual line
                    p1, p2 = item[1:]
                    if p1.y == p2.y:
                        lines.append((p1, p2))
                elif item[0] == "re":  # a rectangle: check if height is small
                    r = item[1]
                    if r.width > r.height and r.height <= 2:
                        lines.append((r.tl, r.tr))  # take top left / right points
        # Compute this page's `max_lineheight`, used as the distance threshold below.
        blocks = page.get_text("dict", flags=11)["blocks"]
        max_lineheight = 0
        for b in blocks:
            for l in b["lines"]:
                bbox = fitz.Rect(l["bbox"])
                if bbox.height > max_lineheight:
                    max_lineheight = bbox.height
        underlined_res = []
        # Start matching words against the candidate underline segments.
        # make a list of words
        words = page.get_text("words")
        # if underlined, the bottom left / right of a word
        # should not be too far away from left / right end of some line:
        for wdx, w in enumerate(words):  # w[4] is the actual word string
            r = fitz.Rect(w[:4])  # first 4 items are the word bbox
            for p1, p2 in lines:  # check distances for start / end points
                if abs(r.bl - p1) <= max_lineheight:  # word's bottom-left is close to the underline's left end
                    if abs(r.br - p2) <= max_lineheight:  # word's bottom-right is close to the underline's right end (single word, no spaces)
                        print(f"Word '{w[4]}' is underlined! Its block-line number is {w[-3], w[-2]}")
                        underlined_res.append((w[4], w[-3], w[-2]))  # (underlined word, block_no, line_no)
                        break  # don't check more lines
                    else:  # keep scanning rightwards on the same line: one underline may cover several space-separated words
                        curr_line_num = w[-2]  # line nunmber
                        for right_wdx in range(wdx + 1, len(words), 1):
                            _next_w = words[right_wdx]
                            if _next_w[-2] != curr_line_num:  # the word reached is no longer on the current line (crossing lines is not allowed)
                                break
                            _r_right = fitz.Rect(_next_w[:4])  # bbox corners of the right-hand word on the same line
                            if abs(_r_right.br - p2) <= max_lineheight:  # its bottom-right must be within max_lineheight of p2 (underline's right end)
                                print(
                                    f"Word '{' '.join([_one_word[4] for _one_word in words[wdx:right_wdx + 1]])}' is underlined! " +
                                    f"Its block-line number is {w[-3], w[-2]}")
                                underlined_res.append(
                                    (' '.join([_one_word[4] for _one_word in words[wdx:right_wdx + 1]]),
                                     w[-3], w[-2])
                                )  # (underlined words, block_no, line_no)
                                break  # don't check more lines
        return underlined_res

    _p = r'C:\Users\Administrator\Desktop\test_pdf\error2-2.pdf'
    doc_pymupdf = read_pymupdf(_p)
    page = doc_pymupdf[0]
    blocks = page.get_text("dict", flags=11)["blocks"]
    for b in blocks:  # iterate through the text blocks
        for l in b["lines"]:  # iterate through the text lines
            for s in l["spans"]:  # iterate through the text spans
                print("")
                font_properties = "Font: '%s' (%s), size %g, color #%06x" % (
                    s["font"],  # font name
                    flags_decomposer(s["flags"]),  # readable font flags
                    s["size"],  # font size
                    s["color"],  # font color
                )
                print(s)
                print("Text: '%s'" % s["text"])  # simple print of text
                print(font_properties)
    get_underlined_textLines(page)
# The following is a ready-made single-page PDF parsing interface
class ParseSentence:
    """One recognized sentence/line on a PDF page, carrying its layout box,
    font information and title/outline-recognition metadata."""

    def __init__(self, bbox, fontname, fontsize, _text, _title, title_text, _pattern, title_degree, is_outline,
                 outline_location, page_no):
        self.bbox = bbox
        # Unpack the bounding box into individual coordinates for convenience.
        self.x0, self.y0, self.x1, self.y1 = bbox
        self.fontname = fontname
        self.fontsize = fontsize
        self.text = _text
        self.title = _title
        self.title_text = title_text
        self.groups = _pattern
        self.title_degree = title_degree
        self.is_outline = is_outline
        self.outline_location = outline_location
        self.page_no = page_no

    def __repr__(self):
        # Compact one-line summary used in debug dumps.
        return f"{self.text},{self.title},{self.is_outline},{self.outline_location:d},{str(self.bbox)}"
class ParseUtils:
    """Static helpers for sentence recognition on pdfminer layout objects:
    font extraction, line clustering, and numbered-title detection
    (arabic, Chinese numerals, Latin letters and roman numerals)."""

    @staticmethod
    def getFontinfo(_page):
        # Copy the font name/size of the first LTChar found onto each
        # horizontal/vertical textbox of the page.
        for _obj in _page._objs:
            if isinstance(_obj, (LTTextBoxHorizontal, LTTextBoxVertical)):
                for textline in _obj._objs:
                    done = False
                    for lchar in textline._objs:
                        if isinstance(lchar, (LTChar)):
                            _obj.fontname = lchar.fontname
                            _obj.fontsize = lchar.size
                            done = True
                            break
                    if done:
                        break

    @staticmethod
    def recognize_sentences(list_textbox, filter_objs, page_bbox, page_no,
                            remove_space=True, sourceP_LB=True):
        """Cluster textboxes into visual lines, merge each line's text and
        detect titles and outline (table-of-contents) entries.

        :param list_textbox: pdfminer textboxes of one page (sorted in place)
        :param filter_objs: textboxes to skip (already consumed elsewhere)
        :param page_bbox: bbox of the whole page, used for centering checks
        :param page_no: page number stored on each resulting ParseSentence
        :param remove_space: NOTE(review): unused -- whitespace is stripped
            unconditionally below; confirm whether it should be honored
        :param sourceP_LB: True when the origin is bottom-left (pdfminer coords)
        :return: list of ParseSentence
        """
        # Sort left-to-right, then top-to-bottom (y descending for bottom-left origin).
        list_textbox.sort(key=lambda x: x.bbox[0])
        list_textbox.sort(key=lambda x: x.bbox[3], reverse=sourceP_LB)
        cluster_textbox = []
        for _textbox in list_textbox:
            if _textbox in filter_objs:
                continue
            _find = False
            # NOTE(review): no break after a match -- a textbox whose y is
            # within 5 of several clusters is appended to each of them;
            # confirm this is intended.
            for _ct in cluster_textbox:
                if abs(_ct["y"] - _textbox.bbox[1]) < 5:
                    _find = True
                    _ct["textbox"].append(_textbox)
            if not _find:
                cluster_textbox.append({"y": _textbox.bbox[1], "textbox": [_textbox]})
        cluster_textbox.sort(key=lambda x: x["y"], reverse=sourceP_LB)
        list_sentences = []
        for _line in cluster_textbox:
            _textboxs = _line["textbox"]
            _textboxs.sort(key=lambda x: x.bbox[0])
            _linetext = _textboxs[0].get_text()
            for _i in range(1, len(_textboxs)):
                # A horizontal gap > 60 units acts as a soft comma; the "=,="
                # placeholder is resolved after title detection below.
                if abs(_textboxs[_i].bbox[0] - _textboxs[_i - 1].bbox[2]) > 60:
                    if _linetext and _linetext[-1] not in (",", ",", "。", ".", "、", ";"):
                        _linetext += "=,="
                _linetext += _textboxs[_i].get_text()
            _linetext = re.sub("[\s\r\n]", "", _linetext)
            _bbox = (_textboxs[0].bbox[0], _textboxs[0].bbox[1],
                     _textboxs[-1].bbox[2], _textboxs[-1].bbox[3])
            _title = None
            _pattern_groups = None
            title_text = ""
            # Title detection, in order: first textbox alone, whole merged
            # line, then horizontal centering.
            if not _title:
                _groups = ParseUtils.find_title_by_pattern(_textboxs[0].get_text())
                if _groups:
                    _title = _groups[0][0]
                    title_text = _groups[0][1]
                    _pattern_groups = _groups
            if not _title:
                _groups = ParseUtils.find_title_by_pattern(_linetext)
                if _groups:
                    _title = _groups[0][0]
                    title_text = _groups[0][1]
                    _pattern_groups = _groups
            if not _title:
                _title = ParseUtils.rec_incenter(_bbox, page_bbox)
                title_degree = 2
            if not _title:
                _linetext = _linetext.replace("=,=", ",")
            else:
                _linetext = _linetext.replace("=,=", "")
                # Title names look like "title_<degree>..."; keep the degree.
                title_degree = int(_title.split("_")[1])
            # Page number: a centered, purely numeric line is dropped.
            if ParseUtils.rec_incenter(_bbox, page_bbox) and re.search("^\d+$", _linetext) is not None:
                continue
            if _linetext == "" or re.search("^,+$", _linetext) is not None:
                continue
            is_outline = False
            outline_location = -1
            # "chapter ....... 12" style table-of-contents entries.
            _search = re.search("(?P<text>.+?)\.{5,}(?P<nums>\d+)$", _linetext)
            if _search is not None:
                is_outline = True
                _linetext = _search.group("text")
                outline_location = int(_search.group("nums"))
            list_sentences.append(
                ParseSentence(_bbox, _textboxs[-1].__dict__.get("fontname"), _textboxs[-1].__dict__.get("fontsize"),
                              _linetext, _title, title_text, _pattern_groups, title_degree, is_outline,
                              outline_location, page_no))
        # for _sen in list_sentences:
        #     print(_sen.__dict__)
        return list_sentences

    @staticmethod
    def find_title_by_pattern(_text,
                              _pattern="(?P<title_1>(?P<title_1_index_0_0>^第?)(?P<title_1_index_1_1>[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P<title_1_index_2_0>[、章]))|" \
                                       "(?P<title_3>^(?P<title_3_index_0_1>[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+))|" \
                                       "(?P<title_4>^(?P<title_4_index_0_0>第?)(?P<title_4_index_1_1>[一二三四五六七八九十]+)(?P<title_4_index_2_0>[节]))|" \
                                       "(?P<title_11>^(?P<title_11_index_0_0>\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P<title_11_index_1_1>\d{1,2})(?P<title_11_index_2_0>[\..、\s\-]))|" \
                                       "(?P<title_10>^(?P<title_10_index_0_0>\d{1,2}[\..、\s\-]\d{1,2}[\..、\s\-])(?P<title_10_index_1_1>\d{1,2})(?P<title_10_index_2_0>[\..、\s\-]))|" \
                                       "(?P<title_7>^(?P<title_7_index_0_0>\d{1,2}[\..、\s\-])(?P<title_7_index_1_1>\d{1,2})(?P<title_7_index_2_0>[\..、\s\-]))|" \
                                       "(?P<title_6>^(?P<title_6_index_0_1>\d{1,2})(?P<title_6_index_1_0>[\..、\s\-]))|" \
                                       "(?P<title_15>^(?P<title_15_index_0_0>(?)(?P<title_15_index_1_1>\d{1,2})(?P<title_15_index_2_0>)))|" \
                                       "(?P<title_17>^(?P<title_17_index_0_0>(?)(?P<title_17_index_1_1>[a-zA-Z]+)(?P<title_17_index_2_0>)))|"
                                       "(?P<title_19>^(?P<title_19_index_0_0>(?)(?P<title_19_index_1_1>[一二三四五六七八九十ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]+)(?P<title_19_index_2_0>)))|" \
                              ):
        """Return the alphabetically sorted (group_name, matched_text) pairs of
        the first title pattern matching _text, or None when nothing matches.

        NOTE(review): the title_15/17/19 alternatives contain "(?)", which does
        not look like a valid regex construct, and the overall pattern ends in
        "|" (an empty alternative that matches everything) -- confirm whether
        re.search can even compile this default pattern.
        """
        _se = re.search(_pattern, _text)
        groups = []
        if _se is not None:
            _gd = _se.groupdict()
            for k, v in _gd.items():
                if v is not None:
                    groups.append((k, v))
        if len(groups):
            groups.sort(key=lambda x: x[0])
            return groups
        return None

    @staticmethod
    def rec_incenter(o_bbox, p_bbox):
        # Return "title_2" when o_bbox is horizontally centered on the page:
        # left/right margins nearly equal (within 10% of page width) and a
        # sizeable left margin. Implicitly returns None otherwise.
        p_width = p_bbox[2] - p_bbox[0]
        l_space = (o_bbox[0] - p_bbox[0]) / p_width
        r_space = (p_bbox[2] - o_bbox[2]) / p_width
        if abs((l_space - r_space)) < 0.1 and l_space > 0.2:
            return "title_2"

    @staticmethod
    def is_first_title(_title):
        # True when _title is the first item of its numbering scheme
        # ("1", "一", "a", "A" or "Ⅰ"); False otherwise or when None.
        if _title is None:
            return False
        if re.search("^\d+$", _title) is not None:
            if int(_title) == 1:
                return True
            return False
        if re.search("^[一二三四五六七八九十百]+$", _title) is not None:
            if _title == "一":
                return True
            return False
        if re.search("^[a-z]+$", _title) is not None:
            if _title == "a":
                return True
            return False
        if re.search("^[A-Z]+$", _title) is not None:
            if _title == "A":
                return True
            return False
        if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$", _title) is not None:
            if _title == "Ⅰ":
                return True
            return False
        return False

    @staticmethod
    def get_next_title(_title):
        """Return the successor of _title within its numbering scheme
        (arabic, Chinese numerals, a-z, A-Z or roman numerals), or None."""
        if re.search("^\d+$", _title) is not None:
            return str(int(_title) + 1)
        if re.search("^[一二三四五六七八九十百]+$", _title) is not None:
            # Increment digit-wise without the positional markers, then
            # re-insert 十/百 at the proper positions.
            _next_title = ParseUtils.make_increase(['一', '二', '三', '四', '五', '六', '七', '八', '九', '十'],
                                                   re.sub("[十百]", '', _title))
            _next_title = list(_next_title)
            _next_title.reverse()
            if _next_title[-1] != "十":
                if len(_next_title) >= 2:
                    _next_title.insert(-1, '十')
                if len(_next_title) >= 4:
                    _next_title.insert(-3, '百')
            if _title[0] == "十":
                # NOTE(review): _next_title is a list here, so the comparison
                # with the string "十" can never be true -- confirm intent.
                if _next_title == "十":
                    _next_title = ["二", "十"]
                _next_title.insert(0, "十")
            _next_title = "".join(_next_title)
            return _next_title
        if re.search("^[a-z]+$", _title) is not None:
            _next_title = ParseUtils.make_increase([chr(i + ord('a')) for i in range(26)], _title)
            _next_title = list(_next_title)
            _next_title.reverse()
            return "".join(_next_title)
        if re.search("^[A-Z]+$", _title) is not None:
            _next_title = ParseUtils.make_increase([chr(i + ord('A')) for i in range(26)], _title)
            _next_title = list(_next_title)
            _next_title.reverse()
            return "".join(_next_title)
        if re.search("^[ⅠⅡⅢⅣⅤⅥⅦⅧⅨⅩⅪⅫ]$", _title) is not None:
            _sort = ["Ⅰ", "Ⅱ", "Ⅲ", "Ⅳ", "Ⅴ", "Ⅵ", "Ⅶ", "Ⅷ", "Ⅸ", "Ⅹ", "Ⅺ", "Ⅻ"]
            _index = _sort.index(_title)
            if _index < len(_sort) - 1:
                return _sort[_index + 1]
        return None

    @staticmethod
    def make_increase(_sort, _title, _add=1):
        """Recursively add _add to the last 'digit' of _title over the alphabet
        _sort, propagating the carry leftwards. The result is produced in
        reversed digit order (callers reverse it back).

        NOTE(review): the carry is set whenever the current digit is the last
        alphabet element, even when _add == 0 -- e.g. over a-z, "za" appears to
        yield "bza" (i.e. "azb" after the caller's reverse) instead of "zb";
        confirm whether this spurious carry is intended.
        """
        if len(_title) == 0 and _add == 0:
            return ""
        if len(_title) == 0 and _add == 1:
            return _sort[0]
        _index = _sort.index(_title[-1])
        next_index = (_index + _add) % len(_sort)
        next_chr = _sort[next_index]
        if _index == len(_sort) - 1:
            _add = 1
        else:
            _add = 0
        return next_chr + ParseUtils.make_increase(_sort, _title[:-1], _add)

    @staticmethod
    def rec_serial(_text, o_bbox, p_bbox, fontname, _pattern="(?P<title_1>^[一二三四五六七八九十]+[、])|" \
                                                            "(?P<title_2>^\d+[\.、\s])|" \
                                                            "(?P<title_3>^\d+\.\d+[\.、\s])|" \
                                                            "(?P<title_4>^\d+\.\d+\.\d+[\.、\s])|" \
                                                            "(?P<title_5>^\d+\.\d+\.\d+\.\d+[\.、\s])"):
        # todo :recog the serial of the sentence
        # Returns the name of the first group that matched (e.g. "title_2"),
        # or None. o_bbox / p_bbox / fontname are currently unused.
        _se = re.search(_pattern, _text)
        if _se is not None:
            _gd = _se.groupdict()
            for k, v in _gd.items():
                if v is not None:
                    return k
        return None
if __name__ == '__main__':
    # Ad-hoc manual test: convert one local PDF to HTML and dump the result.
    # Paths are developer-machine specific.
    _pp = r'D:\Project\format_conversion_maxcompute\save_b_table_pdf/e-116.pdf'
    # _pp = r'C:\Users\Administrator\Downloads\1746582280828.pdf'
    _html = PDFConvert(_pp, r"D:\Project\format_conversion_maxcompute\format_convert\temp", None).get_html()
    with open('../result.html', 'w', encoding='utf-8') as f:
        f.write('<!DOCTYPE HTML><head><meta charset="UTF-8"></head>' + _html[0])
|