import base64
import inspect
import json
import logging
import os
import random
import sys
import time
from werkzeug.exceptions import NotFound
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import traceback
import requests
from format_convert import _global
from format_convert.utils import get_platform, get_sequential_data, judge_error_code, request_post, get_ip_port, \
    get_intranet_ip, get_logger, log, memory_decorator
from ocr.ocr_interface import ocr, OcrModels
from otr.otr_interface import otr, OtrModels
from format_convert.libreoffice_interface import office_convert

# Remote GPU interfaces: legacy static address lists, kept for reference only.
# The live code obtains "ip:port" strings from interface_pool().
# # interface_ip_list = ['http://192.168.2.102', 'http://192.168.2.103']
# # interface_ip_list = ['http://172.16.160.65', 'http://172.16.160.64', 'http://172.16.160.66', 'http://172.16.160.67']
# interface_ip_list = ['http://172.16.160.65', 'http://172.16.160.65']
# # ocr_port_list = ["15011", "15013", "15015"]
# # ocr_port_list = ["15011", "15013", "15015", "15017", "15019"]
# # otr_port_list = ["15012", "15014", "15016", "15018", "15020"]
# ocr_port_list = ["15011", "15013", "15015"]
# otr_port_list = ["15012", "15014", "15016"]
# # ocr_port_list = ["15011", "15013", "15015", "15017", "15019", "15021"]
# # otr_port_list = ["15012", "15014", "15016", "15018", "15020", "15022"]
# soffice_port_list = ["16000", "16001", "16002", "16003", "16004", "16005",
#                      "16006", "16007", "16008", "16009"]
# # ocr_port_list = ["15011", "15013"]
# # otr_port_list = ["15012"]

# Default for every from_remote parameter below: use the remote services
# everywhere except on Windows, where OCR/OTR models and LibreOffice are
# invoked in-process.
FROM_REMOTE = get_platform() != "Windows"

# Legacy initialisation of the load-balancing tables read by interface_pool().
# It is presumably done elsewhere now (format_convert._global) — verify.
# _global = {}
# ip_port_flag = {}
# ip_port_dict = get_ip_port()
# for _k in ip_port_dict.keys():
#     ip_port_flag.update({_k: {"ocr": 0,
#                               "otr": 0,
#                               "convert": 0,
#                               "office": 0
#                               }})
# _global.update({"ip_port_flag": ip_port_flag})
# _global.update({"ip_port": ip_port_dict})


