"""Manual test driver for the format-conversion service.

Each ``test_*`` helper reads a local file (or sends a server-side path),
submits it for conversion, passes the returned HTML fragments to ``to_html``
with the ``../result.html`` path next to this script, and prints a short
summary of the response.
"""
import base64
import json
import os
import random
import sys
import time
from glob import glob

import requests

# Make the project root importable so that `otr` and `format_convert` resolve
# when this script is run directly.
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from pdfminer.converter import PDFPageAggregator
from pdfminer.layout import LAParams, LTLine
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
from pdfplumber import PDF
from otr.table_line_pdf import _plot
from format_convert.utils import get_platform, request_post, get_md5_from_bytes
from format_convert.convert import to_html
import multiprocessing as mp

# Path handed to `to_html` for every conversion result.
_RESULT_HTML_PATH = os.path.dirname(os.path.abspath(__file__)) + "/../result.html"


def _write_result_html(result):
    """Concatenate ``result["result_html"]`` and pass it to ``to_html``."""
    text_str = "".join(result.get("result_html"))
    to_html(_RESULT_HTML_PATH, text_str)


def _print_page_range(page_no_range):
    """Print the first and last entries of a ``'start,end'`` page range.

    Does nothing when ``page_no_range`` is None, so callers that do not
    restrict the page range no longer crash on ``None.split``.
    """
    if page_no_range is None:
        return
    parts = page_no_range.split(',')
    print('第', parts[0], '页到第', parts[-1], '页')


def test_one(p, page_no_range=None, from_remote=False, timeout=300, save_middle=None):
    """Convert a single local file through the remote service and print a summary.

    Args:
        p: Path of the file to convert; its extension is sent as ``type``.
        page_no_range: Optional ``'start,end'`` string sent as ``page_no``.
        from_remote: Must be True; local conversion is not supported here.
        timeout: Sent to the service as ``timeout``; the HTTP call itself is
            given ``timeout + 20`` via ``request_post``'s ``time_out``.
        save_middle: Forwarded unchanged as ``save_middle``.
    """
    start_time = time.time()
    with open(p, "rb") as f:
        file_bytes = f.read()
    file_base64 = base64.b64encode(file_bytes)
    _md5 = get_md5_from_bytes(file_bytes)

    data = {"file": file_base64, "type": p.split(".")[-1], "filemd5": _md5,
            'page_no': page_no_range, 'timeout': timeout, 'save_middle': save_middle}

    if not from_remote:
        # Previously execution fell through and crashed with NameError on the
        # unbound `result`; there is nothing to report without a response.
        print("only support remote!")
        return

    _url = 'http://121.46.18.113:15010/convert'
    # _url = 'http://192.168.2.103:15010/convert'
    # _url = 'http://192.168.2.102:15011/convert'
    # _url = 'http://172.16.160.65:15010/convert'
    # _url = 'http://127.0.0.1:15010/convert'
    result = json.loads(request_post(_url, data, time_out=timeout+20))
    _write_result_html(result)

    print(_md5)
    _print_page_range(page_no_range)
    print("result_text", result.get("result_text")[0][:20])
    print("is_success", result.get("is_success"))
    print(time.time()-start_time)


def test_path():
    """Ask the service to convert a file addressed by a server-side path."""
    # _url = 'http://121.46.18.113:15010/convert'
    _url = 'http://192.168.0.115:15010/convert'
    print(_url)
    p = '/data/fangjiasheng/format_conversion_maxcompute/1.png'
    data = {"file_path": p, "type": p.split(".")[-1], "filemd5": 100, 'page_no': '1,-1',
            'timeout': 10000, 'save_middle': None}
    print(str(data))
    # result = json.loads(request_post(_url, data, time_out=1000))
    # requests.post returns a Response object, which json.loads cannot parse;
    # decode the JSON body through the Response instead.
    result = requests.post(_url, data).json()
    _write_result_html(result)
    print("result_text", result.get("result_text")[0][:20])
    print("is_success", result.get("is_success"))


def test_duplicate(path_list, process_no=None):
    """Repeatedly convert every file in ``path_list`` (10 passes) via ``test_one``.

    Args:
        path_list: File paths to convert on each pass.
        process_no: Optional label printed with the progress line.
    """
    start_time = time.time()
    # random.shuffle(path_list)
    for i in range(10):
        if i % 10 == 0:
            if process_no is not None:
                print("Process", process_no, i*len(path_list), time.time()-start_time)
            else:
                print("Loop", i*len(path_list), time.time()-start_time)
        for p in path_list:
            test_one(p, from_remote=True)


