123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- import base64
- import copy
- import ctypes
- import gc
- import hashlib
- import inspect
- import multiprocessing
- import os
- import random
- import traceback
- from glob import glob, iglob
- import threading
- import time
- import urllib
- import psutil
- import requests
- import json
- import sys
- from multiprocessing import Process, Pool
- __dir__ = os.path.dirname(os.path.abspath(__file__))
- sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))
- from format_convert.convert import convert
- from ocr.ocr_interface import ocr, OcrModels
- from otr.otr_interface import otr, OtrModels
- from format_convert.judge_platform import get_platform
class myThread(threading.Thread):
    """Worker thread that calls test_convert() in an endless loop, timing each pass."""

    def __init__(self, threadName):
        super(myThread, self).__init__()
        self.threadName = threadName

    def run(self):
        # Runs until the process exits: one conversion per iteration.
        while True:
            begin = time.time()
            test_convert()
            print(self.threadName, "finish!", time.time() - begin)
class myThread_appendix(threading.Thread):
    """Worker thread that processes one slice of downloaded appendix dirs, once."""

    def __init__(self, threadName, _list):
        super(myThread_appendix, self).__init__()
        self.threadName = threadName
        self._list = _list

    def run(self):
        begin = time.time()
        test_appendix_downloaded(self._list)
        print(self.threadName, "finish!", time.time() - begin)
def test_ocr():
    """POST one base64-encoded PNG to the local OCR service (port 15011)."""
    with open("test_files/开标记录表3_page_0.png", "rb") as f:
        base64_data = base64.b64encode(f.read())
    url = local_url + ":15011" + '/ocr'
    # Generous timeout: OCR on a full page can be slow.
    r = requests.post(url, data=base64_data, timeout=2000)
def test_otr():
    """POST one base64-encoded PNG to the local OTR (table recognition) service (port 15017)."""
    with open("test_files/开标记录表3_page_0.png", "rb") as f:
        base64_data = base64.b64encode(f.read())
    url = local_url + ":15017" + '/otr'
    # Generous timeout: table recognition on a full page can be slow.
    r = requests.post(url, data=base64_data, timeout=2000)
def test_convert():
    """POST a local file (base64-encoded) to the /convert endpoint and print each
    returned page.

    The target file path is hard-coded to a sample on the test machine.
    """
    path = "C:\\Users\\Administrator\\Desktop\\Test_Interface\\1623392355541.zip"
    with open(path, "rb") as f:
        base64_data = base64.b64encode(f.read())

    url = _url + '/convert'
    headers = {
        'Connection': 'keep-alive'
    }
    # File extension doubles as the "type" field of the request.
    payload = urllib.parse.urlencode(
        {"file": base64_data, "type": path.split(".")[-1]}).encode('utf-8')
    req = urllib.request.Request(url, data=payload, headers=headers)
    with urllib.request.urlopen(req) as response:
        # SECURITY NOTE(review): eval() on a network response can execute
        # arbitrary code; if the service returns JSON, json.loads is safer.
        _dict = eval(response.read().decode("utf-8"))
    result = _dict.get("result")
    is_success = _dict.get("is_success")

    print("is_success", is_success)
    print("len(result)", len(result))
    for page in result:
        print("=================")
        print(page)
        print("-----------------")
def test_appendix_downloaded(_list):
    """Convert appendix files that were already downloaded locally.

    _list: directories, one per docid; every file inside each directory is
    POSTed to the /convert service. Progress is printed every 100 docids.
    """
    loop_count = 0
    for docid_file in _list:
        if loop_count % 100 == 0:
            print("Loop", loop_count)
        for file_path in iglob(docid_file + "/*"):
            print(file_path)
            with open(file_path, "rb") as f:
                base64_data = base64.b64encode(f.read())
            url = _url + '/convert'
            try:
                r = requests.post(url,
                                  data={"file": base64_data,
                                        "type": file_path.split(".")[-1]},
                                  timeout=2000)
                # SECURITY NOTE(review): eval() on a service response is
                # unsafe; json.loads preferred if the payload is JSON.
                _dict = eval(r.content.decode("utf-8"))
                print("is_success:", _dict.get("is_success"))
            except Exception as e:
                print("docid " + str(docid_file) + " time out!", e)
        loop_count += 1