def from_office_interface(src_path, dest_path, target_format, retry_times=1, from_remote=FROM_REMOTE):
    """Convert an office document with LibreOffice, locally or via the remote service.

    Remote mode:
        The source file is base64-encoded and POSTed to ``<ip:port>/soffice``.
        The address comes from ``interface_pool("office")``. The response's
        ``"data"`` field is decoded and written to
        ``dest_path + <source file name up to its first "."> + "." + target_format``.
        Missing parent directories are created.

    Local mode:
        Delegates to ``office_convert``.

    Args:
        src_path: Path of the document to convert.
        dest_path: Output prefix. It is concatenated directly with the file
            name, so it presumably ends with a path separator — confirm with callers.
        target_format: Extension of the converted file.
        retry_times: Forwarded to the converter (local or remote).
        from_remote: Use the remote service instead of a local conversion.

    Returns:
        The converted file's path on success. Otherwise an error-code list:
        whatever ``interface_pool``, the service or ``office_convert``
        returned; ``[-5]`` on ``TimeoutError``; ``[-1]`` on any other exception.
    """
    try:
        # Win10: bypass the timeout decorator.
        # if get_platform() == "Windows":
        #     # origin_office_convert = office_convert.__wrapped__
        #     # file_path = origin_office_convert(src_path, dest_path, target_format, retry_times)
        #     file_path = office_convert(src_path, dest_path, target_format, retry_times)
        # else:
        #     # Wrap the decorator in a class; otherwise multiprocessing pickling fails
        #     # with the "it's not the same object as xxx" error.
        #     # timeout_decorator_obj = my_timeout_decorator.TimeoutClass(office_convert, 180, TimeoutError)
        #     # file_path = timeout_decorator_obj.run(src_path, dest_path, target_format, retry_times)
        #
        #     file_path = office_convert(src_path, dest_path, target_format, retry_times)
        if from_remote:
            # Retry budgets:
            #   retry_times_1 covers a list response (service unreachable). With a
            #   budget of 1, such a failure is returned immediately.
            #   retry_times_2 covers an error code in the response's "data" field.
            retry_times_1 = 1
            retry_times_2 = 2
            while retry_times_1 and retry_times_2:
                # _ip = ip_pool("soffice", _random=True)
                # _port = port_pool("soffice", _random=True)
                # _ip = interface_ip_list[0]
                # _port = "16002"
                # _ip, _port = interface_pool("soffice")
                # ip_port = from_schedule_interface("office")
                ip_port = interface_pool("office")
                if judge_error_code(ip_port):
                    return ip_port
                _url = ip_port + "/soffice"

                with open(src_path, "rb") as f:
                    source_bytes = f.read()
                base64_stream = base64.b64encode(source_bytes)
                start_time = time.time()
                payload = {"src_path": src_path,
                           "dest_path": dest_path,
                           "file": base64_stream,
                           "target_format": target_format,
                           "retry_times": retry_times}
                r = json.loads(request_post(_url, payload, time_out=25))
                log("office use time " + str(time.time() - start_time))

                if isinstance(r, list):
                    # Service unreachable: retry; the pool picks the port afresh.
                    if retry_times_1 <= 1:
                        return r
                    retry_times_1 -= 1
                    log("retry post office_interface... left times " + str(retry_times_1))
                    continue

                file_str = r.get("data")
                if judge_error_code(file_str):
                    if retry_times_2 <= 1:
                        return file_str
                    retry_times_2 -= 1
                    continue

                # NOTE(security): eval() on text received over HTTP runs arbitrary
                # code if the service is compromised. If the payload is always a
                # bytes literal, ast.literal_eval is a safer drop-in — confirm.
                result_bytes = eval(file_str)
                uid1 = src_path.split(os.sep)[-1].split(".")[0]
                file_path = dest_path + uid1 + "." + target_format
                # exist_ok avoids a race when several workers create the same directory.
                os.makedirs(os.path.dirname(file_path), mode=0o777, exist_ok=True)
                with open(file_path, "wb") as f:
                    f.write(result_bytes)
                break
        else:
            file_path = office_convert(src_path, dest_path, target_format, retry_times)

        # Both successful paths and error codes are returned unchanged.
        return file_path
    except TimeoutError:
        log("from_office_interface timeout error!")
        return [-5]
    except Exception:
        log("from_office_interface error!")
        traceback.print_exc()
        return [-1]


def _ocr_error(code, is_table):
    """Return error code ``code`` in the shape ``from_ocr_interface`` callers expect.

    Table callers unpack ``(text_list, bbox_list)``, so the code is mirrored
    into both slots. Other callers receive the bare list.
    """
    if is_table:
        return code, code
    return code


