import base64 import copy import ctypes import gc import hashlib import inspect import multiprocessing import os import random import traceback from glob import glob, iglob import threading import time import urllib import psutil import requests import json import sys from multiprocessing import Process, Pool __dir__ = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.abspath(os.path.join(__dir__, '..'))) from format_convert.convert import convert from ocr.ocr_interface import ocr, OcrModels from otr.otr_interface import otr, OtrModels from format_convert.judge_platform import get_platform class myThread(threading.Thread): def __init__(self, threadName): threading.Thread.__init__(self) self.threadName = threadName def run(self): while True: start_time = time.time() test_convert() print(self.threadName, "finish!", time.time()-start_time) class myThread_appendix(threading.Thread): def __init__(self, threadName, _list): threading.Thread.__init__(self) self.threadName = threadName self._list = _list def run(self): start_time = time.time() test_appendix_downloaded(self._list) print(self.threadName, "finish!", time.time()-start_time) def test_ocr(): with open("test_files/开标记录表3_page_0.png", "rb") as f: base64_data = base64.b64encode(f.read()) # print(base64_data) url = local_url + ":15011" + '/ocr' # url = 'http://127.0.0.1:15013/ocr' r = requests.post(url, data=base64_data, timeout=2000) # print("test:", r.content.decode("utf-8")) def test_otr(): with open("test_files/开标记录表3_page_0.png", "rb") as f: base64_data = base64.b64encode(f.read()) # print(base64_data) url = local_url + ":15017" + '/otr' # url = 'http://127.0.0.1:15013/ocr' r = requests.post(url, data=base64_data, timeout=2000) # print("test:", r.content.decode("utf-8")) def test_convert(): # path = "开标记录表3.pdf" # path = "test_files/开标记录表3_page_0.png" # path = "test_files/1.docx" # path = '光明食品(集团)有限公司2017年度经审计的合并及母公司财务报表.pdf' # path = '光明.pdf' # path = 'D:/BIDI_DOC/比地_文档/Oracle11g学生成绩管理系统.docx' # path = "C:\\Users\\Administrator\\Desktop\\1600825332753119.doc" # path = "temp/complex/8.png" # path = "合同备案.doc" # path = "1.png" # path = "1.pdf" # path = "(清单)衢州市第二人民医院二期工程电缆采购项目.xls" # path = "D:\\Project\\format_conversion\\appendix_test\\temp\\00fb3e52bc7e11eb836000163e0ae709" + \ # "\\00fb43acbc7e11eb836000163e0ae709.png" # path = "D:\\BIDI_DOC\\比地_文档\\8a949486788ccc6d017969f189301d41.pdf" # path = "be8a17f2cc1b11eba26800163e0857b6.docx" # path = "江苏省通州中等专业学校春节物资采购公 告.docx" # path = "test_files/1.zip" # path = "C:\\Users\\Administrator\\Desktop\\33f52292cdad11ebb58300163e0857b6.zip" path = "C:\\Users\\Administrator\\Desktop\\Test_Interface\\1623392355541.zip" with open(path, "rb") as f: base64_data = base64.b64encode(f.read()) # print(base64_data) url = _url + '/convert' # url = 'http://127.0.0.1:15014/convert' # headers = {'Content-Type': 'application/json'} headers = { 'Connection': 'keep-alive' } data = urllib.parse.urlencode({"file": base64_data, "type": path.split(".")[-1]}).encode('utf-8') req = urllib.request.Request(url, data=data, headers=headers) with urllib.request.urlopen(req) as response: _dict = eval(response.read().decode("utf-8")) result = _dict.get("result") is_success = _dict.get("is_success") print("is_success", is_success) print("len(result)", len(result)) for i in range(len(result)): print("=================") print(result[i]) print("-----------------") # print(len(eval(r.content.decode("utf-8")).get("result"))) # print(r.content) def test_appendix_downloaded(_list): # 直接使用下载好的附件 i = 0 # for docid_file in glob("/mnt/html_files/*"): for docid_file in _list: if i % 100 == 0: print("Loop", i) # print(docid_file) for file_path in iglob(docid_file + "/*"): print(file_path) with open(file_path, "rb") as f: base64_data = base64.b64encode(f.read()) url = _url + '/convert' # print(url) try: # headers = { # 'Connection': 'keep-alive' # } # data = urllib.parse.urlencode({"file": base64_data, "type": file_path.split(".")[-1]}).encode('utf-8') # req = urllib.request.Request(url, data=data, headers=headers) # with urllib.request.urlopen(req, timeout=2000) as response: # _dict = eval(response.read().decode("utf-8")) # timeout=2000 r = requests.post(url, data={"file": base64_data, "type": file_path.split(".")[-1]}, timeout=2000) _dict = eval(r.content.decode("utf-8")) print("is_success:", _dict.get("is_success")) except Exception as e: print("docid " + str(docid_file) + " time out!", e) i += 1 def test_convert_maxcompute(): try: ocr_model = OcrModels().get_model() otr_model = OtrModels().get_model() path_list = [] path_suffix = "未命名4.pdf" if get_platform() == "Windows": path_prefix = "C:\\Users\\Administrator\\Desktop\\Test_ODPS\\" # path_prefix = "C:\\Users\\Administrator\\Desktop\\" path_list.append(path_prefix + path_suffix) else: path_list.append(path_suffix) result_list = [] for path in path_list: with open(path, "rb") as f: base64_data = base64.b64encode(f.read()) # print("------------") # print(base64_data) # print('------------') data = {"file": base64_data, "type": path.split(".")[-1]} result_dict = convert(data, ocr_model, otr_model) print("garbage object num:%d" % (len(gc.garbage))) _unreachable = gc.collect() print("unreachable object num:%d" % (_unreachable)) print("garbage object num:%d" % (len(gc.garbage))) result_list.append(result_dict) for result_dict in result_list: result = result_dict.get("result_text") is_success = result_dict.get("is_success") for i in range(len(result)): print("=================", "is_success", is_success, i, "in", len(result)) # _dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..") # _dir = os.path.abspath(_dir) + os.sep # if i == 0: # with open(_dir + "result.html", "w") as ff: # ff.write(result[i]) # else: # with open(_dir + "result.html", "a") as ff: # ff.write("
=================================================
") # ff.write(result[i]) # print("write result to", _dir + "result.html") del otr_model del ocr_model gc.collect() except Exception as e: print(e) usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 print("memory 2", str(usage)) def getMDFFromFile(path): _length = 0 try: _md5 = hashlib.md5() with open(path, "rb") as ff: while True: data = ff.read(4096) if not data: break _length += len(data) _md5.update(data) return _md5.hexdigest(), _length except Exception as e: traceback.print_exc() return None, _length def get_base64(): path = "C:\\Users\\Administrator\\Desktop\\Test_ODPS\\1623430252934.doc" with open(path, "rb") as f: base64_data = base64.b64encode(f.read()) print("------------") print(base64_data) print('------------') print(getMDFFromFile(path)) def test_init_model(): class MyThread(threading.Thread): def __init__(self): super(MyThread, self).__init__() self.ocr_model = OcrModels().get_model() self.otr_model = OtrModels().get_model() def run(self): self.result = random.randint(1, 10) def get_result(self): return self.result def _async_raise(self, tid, exctype): """raises the exception, performs cleanup if needed""" tid = ctypes.c_long(tid) if not inspect.isclass(exctype): exctype = type(exctype) res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype)) if res == 0: raise ValueError("invalid thread id") elif res != 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None) raise SystemError("PyThreadState_SetAsyncExc failed") def stop_thread(self, tid): self._async_raise(tid, SystemExit) class GetModel: def __init__(self): # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss # print("memory 2", str(usage)) return def process(self): thread = MyThread() thread.start() thread.join() result = thread.get_result() print(result) if thread.is_alive(): thread.stop_thread(thread.ident) # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss # print("memory 3", str(usage)) m = GetModel() m.process() # spawn模式复制进程,否则模型挂起 # multiprocessing.set_start_method('spawn', force=True) ocr_model = "" otr_model = "" class TestProcess: def __init__(self): super(TestProcess, self).__init__() self.process_num = 2 self.data_list = [] self.result_list = [] self.current_data = "" self.result_num = 0 def child_process_1(self): # 初始化模型 globals().update({"ocr_model": OcrModels().get_model()}) globals().update({"otr_model": OtrModels().get_model()}) # 循环转换 for data in self.data_list: self.current_data = data # self.child_process_2() p = Process(target=self.child_process_2) p.start() p.join() if p.is_alive(): print("p.close") p.close() # 初始化 self.data_list = [] # 删除之前模型 global ocr_model, otr_model del ocr_model del otr_model gc.collect() def child_process_2(self): global ocr_model, otr_model result = convert(self.current_data, ocr_model, otr_model) print("result", result.get("is_success")) self.result_list.append(result) print("len(self.result_list)======================", len(self.result_list)) self.result_num += 1 def process(self, path_list): for path in path_list: with open(path, "rb") as f: base64_data = base64.b64encode(f.read()) data = {"file": base64_data, "type": path.split(".")[-1]} self.data_list.append(data) # 攒够10条数据执行 if len(self.data_list) == self.process_num: p = Process(target=self.child_process_1) p.start() p.join() p.close() print("init data_list result_list!") self.data_list = [] print("self.result_num", self.result_num) def test_convert_process(): t = TestProcess() t.process(["1623430252934.doc", "1623430252934.doc"]) usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 print("----- memory info start - test_convert_process" + " - " + str(usage) + " GB") # t.process(["1.docx", "1.docx"]) # usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 # print("----- memory info start - test_convert_process" + " - " + str(usage) + " GB") gpu_url = "http://192.168.2.101" memory_url = "http://47.97.90.190" local_url = "http://127.0.0.1" production_url = "http://47.98.57.0" _url = local_url + ":15015" if __name__ == '__main__': # test_convert() # test_convert_process() test_convert_maxcompute() # test_init_model() # test_ocr() # test_otr() # test_appendix_downloaded() # get_base64() # print(getMDFFromFile("C:\\Users\\Administrator\\Desktop\\Test_ODPS\\1624900794475.docx")) # 多线程调用 ##################################### # threads_num = 30 # thread_list = [] # glob_list = glob("html_files/*") # sub_num = int(len(glob_list) / threads_num) # print(len(glob_list), sub_num) # # for i in range(threads_num): # if i == threads_num - 1: # _list = glob_list[i*sub_num:] # else: # _list = glob_list[i*sub_num:(i+1)*sub_num] # print(i*sub_num, len(_list)) # # thread = myThread_appendix("Thread-"+str(i), _list) # thread_list.append(thread) # # for thread in thread_list: # thread.start() # for thread in thread_list: # thread.join()