def test_convert_maxcompute():
    """Exercise convert() in-process (as it would run on MaxCompute/ODPS):
    load the OCR/OTR models, convert one sample file, print the resulting
    pages, then release the models and report memory usage.
    """
    try:
        ocr_model = OcrModels().get_model()
        otr_model = OtrModels().get_model()

        path_list = []
        path_suffix = "未命名4.pdf"
        # On Windows the sample lives on the desktop; elsewhere it is relative.
        if get_platform() == "Windows":
            path_prefix = "C:\\Users\\Administrator\\Desktop\\Test_ODPS\\"
            # path_prefix = "C:\\Users\\Administrator\\Desktop\\"
            path_list.append(path_prefix + path_suffix)
        else:
            path_list.append(path_suffix)

        result_list = []
        for path in path_list:
            with open(path, "rb") as f:
                base64_data = base64.b64encode(f.read())
            # print("------------")
            # print(base64_data)
            # print('------------')
            data = {"file": base64_data, "type": path.split(".")[-1]}
            result_dict = convert(data, ocr_model, otr_model)
            # Inspect GC state after each conversion to watch for leaked objects.
            print("garbage object num:%d" % (len(gc.garbage)))
            _unreachable = gc.collect()
            print("unreachable object num:%d" % (_unreachable))
            print("garbage object num:%d" % (len(gc.garbage)))
            result_list.append(result_dict)

        for result_dict in result_list:
            result = result_dict.get("result_text")
            is_success = result_dict.get("is_success")
            for i in range(len(result)):
                print("=================", "is_success", is_success, i, "in", len(result))
                # _dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
                # _dir = os.path.abspath(_dir) + os.sep
                # if i == 0:
                #     with open(_dir + "result.html", "w") as ff:
                #         ff.write(result[i])
                # else:
                #     with open(_dir + "result.html", "a") as ff:
                #         ff.write("<div>=================================================</div>")
                #         ff.write(result[i])
                # print("write result to", _dir + "result.html")

        # Drop the model references explicitly before collecting.
        del otr_model
        del ocr_model
        gc.collect()
    except Exception as e:
        print(e)
    # Resident set size in MB for this process.
    usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    print("memory 2", str(usage))
def getMDFFromFile(path):
    """Return (md5_hexdigest, byte_length) for the file at *path*.

    The file is streamed in 4 KiB chunks so large files do not load into
    memory. On any error the traceback is printed and (None, bytes_read)
    is returned.
    """
    total = 0
    try:
        digest = hashlib.md5()
        with open(path, "rb") as ff:
            while True:
                chunk = ff.read(4096)
                if not chunk:
                    break
                total += len(chunk)
                digest.update(chunk)
        return digest.hexdigest(), total
    except Exception:
        traceback.print_exc()
        return None, total
def get_base64():
    """Print the base64 encoding plus (md5, length) of one fixed sample file."""
    path = "C:\\Users\\Administrator\\Desktop\\Test_ODPS\\1623430252934.doc"
    with open(path, "rb") as f:
        encoded = base64.b64encode(f.read())
    print("------------")
    print(encoded)
    print('------------')
    print(getMDFFromFile(path))
def test_init_model():
    """Load the OCR/OTR models from inside a thread, then force-terminate the
    thread via ctypes if it is still alive after join().

    NOTE(review): the async-raise trick is CPython-specific and only takes
    effect when the target thread next executes Python bytecode.
    """
    class MyThread(threading.Thread):
        def __init__(self):
            super(MyThread, self).__init__()
            # Heavy model initialization happens here, in the *creating*
            # thread (constructor), not in run().
            self.ocr_model = OcrModels().get_model()
            self.otr_model = OtrModels().get_model()

        def run(self):
            # Trivial work; the real payload was done in __init__.
            self.result = random.randint(1, 10)

        def get_result(self):
            return self.result

        def _async_raise(self, tid, exctype):
            """raises the exception, performs cleanup if needed"""
            tid = ctypes.c_long(tid)
            if not inspect.isclass(exctype):
                exctype = type(exctype)
            # CPython C-API: schedule an async exception in thread `tid`.
            res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
            if res == 0:
                raise ValueError("invalid thread id")
            elif res != 1:
                # More than one thread affected: revert with exc=NULL per the
                # C-API contract, then fail loudly.
                ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
                raise SystemError("PyThreadState_SetAsyncExc failed")

        def stop_thread(self, tid):
            self._async_raise(tid, SystemExit)

    class GetModel:
        def __init__(self):
            # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            # print("memory 2", str(usage))
            return

        def process(self):
            thread = MyThread()
            thread.start()
            thread.join()
            result = thread.get_result()
            print(result)
            # Defensive: join() without timeout should mean the thread is
            # dead, but kill it if it somehow survives.
            if thread.is_alive():
                thread.stop_thread(thread.ident)
            # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            # print("memory 3", str(usage))

    m = GetModel()
    m.process()