def from_ocr_interface(image_stream, is_table=False, from_remote=FROM_REMOTE):
    """Run OCR on an image, locally or through the remote ``/ocr`` service.

    Args:
        image_stream: Raw image bytes; they are base64-encoded before use.
        is_table: If true, return the raw ``(text_list, bbox_list)`` pair
            instead of assembled text.
        from_remote: Use the remote service instead of the in-process model.
            The address comes from ``interface_pool("ocr")``, with up to 3
            attempts while the service is unreachable.

    Returns:
        With ``is_table=True``: ``(text_list, bbox_list)``. On failure, both
        items carry the error code.

        Otherwise: the text produced by ``get_sequential_data(..., html=True)``,
        ``""`` when no text or boxes were found, or an error-code list.

        Codes produced here are ``[-5]`` for TimeoutError, ``[-2]`` for
        ConnectionError and ``[-1]`` for any other exception. Codes from the
        pool or the service are passed through.
    """
    log("into from_ocr_interface")
    try:
        base64_stream = base64.b64encode(image_stream)

        # Call the interface.
        try:
            if from_remote:
                retry_times_1 = 3
                # Retry.
                while retry_times_1:
                    # _ip = ip_pool("ocr", _random=True)
                    # _port = port_pool("ocr", _random=True)
                    # if _ip == interface_ip_list[1]:
                    #     _port = ocr_port_list[0]
                    # _ip, _port = interface_pool("ocr")
                    # ip_port = _ip + ":" + _port
                    # ip_port = from_schedule_interface("ocr")
                    ip_port = interface_pool("ocr")
                    if judge_error_code(ip_port):
                        return _ocr_error(ip_port, is_table)
                    _url = ip_port + "/ocr"
                    r = json.loads(request_post(_url, {"data": base64_stream}, time_out=60))
                    if isinstance(r, list):
                        # Service unreachable: retry with another port.
                        if retry_times_1 <= 1:
                            return _ocr_error(r, is_table)
                        retry_times_1 -= 1
                        log("retry post ocr_interface... left times " + str(retry_times_1))
                        continue
                    if judge_error_code(r):
                        return _ocr_error(r, is_table)
                    break
            else:
                # Load the model on first use and cache it in module globals.
                if globals().get("global_ocr_model") is None:
                    globals().update({"global_ocr_model": OcrModels().get_model()})
                    print("=========== init ocr model ===========")
                r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model"))
        except TimeoutError:
            return _ocr_error([-5], is_table)
        except requests.exceptions.ConnectionError:
            return _ocr_error([-2], is_table)

        # Process the result.
        # NOTE(security): eval() on the result fields. On the remote path this is
        # text received over HTTP. ast.literal_eval would be safer if the fields
        # are always plain literals — confirm before switching.
        text_list = eval(r.get("text"))
        bbox_list = eval(r.get("bbox"))
        if text_list is None:
            text_list = []
        if bbox_list is None:
            bbox_list = []

        if is_table:
            return text_list, bbox_list
        if not (text_list and bbox_list):
            return ""
        # Error codes from get_sequential_data are returned as-is.
        return get_sequential_data(text_list, bbox_list, html=True)
    except Exception:
        log("from_ocr_interface error!")
        traceback.print_exc()
        return _ocr_error([-1], is_table)


def from_otr_interface2(image_stream):
    """Run the in-process OTR model on an image and return its five result fields.

    Returns:
        ``(points, split_lines, bboxes, outline_points, lines)``. Each field is
        ``[]`` when the model reported ``None``.

        On failure, every item is the same error code: ``[-5]`` for
        TimeoutError, ``[-2]`` for ConnectionError, ``[-1]`` for any other
        exception.
    """
    log("into from_otr_interface")
    try:
        base64_stream = base64.b64encode(image_stream)

        # Call the model (loaded on first use and cached in module globals).
        try:
            if globals().get("global_otr_model") is None:
                globals().update({"global_otr_model": OtrModels().get_model()})
                print("=========== init otr model ===========")
            r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"))
        except TimeoutError:
            return [-5], [-5], [-5], [-5], [-5]
        except requests.exceptions.ConnectionError:
            log("from_otr_interface")
            traceback.print_exc()
            return [-2], [-2], [-2], [-2], [-2]

        # Process the result. Each field is eval()-ed, so it is presumably a
        # Python-literal string.
        field_names = ("points", "split_lines", "bboxes", "outline_points", "lines")
        values = [eval(r.get(name)) for name in field_names]
        # print("from_otr_interface len(bboxes)", len(bboxes))
        return tuple([] if value is None else value for value in values)
    except Exception:
        log("from_otr_interface error!")
        traceback.print_exc()
        return [-1], [-1], [-1], [-1], [-1]


