123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457 |
- import base64
- import inspect
- import json
- import logging
- import os
- import random
- import sys
- import time
- from werkzeug.exceptions import NotFound
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- import traceback
- import requests
- from format_convert import _global
- from format_convert.utils import get_platform, get_sequential_data, judge_error_code, request_post, get_ip_port, \
- get_intranet_ip, get_logger, log, memory_decorator
- from ocr.ocr_interface import ocr, OcrModels
- from otr.otr_interface import otr, OtrModels
- from format_convert.libreoffice_interface import office_convert
- # 远程GPU接口
- # # interface_ip_list = ['http://192.168.2.102', 'http://192.168.2.103']
- # # interface_ip_list = ['http://172.16.160.65', 'http://172.16.160.64', 'http://172.16.160.66', 'http://172.16.160.67']
- # interface_ip_list = ['http://172.16.160.65', 'http://172.16.160.65']
- # # ocr_port_list = ["15011", "15013", "15015"]
- # # ocr_port_list = ["15011", "15013", "15015", "15017", "15019"]
- # # otr_port_list = ["15012", "15014", "15016", "15018", "15020"]
- # ocr_port_list = ["15011", "15013", "15015"]
- # otr_port_list = ["15012", "15014", "15016"]
- # # ocr_port_list = ["15011", "15013", "15015", "15017", "15019", "15021"]
- # # otr_port_list = ["15012", "15014", "15016", "15018", "15020", "15022"]
- # soffice_port_list = ["16000", "16001", "16002", "16003", "16004", "16005",
- # "16006", "16007", "16008", "16009"]
- # # ocr_port_list = ["15011", "15013"]
- # # otr_port_list = ["15012"]
# Default for every ``from_remote`` parameter in this module.
# Use the remote services on every platform except Windows.
# On Windows the in-process OCR/OTR models and the local office converter are used.
FROM_REMOTE = get_platform() != "Windows"
- # _global = {}
- # ip_port_flag = {}
- # ip_port_dict = get_ip_port()
- # for _k in ip_port_dict.keys():
- # ip_port_flag.update({_k: {"ocr": 0,
- # "otr": 0,
- # "convert": 0,
- # "office": 0
- # }})
- # _global.update({"ip_port_flag": ip_port_flag})
- # _global.update({"ip_port": ip_port_dict})
def from_office_interface(src_path, dest_path, target_format, retry_times=1, from_remote=FROM_REMOTE):
    """Convert an office document to ``target_format``.

    Locally this delegates to ``office_convert``. Remotely, the source file is
    base64-encoded and POSTed to the ``/soffice`` endpoint of an address picked
    by ``interface_pool("office")``. The returned bytes are written to
    ``dest_path + <name up to first dot> + "." + target_format``.

    Args:
        src_path: path of the document to convert.
        dest_path: output location prefix. It is concatenated directly with
            the file name, so it should end with a path separator.
        target_format: extension of the requested output format.
        retry_times: forwarded to ``office_convert`` / the remote service.
        from_remote: use the remote service instead of the local converter.

    Returns:
        The output file path on success. Otherwise an error-code list:
        [-5] on TimeoutError, [-1] on any other unexpected error, or a code
        passed through from ``interface_pool``, ``request_post`` or the
        remote service.
    """
    try:
        if not from_remote:
            return office_convert(src_path, dest_path, target_format, retry_times)

        # The payload is identical for every attempt, so read and encode it once.
        with open(src_path, "rb") as f:
            base64_stream = base64.b64encode(f.read())

        # Two independent budgets:
        #   - retry_times_1 covers list responses (connection-level failures).
        #   - retry_times_2 covers error codes reported in the "data" field.
        # A budget of 1 means the first failure of that kind is returned as is.
        # Each pass either returns or decrements a budget that is still > 1,
        # so the loop always terminates.
        retry_times_1 = 1
        retry_times_2 = 2
        while True:
            ip_port = interface_pool("office")
            if judge_error_code(ip_port):
                return ip_port
            _url = ip_port + "/soffice"
            start_time = time.time()
            r = json.loads(request_post(_url, {"src_path": src_path,
                                               "dest_path": dest_path,
                                               "file": base64_stream,
                                               "target_format": target_format,
                                               "retry_times": retry_times}, time_out=25))
            log("office use time " + str(time.time()-start_time))
            if isinstance(r, list):
                # Could not reach the service: retry (a new address may be picked).
                if retry_times_1 <= 1:
                    return r
                retry_times_1 -= 1
                log("retry post office_interface... left times " + str(retry_times_1))
                continue
            file_str = r.get("data")
            if judge_error_code(file_str):
                if retry_times_2 <= 1:
                    return file_str
                retry_times_2 -= 1
                continue
            # NOTE(security): eval() runs text received over the network.
            # Presumably file_str is the repr of a bytes object. If so,
            # ast.literal_eval would be a safe drop-in; confirm the wire
            # format before switching.
            file_bytes = eval(file_str)
            uid1 = src_path.split(os.sep)[-1].split(".")[0]
            file_path = dest_path + uid1 + "." + target_format
            out_dir = os.path.dirname(file_path)
            # Guard against an empty dirname (os.makedirs("") raises).
            # exist_ok tolerates a concurrent creator of the same directory.
            if out_dir and not os.path.exists(out_dir):
                os.makedirs(out_dir, mode=0o777, exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(file_bytes)
            return file_path
    except TimeoutError:
        log("from_office_interface timeout error!")
        return [-5]
    except Exception:
        log("from_office_interface error!")
        traceback.print_exc()
        return [-1]
def from_ocr_interface(image_stream, is_table=False, from_remote=FROM_REMOTE):
    """Run OCR on an image, either through a remote service or the local model.

    Args:
        image_stream: raw image bytes. They are base64-encoded before use.
        is_table: when True, return ``(text_list, bbox_list)``. Otherwise
            return the text assembled by ``get_sequential_data(..., html=True)``.
        from_remote: POST to the ``/ocr`` endpoint chosen by
            ``interface_pool("ocr")`` instead of using the in-process model.

    Returns:
        With is_table=False: the assembled text ("" when no text/bbox was
        recognised), or an error-code list.
        With is_table=True: ``(text_list, bbox_list)``. On error, both
        elements are the same error-code list, so callers can always unpack
        two values.
        Error codes: [-5] on TimeoutError, [-2] on a requests connection
        error, [-1] on any other unexpected error. Codes from
        ``interface_pool`` or the remote service are passed through.
    """
    log("into from_ocr_interface")

    def _fail(code):
        # Keep the return arity stable: table callers unpack two values.
        return (code, code) if is_table else code

    try:
        base64_stream = base64.b64encode(image_stream)
        try:
            if from_remote:
                retry_times_1 = 3
                while retry_times_1:
                    ip_port = interface_pool("ocr")
                    if judge_error_code(ip_port):
                        return _fail(ip_port)
                    _url = ip_port + "/ocr"
                    r = json.loads(request_post(_url, {"data": base64_stream}, time_out=60))
                    if isinstance(r, list):
                        # Could not reach the service: retry (a new address may be picked).
                        if retry_times_1 <= 1:
                            return _fail(r)
                        retry_times_1 -= 1
                        log("retry post ocr_interface... left times " + str(retry_times_1))
                        continue
                    if judge_error_code(r):
                        return _fail(r)
                    break
            else:
                # Build the OCR model on first use and cache it in module globals.
                if globals().get("global_ocr_model") is None:
                    globals().update({"global_ocr_model": OcrModels().get_model()})
                    print("=========== init ocr model ===========")
                r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model"))
        except TimeoutError:
            return _fail([-5])
        except requests.exceptions.ConnectionError:
            return _fail([-2])

        # NOTE(security): eval() runs text that may come from a remote service.
        # Consider ast.literal_eval once the field format (list reprs?) is
        # confirmed to contain only literals.
        text_list = eval(r.get("text"))
        bbox_list = eval(r.get("bbox"))
        if text_list is None:
            text_list = []
        if bbox_list is None:
            bbox_list = []
        if is_table:
            return text_list, bbox_list
        # get_sequential_data's result (text or error code) is returned as is.
        if text_list and bbox_list:
            return get_sequential_data(text_list, bbox_list, html=True)
        return ""
    except Exception:
        log("from_ocr_interface error!")
        traceback.print_exc()
        return _fail([-1])
def from_otr_interface2(image_stream):
    """Run the in-process OTR model on an image and return its five result fields.

    Args:
        image_stream: raw image bytes. They are base64-encoded before being
            passed to ``otr``.

    Returns:
        A 5-tuple ``(points, split_lines, bboxes, outline_points, lines)``.
        Any field that evaluates to None is replaced by ``[]``. On error, every
        element is the same error-code list: [-5] on TimeoutError, [-2] on a
        requests connection error, [-1] on any other unexpected error.
    """
    log("into from_otr_interface")
    try:
        base64_stream = base64.b64encode(image_stream)
        try:
            # Build the OTR model on first use and cache it in module globals.
            if globals().get("global_otr_model") is None:
                globals().update({"global_otr_model": OtrModels().get_model()})
                print("=========== init otr model ===========")
            r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"))
        except TimeoutError:
            return [-5], [-5], [-5], [-5], [-5]
        except requests.exceptions.ConnectionError:
            log("from_otr_interface")
            traceback.print_exc()
            return [-2], [-2], [-2], [-2], [-2]

        # The model returns each field as a string that is eval()'d here.
        # NOTE(review): prefer ast.literal_eval if the reprs are pure literals.
        results = []
        for key in ("points", "split_lines", "bboxes", "outline_points", "lines"):
            value = eval(r.get(key))
            results.append([] if value is None else value)
        return tuple(results)
    except Exception:
        log("from_otr_interface error!")
        traceback.print_exc()
        return [-1], [-1], [-1], [-1], [-1]
def from_otr_interface(image_stream, is_from_pdf=False, from_remote=FROM_REMOTE):
    """Run OTR on an image, either through a remote service or the local model.

    Args:
        image_stream: raw image bytes. They are base64-encoded before use.
        is_from_pdf: flag forwarded to the remote service / ``otr``.
        from_remote: POST to the ``/otr`` endpoint chosen by
            ``interface_pool("otr")`` instead of using the in-process model.

    Returns:
        The eval'd ``list_line`` field of the result, or an error-code list:
        [-5] on TimeoutError, [-2] on a requests connection error, [-1] on
        any other unexpected error. Codes from ``interface_pool`` or the
        remote service are passed through.
    """
    log("into from_otr_interface")
    try:
        base64_stream = base64.b64encode(image_stream)
        try:
            if from_remote:
                retry_times_1 = 3
                while retry_times_1:
                    ip_port = interface_pool("otr")
                    if judge_error_code(ip_port):
                        return ip_port
                    _url = ip_port + "/otr"
                    r = json.loads(request_post(_url, {"data": base64_stream, "is_from_pdf": is_from_pdf}, time_out=60))
                    if isinstance(r, list):
                        # Could not reach the service: retry (a new address may be picked).
                        if retry_times_1 <= 1:
                            return r
                        retry_times_1 -= 1
                        log("retry post otr_interface... left times " + str(retry_times_1))
                        continue
                    if judge_error_code(r):
                        return r
                    break
            else:
                # Build the OTR model on first use and cache it in module globals.
                if globals().get("global_otr_model") is None:
                    globals().update({"global_otr_model": OtrModels().get_model()})
                    print("=========== init otr model ===========")
                r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"), is_from_pdf=is_from_pdf)
        except TimeoutError:
            return [-5]
        except requests.exceptions.ConnectionError:
            log("from_otr_interface")
            traceback.print_exc()
            return [-2]

        # NOTE(security): eval() runs text that may come from a remote service.
        # Consider ast.literal_eval once the field is confirmed to be a pure literal.
        return eval(r.get("list_line"))
    except Exception:
        log("from_otr_interface error!")
        traceback.print_exc()
        return [-1]
- # def from_schedule_interface(interface_type):
- # try:
- # _ip = "http://" + get_intranet_ip()
- # _port = ip_port_dict.get(_ip).get("schedule")[0]
- # _url = _ip + ":" + _port + "/schedule"
- # data = {"interface_type": interface_type}
- # result = json.loads(request_post(_url, data, time_out=10)).get("data")
- # if judge_error_code(result):
- # return result
- # _ip, _port = result
- # log("from_schedule_interface " + _ip + " " + _port)
- # return _ip + ":" + _port
- # except requests.exceptions.ConnectionError as e:
- # log("from_schedule_interface ConnectionError")
- # return [-2]
- # except:
- # log("from_schedule_interface error!")
- # traceback.print_exc()
- # return [-1]
def interface_pool(interface_type):
    """Pick an ``ip:port`` address for ``interface_type`` with simple load balancing.

    Reads two mappings from the shared ``_global`` store:
      * ``"ip_port"``: ip -> {interface_type: [port, ...]}
      * ``"ip_port_flag"``: ip -> {interface_type: call counter}

    The ip is the one with the lowest counter/port-count ratio. Ties go to
    the first such ip in ``ip_port_flag`` order. A port on that ip is then
    chosen round-robin by the counter. When the counter is 0, a random
    starting index is used instead. The counter is advanced (reset to 0 once
    it reaches 10000) and the flag table is written back to ``_global``.

    Args:
        interface_type: service key, e.g. "ocr", "otr" or "office".

    Returns:
        ``ip + ":" + port`` on success.
        [-2] when the config is missing or no ip serves ``interface_type``.
        [-1] on any other unexpected error.
    """
    try:
        ip_port_flag = _global.get("ip_port_flag")
        ip_port_dict = _global.get("ip_port")
        if not ip_port_flag or not ip_port_dict:
            log("cannot read ip from config! checkout config")
            return [-2]

        # Choose the ip: lowest counter per configured port.
        candidates = []
        for _ip, type_flags in ip_port_flag.items():
            ports = (ip_port_dict.get(_ip) or {}).get(interface_type)
            if ports:
                candidates.append([_ip, type_flags.get(interface_type, 0) / len(ports)])
        if not candidates:
            log("cannot read ip from config! checkout config")
            return [-2]
        # min() keeps the first minimum, matching a stable sort's first element.
        _ip = min(candidates, key=lambda c: c[-1])[0]

        # Choose the port: round-robin on the counter.
        ports = ip_port_dict[_ip][interface_type]
        counter = ip_port_flag[_ip].get(interface_type, 0)
        if counter == 0:
            counter = random.randint(0, len(ports) - 1)
        _port = ports[counter % len(ports)]

        # Advance the counter and write the table back. _global.get() may
        # return a copy (e.g. a shared-dict proxy), so the update is needed.
        ip_port_flag[_ip][interface_type] = 0 if counter >= 10000 else counter + 1
        _global.update({"ip_port_flag": ip_port_flag})
        log(str(_global.get("ip_port_flag")))

        ip_port = _ip + ":" + str(_port)
        log(ip_port)
        return ip_port
    except Exception:
        traceback.print_exc()
        return [-1]
- # def interface_pool(interface_type):
- # try:
- # ip_port_dict = _global.get("ip_port")
- # ip_list = list(ip_port_dict.keys())
- # _ip = random.choice(ip_list)
- # if interface_type != 'office':
- # _port = ip_port_dict.get(_ip).get(interface_type)[0]
- # else:
- # _port = random.choice(ip_port_dict.get(_ip).get(interface_type))
- # log(_ip + ":" + _port)
- # return _ip + ":" + _port
- # except Exception as e:
- # traceback.print_exc()
- # return [-1]
- # def ip_pool(interface_type, _random=False):
- # ip_flag_name = interface_type + '_ip_flag'
- # ip_flag = globals().get(ip_flag_name)
- # if ip_flag is None:
- # if _random:
- # _r = random.randint(0, len(interface_ip_list)-1)
- # ip_flag = _r
- # globals().update({ip_flag_name: ip_flag})
- # ip_index = _r
- # else:
- # ip_flag = 0
- # globals().update({ip_flag_name: ip_flag})
- # ip_index = 0
- # else:
- # ip_index = ip_flag % len(interface_ip_list)
- # ip_flag += 1
- #
- # if ip_flag >= 10000:
- # ip_flag = 0
- # globals().update({ip_flag_name: ip_flag})
- #
- # log("ip_pool " + interface_type + " " + str(ip_flag) + " " + str(interface_ip_list[ip_index]))
- # return interface_ip_list[ip_index]
- #
- #
- # def port_pool(interface_type, _random=False):
- # port_flag_name = interface_type + '_port_flag'
- #
- # port_flag = globals().get(port_flag_name)
- # if port_flag is None:
- # if _random:
- # if interface_type == "ocr":
- # _r = random.randint(0, len(ocr_port_list)-1)
- # elif interface_type == "otr":
- # _r = random.randint(0, len(otr_port_list)-1)
- # else:
- # _r = random.randint(0, len(soffice_port_list)-1)
- # port_flag = _r
- # globals().update({port_flag_name: port_flag})
- # port_index = _r
- # else:
- # port_flag = 0
- # globals().update({port_flag_name: port_flag})
- # port_index = 0
- # else:
- # if interface_type == "ocr":
- # port_index = port_flag % len(ocr_port_list)
- # elif interface_type == "otr":
- # port_index = port_flag % len(otr_port_list)
- # else:
- # port_index = port_flag % len(soffice_port_list)
- # port_flag += 1
- #
- # if port_flag >= 10000:
- # port_flag = 0
- # globals().update({port_flag_name: port_flag})
- #
- # if interface_type == "ocr":
- # log("port_pool " + interface_type + " " + str(port_flag) + " " + ocr_port_list[port_index])
- # return ocr_port_list[port_index]
- # elif interface_type == "otr":
- # log("port_pool " + interface_type + " " + str(port_flag) + " " + otr_port_list[port_index])
- # return otr_port_list[port_index]
- # else:
- # log("port_pool " + interface_type + " " + str(port_flag) + " " + soffice_port_list[port_index])
- # return soffice_port_list[port_index]
|