"""Test client for the format_convert service: posts files to the /convert HTTP endpoint
and exercises the maxcompute entry point."""
import base64
import concurrent.futures
import json
import os
import random
import sys
import time
import traceback
from glob import glob

import requests

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from format_convert.utils import get_platform, request_post, get_md5_from_bytes
from format_convert.convert import to_html
import multiprocessing as mp

html_output_dir = os.path.dirname(os.path.abspath(__file__)) + "/../html_output/"


def test_one(p, page_no_range=None, timeout=300, save_middle=None, save_html=False):
    # Accept a single tuple argument so the function can be used directly with executor.map().
    if isinstance(p, tuple):
        p, page_no_range, timeout, save_middle, save_html = p
    start_time = time.time()
    with open(p, "rb") as f:
        file_bytes = f.read()
    file_base64 = base64.b64encode(file_bytes)
    _md5 = get_md5_from_bytes(file_bytes)
    data = {"file": file_base64, "type": p.split(".")[-1], "filemd5": _md5,
            'page_no': page_no_range, 'timeout': timeout, 'save_middle': save_middle}
    # _url = 'http://dianxin.bidizhaobiao.com:15010/convert'
    # _url = 'http://192.168.2.103:15010/convert'
    # _url = 'http://192.168.2.102:15010/convert'
    # _url = 'http://172.16.160.65:15010/convert'
    _url = 'http://127.0.0.1:15010/convert'
    text_str = ""
    try:
        result = json.loads(request_post(_url, data, time_out=timeout+20))
        print('result', result)
        for t in result.get("result_html"):
            text_str += t
        to_html(os.path.dirname(os.path.abspath(__file__)) + "/../result.html", text_str)
        if save_html:
            new_path = html_output_dir + p.split(os.sep)[-1].split('.')[0] + '.html'
            # A very short result starting with '-' is an error code, not HTML; don't save it.
            if 0 < len(text_str) <= 3 and text_str[0] == '-':
                print(new_path, text_str)
            else:
                to_html(new_path, text_str)
        print(_md5)
        # print('pages', page_no_range.split(',')[0], 'to', page_no_range.split(',')[-1])
        print("result_text", result.get("result_text")[0][:20])
        print("is_success", result.get("is_success"))
    except Exception:
        traceback.print_exc()
        print(_md5)
        print("is_success", 0)
    print(time.time()-start_time)
    return p, 1


def test_path():
    # _url = 'http://121.46.18.113:15010/convert'
    _url = 'http://192.168.0.115:15010/convert'
    print(_url)
    p = '/data/fangjiasheng/format_conversion_maxcompute/1.png'
    data = {"file_path": p, "type": p.split(".")[-1], "filemd5": 100,
            'page_no': '1,-1', 'timeout': 10000, 'save_middle': None}
    print(str(data))
    # result = json.loads(request_post(_url, data, time_out=1000))
    result = requests.post(_url, data).json()
    text_str = ""
    for t in result.get("result_html"):
        text_str += t
    to_html(os.path.dirname(os.path.abspath(__file__)) + "/../result.html", text_str)
    print("result_text", result.get("result_text")[0][:20])
    print("is_success", result.get("is_success"))


def test_duplicate(path_list, process_no=None):
    start_time = time.time()
    # random.shuffle(path_list)
    for i in range(10):
        if i % 10 == 0:
            if process_no is not None:
                print("Process", process_no, i*len(path_list), time.time()-start_time)
            else:
                print("Loop", i*len(path_list), time.time()-start_time)
        for p in path_list:
            test_one(p)


def test_maxcompute(p, page_no_range=None):
    from format_convert import convert
    start_time = time.time()
    with open(p, "rb") as f:
        file_bytes = f.read()
    file_base64 = base64.b64encode(file_bytes)
    _md5 = get_md5_from_bytes(file_bytes)
    data = {"file": file_base64, "type": p.split(".")[-1], "filemd5": _md5, 'page_no': page_no_range}
    result = convert.convert(data)
    text_str = ""
    for t in result.get("result_html"):
        text_str += t
    to_html(os.path.dirname(os.path.abspath(__file__)) + "/../result.html", text_str)
    print(_md5)
    if page_no_range:
        print('pages', page_no_range.split(',')[0], 'to', page_no_range.split(',')[-1])
    print("result_text", result.get("result_text")[0][:20])
    print("is_success", result.get("is_success"))
    print(time.time()-start_time)