def from_otr_interface(image_stream, is_from_pdf=False, from_remote=FROM_REMOTE):
    """Run OTR on an image, locally or through the remote ``/otr`` service.

    Args:
        image_stream: Raw image bytes; they are base64-encoded before use.
        is_from_pdf: Forwarded to the model or service unchanged.
        from_remote: Use the remote service instead of the in-process model.
            The address comes from ``interface_pool("otr")``, with up to 3
            attempts while the service is unreachable.

    Returns:
        The eval()-ed ``"list_line"`` field of the result. On failure, an
        error-code list: a code from the pool or service, ``[-5]`` for
        TimeoutError, ``[-2]`` for ConnectionError, or ``[-1]`` for any other
        exception.
    """
    log("into from_otr_interface")
    try:
        base64_stream = base64.b64encode(image_stream)

        # Call the interface.
        try:
            if from_remote:
                retry_times_1 = 3
                # Retry.
                while retry_times_1:
                    # _ip = ip_pool("otr", _random=True)
                    # _port = port_pool("otr", _random=True)
                    # if _ip == interface_ip_list[1]:
                    #     _port = otr_port_list[0]
                    ip_port = interface_pool("otr")
                    # ip_port = from_schedule_interface("otr")
                    if judge_error_code(ip_port):
                        return ip_port
                    _url = ip_port + "/otr"
                    r = json.loads(request_post(_url, {"data": base64_stream,
                                                       "is_from_pdf": is_from_pdf}, time_out=60))
                    if isinstance(r, list):
                        # Service unreachable: retry with another port.
                        if retry_times_1 <= 1:
                            return r
                        retry_times_1 -= 1
                        log("retry post otr_interface... left times " + str(retry_times_1))
                        continue
                    if judge_error_code(r):
                        return r
                    break
            else:
                # Load the model on first use and cache it in module globals.
                if globals().get("global_otr_model") is None:
                    globals().update({"global_otr_model": OtrModels().get_model()})
                    print("=========== init otr model ===========")
                r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"),
                        is_from_pdf=is_from_pdf)
        except TimeoutError:
            return [-5]
        except requests.exceptions.ConnectionError:
            log("from_otr_interface")
            traceback.print_exc()
            return [-2]

        # Process the result.
        # NOTE(security): eval() on a field that may come from an HTTP response.
        return eval(r.get("list_line"))
    except Exception:
        log("from_otr_interface error!")
        traceback.print_exc()
        return [-1]


# def from_schedule_interface(interface_type):
#     try:
#         _ip = "http://" + get_intranet_ip()
#         _port = ip_port_dict.get(_ip).get("schedule")[0]
#         _url = _ip + ":" + _port + "/schedule"
#         data = {"interface_type": interface_type}
#         result = json.loads(request_post(_url, data, time_out=10)).get("data")
#         if judge_error_code(result):
#             return result
#         _ip, _port = result
#         log("from_schedule_interface " + _ip + " " + _port)
#         return _ip + ":" + _port
#     except requests.exceptions.ConnectionError as e:
#         log("from_schedule_interface ConnectionError")
#         return [-2]
#     except:
#         log("from_schedule_interface error!")
#         traceback.print_exc()
#         return [-1]


def interface_pool(interface_type):
    """Choose a remote ``"ip:port"`` for ``interface_type`` (e.g. "ocr", "otr", "office").

    Two tables are read from ``_global``:

    * ``"ip_port"``: ``{ip: {interface_type: [port, ...]}}``
    * ``"ip_port_flag"``: ``{ip: {interface_type: counter}}``

    Selection:

    * IP: among the IPs that list ports for ``interface_type``, take the one
      with the smallest ``counter / number_of_ports``. Ties go to the first
      in iteration order.
    * Port: ``ports[counter % len(ports)]``. A zero counter is first replaced
      by a random index, presumably so that fresh workers do not all start on
      the same port.
    * Counter update: the chosen IP's counter becomes that index + 1.

    Once it reaches 10000, the counters of *all* IPs serving
    ``interface_type`` are reset together. Resetting only the chosen IP would
    make it look idle and funnel all traffic to it until it caught up.

    Returns:
        ``"ip:port"`` on success, ``[-2]`` when no configured IP offers
        ``interface_type``, or ``[-1]`` on any other error (traceback printed).
    """
    ip_port_flag = _global.get("ip_port_flag")
    ip_port_dict = _global.get("ip_port")
    try:
        # Load balancing: pick the IP with the lowest per-port usage.
        candidates = []
        for candidate_ip in ip_port_flag.keys():
            candidate_ports = ip_port_dict.get(candidate_ip).get(interface_type)
            if candidate_ports:
                load_scale = ip_port_flag.get(candidate_ip).get(interface_type) / len(candidate_ports)
                candidates.append((candidate_ip, load_scale))
        if not candidates:
            log("cannot read ip from config! checkout config")
            return [-2]
        _ip = min(candidates, key=lambda item: item[1])[0]

        # Load balancing: round-robin over that IP's ports.
        ports = ip_port_dict.get(_ip).get(interface_type)
        ip_type_cnt = ip_port_flag.get(_ip).get(interface_type)
        if ip_type_cnt == 0:
            ip_type_cnt = random.randint(0, len(ports) - 1)
        _port = ports[ip_type_cnt % len(ports)]

        # Update the usage counters.
        if ip_type_cnt >= 10000:
            # Reset every competitor together so the relative balance is kept.
            for candidate_ip, _ in candidates:
                ip_port_flag[candidate_ip][interface_type] = 0
        else:
            ip_port_flag[_ip][interface_type] = ip_type_cnt + 1
        # Persist the updated table back into _global.
        _global.update({"ip_port_flag": ip_port_flag})
        log(str(_global.get("ip_port_flag")))

        ip_port = _ip + ":" + str(_port)
        log(ip_port)
        return ip_port
    except Exception:
        traceback.print_exc()
        return [-1]