def test_maxcompute(p, page_no_range=None):
    """Convert a local file in-process via ``format_convert.convert.convert``.

    Args:
        p: Path of the file to convert; its extension is sent as ``type``.
        page_no_range: Optional ``'start,end'`` string sent as ``page_no``.
    """
    from format_convert import convert
    start_time = time.time()
    with open(p, "rb") as f:
        file_bytes = f.read()
    file_base64 = base64.b64encode(file_bytes)
    _md5 = get_md5_from_bytes(file_bytes)

    data = {"file": file_base64, "type": p.split(".")[-1], "filemd5": _md5, 'page_no': page_no_range}

    result = convert.convert(data)
    _write_result_html(result)

    print(_md5)
    _print_page_range(page_no_range)
    print("result_text", result.get("result_text")[0][:20])
    print("is_success", result.get("is_success"))
    print(time.time()-start_time)


if __name__ == '__main__':
    if get_platform() == "Windows":
        # file_path = "C:/Users/Administrator/Desktop/2.png"
        # file_path = "C:/Users/Administrator/Desktop/test_xls/error4.xls"
        # file_path = "C:/Users/Administrator/Desktop/test_doc/error5.doc"
        # file_path = "D:/BIDI_DOC/比地_文档/1677829036789.pdf"
        # file_path = "D:/BIDI_DOC/比地_文档/2022/Test_ODPS/1624325845476.pdf"
        # file_path = "C:/Users/Administrator/Downloads/W020230512399773694376.jpg"
        # file_path = "C:/Users/Administrator/Desktop/test_doc/error14.docx"
        file_path = "C:/Users/Administrator/Desktop/test_image/error9-1.png"
        # file_path = "C:/Users/Administrator/Desktop/test_b_table/error1.png"
        # file_path = "C:/Users/Administrator/Desktop/test_pdf/直接读表格线error/error62.pdf"
        # file_path = "C:/save_b_table/0-0895e32470613dd7be1139eefd1342c4.png"
    else:
        file_path = "1660296734009.pdf"

    test_one(file_path, page_no_range='1,-1', from_remote=True, timeout=1000, save_middle=None)

    # test_path()

    # file_path = "C:/Users/Administrator/Downloads/"
    # file_path = r"C:\Users\Administrator\Desktop\test_pdf\直接读表格线error/"
    # file_path = r"C:\Users\Administrator\Desktop\test_pdf\表格连接error/"
    # file_path = r"C:\Users\Administrator\Desktop\test_b_table/"
    file_path = r"C:\Users\Administrator\Desktop\test_pdf\普通error/"
    test_pdf_list = [['6df7f2bd5e8cac99a15a6c012e0d82a8.pdf', '34,52'],
                     ['ca6a86753400d6dd6a1b324c5678b7fb.pdf', '18,69'],
                     ['a8380bf795c71caf8185fb11395df138.pdf', '27,38'],
                     ['7fd2ce6b08d086c98158b6f2fa0293b0.pdf', '32,48'],
                     ['dd1adb4dc2014c7abcf403ef15a01eb5.pdf', '2,12'],
                     ['error50.pdf', '1,-1'],
                     ['error59.pdf', '1,-1'],
                     ['error60.pdf', '1,-1'],
                     ['error61.pdf', '1,-1'],
                     ['error7.pdf', '39,57'],
                     ['error8.pdf', '7,12'],
                     ['error23.pdf', '1,-1']
                     ]
    index = 11
    # test_one(file_path+test_pdf_list[index][0], page_no_range=test_pdf_list[index][1], from_remote=True)

    # from pdfplumber.table import TableFinder
    # fp = open(file_path+test_pdf_list[index][0], 'rb')
    # parser = PDFParser(fp)
    # doc_pdfminer = PDFDocument(parser)
    # rsrcmgr = PDFResourceManager()
    # laparams = LAParams(line_overlap=0.01,
    #                     char_margin=0.3,
    #                     line_margin=0.01,
    #                     word_margin=0.01,
    #                     boxes_flow=0.1, )
    # device = PDFPageAggregator(rsrcmgr, laparams=laparams)
    # interpreter = PDFPageInterpreter(rsrcmgr, device)
    # doc_top = 0
    # doc_pdfplumber = PDF(fp)
    # pages = PDFPage.create_pages(doc_pdfminer)
    # from pdfplumber.page import Page as pdfPage
    # for page in pages:
    #     page_plumber = pdfPage(doc_pdfplumber, page, page_number=1, initial_doctop=doc_top)
    #     table_finder = TableFinder(page_plumber)
    #     all_width_zero = True
    #     for _edge in table_finder.get_edges():
    #         if _edge.get('linewidth') and _edge.get('linewidth') > 0:
    #             all_width_zero = False
    #             break
    #     lt_line_list = []
    #     for _edge in table_finder.get_edges():
    #         # print(_edge)
    #         if _edge.get('linewidth', 0.1) > 0 or all_width_zero:
    #             lt_line_list.append(LTLine(1, (float(_edge["x0"]), float(_edge["y0"])),
    #                                        (float(_edge["x1"]), float(_edge["y1"]))))
    #     _plot(lt_line_list, 'table', 1, 1)

    # Test maxcompute mode
    # _process = mp.Process(target=test_maxcompute, args=(file_path, '1,-1',))
    # _process.start()
    # _process.join()