def run_files(thread_num=20):
    paths = glob(r'C:\Users\Administrator\Downloads\招标文件内容提取\*')
    # Skip files that already have an HTML result.
    temp_list = []
    for _path in paths:
        new_path = html_output_dir + _path.split(os.sep)[-1].split('.')[0] + '.html'
        if os.path.exists(new_path):
            continue
        temp_list.append(_path)
    paths = temp_list
    print('len(paths)', len(paths))
    with concurrent.futures.ThreadPoolExecutor(max_workers=thread_num) as executor:
        tasks = []
        for _path in paths:
            tasks.append((_path, '1,-1', 10000, None, True))
        # Submit the tasks to the thread pool
        results = executor.map(test_one, tasks)
        for result in results:
            print(result)


def test_kimi():
    MOONSHOT_API_KEY = 'sk-ZqQBQfVBrs1lIilWVgggYqFwGcMu5pjlCeQf2SZL1KDlg1Pj'
    paths = glob(html_output_dir + '*.html')
    for p in paths[:100]:
        with open(p, 'r', encoding='utf-8') as f:
            _str = f.read()
        print('len(_str)', len(_str))
        data = {
            'model': 'moonshot-v1-8k',
            'messages': [
                {
                    "role": "user",
                    "content": _str
                }
            ],
        }
        _url = 'https://api.moonshot.cn/v1/tokenizers/estimate-token-count'
        headers = {'Content-Type': 'application/json',
                   "Authorization": "Bearer " + MOONSHOT_API_KEY}
        result = requests.post(_url, json=data, data=None, headers=headers, timeout=100)
        print(result.text)


if __name__ == '__main__':
    if get_platform() == "Windows":
        # file_path = "C:/Users/Administrator/Downloads/1750737587843.ofd"
        # file_path = r'D:\Project\format_conversion_maxcompute\save_b_table_pdf/e-1.pdf'
        # file_path = "D:/BIDI_DOC/比地_文档/1677829036789.pdf"
        # file_path = "C:/Users/Administrator/Desktop/test_xls/error4.xlsx"
        # file_path = "C:/Users/Administrator/Desktop/test_doc/error17.docx"
        # file_path = "C:/Users/Administrator/Desktop/test_swf/error2.swf"
        # file_path = "C:/Users/Administrator/Desktop/test_rar/error1.rar"
        # file_path = "C:/Users/Administrator/Desktop/test_image/error18.png"
        # file_path = "C:/Users/Administrator/Desktop/test_b_table/error29.png"
        # file_path = "C:/Users/Administrator/Desktop/test_pdf/普通error/error6.pdf"
        # file_path = "C:/Users/Administrator/Desktop/test_table_head/error2.pdf"
        # file_path = "C:/Users/Administrator/Desktop/test_wps/error2.wps"
        file_path = "C:/Users/Administrator/Desktop/test_ofd/1750381792388.ofd"
    else:
        file_path = "1660296734009.pdf"

    # test_one(file_path, page_no_range="1,-1", timeout=1000, save_middle=None)
    test_one(file_path, page_no_range=None, timeout=1000, save_middle=None)
    # run_files()
    # test_kimi()
    # test_path()

    # file_path = "C:/Users/Administrator/Downloads/"
    # file_path = r"C:\Users\Administrator\Desktop\test_pdf\直接读表格线error/"
    # file_path = r"C:\Users\Administrator\Desktop\test_pdf\表格连接error/"
    # file_path = r"C:\Users\Administrator\Desktop\test_b_table/"
    # file_path = r"C:\Users\Administrator\Desktop\test_pdf\普通error/"
    # test_pdf_list = [['6df7f2bd5e8cac99a15a6c012e0d82a8.pdf', '34,52'],
    #                  ['ca6a86753400d6dd6a1b324c5678b7fb.pdf', '18,69'],
    #                  ['a8380bf795c71caf8185fb11395df138.pdf', '27,38'],
    #                  ['7fd2ce6b08d086c98158b6f2fa0293b0.pdf', '32,48'],
    #                  ['dd1adb4dc2014c7abcf403ef15a01eb5.pdf', '2,12'],
    #                  ['error50.pdf', '1,-1'],
    #                  ['error59.pdf', '1,-1'],
    #                  ['error60.pdf', '1,-1'],
    #                  ['error61.pdf', '1,-1'],
    #                  ['error7.pdf', '39,57'],
    #                  ['error8.pdf', '7,12'],
    #                  ['error23.pdf', '1,-1']
    #                  ]
    # index = 11
    # test_one(file_path+test_pdf_list[index][0], page_no_range=test_pdf_list[index][1], from_remote=True)

    # Test maxcompute mode
    # _process = mp.Process(target=test_maxcompute, args=(file_path, '1,-1',))
    # _process.start()
    # _process.join()