# def interface_pool(interface_type):
#     try:
#         ip_port_dict = _global.get("ip_port")
#         ip_list = list(ip_port_dict.keys())
#         _ip = random.choice(ip_list)
#         if interface_type != 'office':
#             _port = ip_port_dict.get(_ip).get(interface_type)[0]
#         else:
#             _port = random.choice(ip_port_dict.get(_ip).get(interface_type))
#         log(_ip + ":" + _port)
#         return _ip + ":" + _port
#     except Exception as e:
#         traceback.print_exc()
#         return [-1]


# def ip_pool(interface_type, _random=False):
#     ip_flag_name = interface_type + '_ip_flag'
#     ip_flag = globals().get(ip_flag_name)
#     if ip_flag is None:
#         if _random:
#             _r = random.randint(0, len(interface_ip_list)-1)
#             ip_flag = _r
#             globals().update({ip_flag_name: ip_flag})
#             ip_index = _r
#         else:
#             ip_flag = 0
#             globals().update({ip_flag_name: ip_flag})
#             ip_index = 0
#     else:
#         ip_index = ip_flag % len(interface_ip_list)
#         ip_flag += 1
#
#         if ip_flag >= 10000:
#             ip_flag = 0
#         globals().update({ip_flag_name: ip_flag})
#
#     log("ip_pool " + interface_type + " " + str(ip_flag) + " " + str(interface_ip_list[ip_index]))
#     return interface_ip_list[ip_index]
#
#
# def port_pool(interface_type, _random=False):
#     port_flag_name = interface_type + '_port_flag'
#
#     port_flag = globals().get(port_flag_name)
#     if port_flag is None:
#         if _random:
#             if interface_type == "ocr":
#                 _r = random.randint(0, len(ocr_port_list)-1)
#             elif interface_type == "otr":
#                 _r = random.randint(0, len(otr_port_list)-1)
#             else:
#                 _r = random.randint(0, len(soffice_port_list)-1)
#             port_flag = _r
#             globals().update({port_flag_name: port_flag})
#             port_index = _r
#         else:
#             port_flag = 0
#             globals().update({port_flag_name: port_flag})
#             port_index = 0
#     else:
#         if interface_type == "ocr":
#             port_index = port_flag % len(ocr_port_list)
#         elif interface_type == "otr":
#             port_index = port_flag % len(otr_port_list)
#         else:
#             port_index = port_flag % len(soffice_port_list)
#         port_flag += 1
#
#         if port_flag >= 10000:
#             port_flag = 0
#         globals().update({port_flag_name: port_flag})
#
#     if interface_type == "ocr":
#         log("port_pool " + interface_type + " " + str(port_flag) + " " + ocr_port_list[port_index])
#         return ocr_port_list[port_index]
#     elif interface_type == "otr":
#         log("port_pool " + interface_type + " " + str(port_flag) + " " + otr_port_list[port_index])
#         return otr_port_list[port_index]
#     else:
#         log("port_pool " + interface_type + " " + str(port_flag) + " " + soffice_port_list[port_index])
#         return soffice_port_list[port_index]