# Use the 'spawn' start method to copy the process, otherwise the model hangs.
# multiprocessing.set_start_method('spawn', force=True)

# Module-level model handles; TestProcess installs real models here via globals().
ocr_model = ""
otr_model = ""
class TestProcess:
    """Drives convert() inside child processes, batching inputs
    `process_num` at a time.

    NOTE(review): child_process_2 runs in a separate Process, so its
    mutations of result_list / result_num are NOT visible in the parent
    process — the final "self.result_num" print reflects only the parent's
    copy. Confirm whether that is intended before relying on the counts.
    """

    def __init__(self):
        super(TestProcess, self).__init__()
        self.process_num = 2      # batch size before a child run is launched
        self.data_list = []       # pending {"file", "type"} payloads
        self.result_list = []     # conversion results (parent-side copy)
        self.current_data = ""    # payload handed to child_process_2
        self.result_num = 0       # conversions counted (parent-side copy)

    def child_process_1(self):
        # Initialize the models (installed as module globals so that
        # child_process_2, forked from this process, can see them).
        globals().update({"ocr_model": OcrModels().get_model()})
        globals().update({"otr_model": OtrModels().get_model()})
        # Convert each queued payload in its own child process.
        for data in self.data_list:
            self.current_data = data
            # self.child_process_2()
            p = Process(target=self.child_process_2)
            p.start()
            p.join()
            if p.is_alive():
                print("p.close")
                p.close()
        # Reset the queue.
        self.data_list = []
        # Drop the previously loaded models.
        global ocr_model, otr_model
        del ocr_model
        del otr_model
        gc.collect()

    def child_process_2(self):
        global ocr_model, otr_model
        result = convert(self.current_data, ocr_model, otr_model)
        print("result", result.get("is_success"))
        self.result_list.append(result)
        print("len(self.result_list)======================", len(self.result_list))
        self.result_num += 1

    def process(self, path_list):
        for path in path_list:
            with open(path, "rb") as f:
                base64_data = base64.b64encode(f.read())
            data = {"file": base64_data, "type": path.split(".")[-1]}
            self.data_list.append(data)
            # Execute once enough data has accumulated (comment originally
            # said 10 items, but process_num is 2 — verify intent).
            if len(self.data_list) == self.process_num:
                p = Process(target=self.child_process_1)
                p.start()
                p.join()
                p.close()
                print("init data_list result_list!")
                self.data_list = []
        print("self.result_num", self.result_num)
def test_convert_process():
    """Run TestProcess over two sample docs and report resident memory in GB."""
    runner = TestProcess()
    runner.process(["1623430252934.doc", "1623430252934.doc"])
    usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024
    print("----- memory info start - test_convert_process" + " - " + str(usage) + " GB")
# Candidate service hosts; _url below selects which one the tests target.
gpu_url = "http://192.168.2.101"
memory_url = "http://47.97.90.190"
local_url = "http://127.0.0.1"
production_url = "http://47.98.57.0"
# Active base URL for the convert service (local host, port 15015).
_url = local_url + ":15015"
if __name__ == '__main__':
    # Uncomment exactly one entry point below to run that scenario.
    # test_convert()
    # test_convert_process()
    test_convert_maxcompute()
    # test_init_model()
    # test_ocr()
    # test_otr()
    # test_appendix_downloaded()
    # get_base64()
    # print(getMDFFromFile("C:\\Users\\Administrator\\Desktop\\Test_ODPS\\1624900794475.docx"))

    # Multithreaded invocation #####################################
    # threads_num = 30
    # thread_list = []
    # glob_list = glob("html_files/*")
    # sub_num = int(len(glob_list) / threads_num)
    # print(len(glob_list), sub_num)
    #
    # for i in range(threads_num):
    #     if i == threads_num - 1:
    #         _list = glob_list[i*sub_num:]
    #     else:
    #         _list = glob_list[i*sub_num:(i+1)*sub_num]
    #     print(i*sub_num, len(_list))
    #
    #     thread = myThread_appendix("Thread-"+str(i), _list)
    #     thread_list.append(thread)
    #
    # for thread in thread_list:
    #     thread.start()
    # for thread in thread_list:
    #     thread.join()
|