# encoding=utf8 import base64 import json import multiprocessing import os import pickle import random import sys import time import uuid import cv2 import torch from werkzeug.exceptions import NotFound sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") from botr.yolov8.yolo_interface import yolo from botr.yolov8.model import Predictor from atc.atc_interface import AtcModels, atc from idc.idc_interface import IdcModels, idc from isr.isr_interface import IsrModels, isr import traceback import requests from format_convert import _global from format_convert.utils import get_platform, get_sequential_data, judge_error_code, request_post, get_ip_port, \ get_intranet_ip, get_logger, log, get_args_from_config, get_using_ip, np2bytes, set_flask_global from ocr.ocr_interface import ocr, OcrModels from otr.otr_interface import otr, OtrModels from format_convert.libreoffice_interface import office_convert import numpy as np from format_convert.max_compute_config import max_compute MAX_COMPUTE = max_compute if get_platform() == "Windows": FROM_REMOTE = False only_test_ocr = False if only_test_ocr: ip_port_flag = {} ip_port_dict = get_ip_port() for _k in ip_port_dict.keys(): ip_port_flag.update({_k: {"ocr": 0, "otr": 0, "convert": 0, "office": 0 }}) _global.update({"ip_port_flag": ip_port_flag}) ip_port_dict["http://127.0.0.1"]["ocr"] = ["17000"] ip_port_dict["http://127.0.0.1"]["otr"] = ["18000"] _global.update({"ip_port": ip_port_dict}) else: FROM_REMOTE = True if MAX_COMPUTE: FROM_REMOTE = False # ip_port_dict = get_ip_port() # ip = 'http://127.0.0.1' # ocr_port_list = ip_port_dict.get(ip).get("ocr") # otr_port_list = ip_port_dict.get(ip).get("otr") lock = multiprocessing.RLock() # 连接redis数据库 # redis_db = redis.StrictRedis(host='192.168.2.103', port='6379', # db=1, password='bidi123456', health_check_interval=300) redis_db = None def _interface(_dict, time_out=60, retry_times=3): try: # 重试 model_type = _dict.get("model_type") while retry_times: ip_port = interface_pool(model_type) if judge_error_code(ip_port): return ip_port _url = ip_port + "/" + model_type # base64_stream = base64.b64encode(pickle.dumps(_dict)) r = json.loads(request_post(_url, {"data": json.dumps(_dict), "model_type": model_type}, time_out=time_out)) log("get _interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times <= 1: return r else: retry_times -= 1 log("retry post _interface... left times " + str(retry_times) + " " + model_type) continue if judge_error_code(r): return r return r break except TimeoutError: return [-5] except requests.exceptions.ConnectionError as e: return [-2] def from_office_interface(src_path, dest_path, target_format, retry_times=1, from_remote=FROM_REMOTE): try: # Win10跳出超时装饰器 # if get_platform() == "Windows": # # origin_office_convert = office_convert.__wrapped__ # # file_path = origin_office_convert(src_path, dest_path, target_format, retry_times) # file_path = office_convert(src_path, dest_path, target_format, retry_times) # else: # # 将装饰器包装为一个类,否则多进程Pickle会报错 it's not the same object as xxx 问题, # # timeout_decorator_obj = my_timeout_decorator.TimeoutClass(office_convert, 180, TimeoutError) # # file_path = timeout_decorator_obj.run(src_path, dest_path, target_format, retry_times) # # file_path = office_convert(src_path, dest_path, target_format, retry_times) if from_remote: # 重试 retry_times_1 = 1 retry_times_2 = 2 while retry_times_1 and retry_times_2: # _ip = ip_pool("soffice", _random=True) # _port = port_pool("soffice", _random=True) # _ip = interface_ip_list[0] # _port = "16002" # _ip, _port = interface_pool("soffice") # ip_port = from_schedule_interface("office") ip_port = interface_pool_gunicorn("office") if judge_error_code(ip_port): return ip_port _url = ip_port + "/soffice" with open(src_path, "rb") as f: file_bytes = f.read() base64_stream = base64.b64encode(file_bytes) start_time = time.time() r = json.loads(request_post(_url, {"src_path": src_path, "dest_path": dest_path, "file": base64_stream, "target_format": target_format, "retry_times": retry_times}, time_out=25)) log("get interface return") log("office use time " + str(time.time()-start_time)) if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: return r else: retry_times_1 -= 1 log("retry post office_interface... left times " + str(retry_times_1)) continue file_str = r.get("data") if judge_error_code(file_str): if retry_times_2 <= 1: return file_str else: retry_times_2 -= 1 continue file_bytes = eval(file_str) uid1 = src_path.split(os.sep)[-1].split(".")[0] file_path = dest_path + uid1 + "." + target_format if not os.path.exists(os.path.dirname(file_path)): os.makedirs(os.path.dirname(file_path), mode=0o777) with open(file_path, "wb") as f: f.write(file_bytes) break else: file_path = office_convert(src_path, dest_path, target_format, retry_times) if judge_error_code(file_path): return file_path return file_path except TimeoutError: log("from_office_interface timeout error!") return [-5] except: log("from_office_interface error!") print("from_office_interface", traceback.print_exc()) return [-1] def from_ocr_interface(image_stream, is_table=False, only_rec=False, from_remote=FROM_REMOTE): log("into from_ocr_interface") try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: if from_remote: retry_times_1 = 3 # 重试 while retry_times_1: # _ip = ip_pool("ocr", _random=True) # _port = port_pool("ocr", _random=True) # if _ip == interface_ip_list[1]: # _port = ocr_port_list[0] # _ip, _port = interface_pool("ocr") # ip_port = _ip + ":" + _port # ip_port = from_schedule_interface("ocr") ip_port = interface_pool_gunicorn("ocr") if judge_error_code(ip_port): return ip_port _url = ip_port + "/ocr" r = json.loads(request_post(_url, {"data": base64_stream, "md5": _global.get("md5"), "only_rec": only_rec }, time_out=60)) log("get interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: # if is_table: return r, r # else: # return r else: retry_times_1 -= 1 log("retry post ocr_interface... left times " + str(retry_times_1)) continue if judge_error_code(r): return r break else: if globals().get("global_ocr_model") is None: print("=========== init ocr model ===========") globals().update({"global_ocr_model": OcrModels().get_model()}) r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model"), only_rec=only_rec) except TimeoutError: # if is_table: return [-5], [-5] # else: # return [-5] except requests.exceptions.ConnectionError as e: # if is_table: return [-2], [-2] # else: # return [-2] _dict = r text_list = eval(_dict.get("text")) bbox_list = eval(_dict.get("bbox")) if text_list is None: text_list = [] if bbox_list is None: bbox_list = [] if is_table: return text_list, bbox_list else: if text_list and bbox_list: text = get_sequential_data(text_list, bbox_list, html=True) if judge_error_code(text): return text else: text = "" return text except Exception as e: log("from_ocr_interface error!") # print("from_ocr_interface", e, global_type) if is_table: return [-1], [-1] else: return [-1] def from_gpu_interface_flask(_dict, model_type, predictor_type): log("into from_gpu_interface") start_time = time.time() try: # 调用接口 _dict.update({"predictor_type": predictor_type, "model_type": model_type}) if model_type == "ocr": use_zlib = True else: use_zlib = False result = _interface(_dict, time_out=30, retry_times=2, use_zlib=use_zlib) log("from_gpu_interface finish size " + str(sys.getsizeof(_dict)) + " time " + str(time.time()-start_time)) return result except Exception as e: log("from_gpu_interface error!") log("from_gpu_interface failed " + str(time.time()-start_time)) traceback.print_exc() return [-2] def from_gpu_interface_redis(_dict, model_type, predictor_type): log("into from_gpu_interface") start_time = time.time() try: # 调用接口 _uuid = uuid.uuid1().hex _dict.update({"predictor_type": predictor_type, "model_type": model_type, "uuid": _uuid}) _time = time.time() log("pickle.dumps(_dict)" + str(_dict)) redis_db.rpush("producer_"+model_type, pickle.dumps(_dict)) log("producer_" + model_type + " len " + str(redis_db.llen("producer_" + model_type))) log("to producer_" + model_type + " time " + str(time.time()-_time)) _time = time.time() time_out = 300 while True: time.sleep(0.2) if time.time() - _time > time_out: raise Exception if redis_db.hexists("consumer_"+model_type, _uuid): time1 = time.time() result = redis_db.hget("consumer_"+model_type, _uuid) log("from consumer_"+model_type + " time " + str(time.time()-time1)) break result = pickle.loads(result) log("from_gpu_interface finish - size " + str(sys.getsizeof(_dict)) + " - time " + str(time.time()-start_time)) return result except Exception as e: log("from_gpu_interface error!") log("from_gpu_interface failed " + str(time.time()-start_time)) traceback.print_exc() return [-2] # def from_gpu_flask_sm(_dict, model_type, predictor_type): # log("into from_gpu_share_memory") # start_time = time.time() # shm = None # try: # # 放入共享内存 # _time = time.time() # np_data = _dict.get("inputs") # shm = to_share_memory(np_data) # log("data into share memory " + str(shm.name) + " " + str(time.time()-_time)) # # # 调用接口 # _time = time.time() # _dict.pop("inputs") # _dict.update({"predictor_type": predictor_type, "model_type": model_type, # "sm_name": shm.name, "sm_shape": np_data.shape, # "sm_dtype": str(np_data.dtype)}) # result = _interface(_dict, time_out=30, retry_times=2) # log("_interface cost " + str(time.time()-_time)) # # # 读取共享内存 # _time = time.time() # sm_name = result.get("sm_name") # sm_shape = result.get("sm_shape") # sm_dtype = result.get("sm_dtype") # sm_dtype = get_np_type(sm_dtype) # if sm_name: # outputs = from_share_memory(sm_name, sm_shape, sm_dtype) # else: # log("from_share_memory failed!") # raise Exception # log("data from share memory " + sm_name + " " + str(time.time()-_time)) # # log("from_gpu_interface finish - size " + str(sys.getsizeof(_dict)) + " - time " + str(time.time()-start_time)) # return {"preds": outputs, "gpu_time": result.get("gpu_time")} # except Exception as e: # log("from_gpu_interface failed " + str(time.time()-start_time)) # traceback.print_exc() # return [-2] # finally: # # del b # Unnecessary; merely emphasizing the array is no longer used # if shm: # try: # shm.close() # shm.unlink() # except FileNotFoundError: # log("share memory " + shm.name + " not exists!") # except Exception: # traceback.print_exc() # # # def from_gpu_share_memory(_dict, model_type, predictor_type): # log("into from_gpu_share_memory") # start_time = time.time() # try: # _dict.update({"model_type": model_type, "predictor_type": predictor_type}) # outputs, gpu_time = share_memory_pool(_dict) # log("from_gpu_share_memory finish - size " + str(sys.getsizeof(_dict)) + " - time " + str(time.time()-start_time)) # return {"preds": outputs, "gpu_time": float(gpu_time)} # except Exception as e: # log("from_gpu_interface failed " + str(time.time()-start_time)) # traceback.print_exc() # return [-2] def from_otr_interface2(image_stream): log("into from_otr_interface") try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: if globals().get("global_otr_model") is None: globals().update({"global_otr_model": OtrModels().get_model()}) print("=========== init otr model ===========") r = otr(data=base64_stream, otr_model=globals().get("global_otr_model")) except TimeoutError: return [-5], [-5], [-5], [-5], [-5] except requests.exceptions.ConnectionError as e: log("from_otr_interface") print("from_otr_interface", traceback.print_exc()) return [-2], [-2], [-2], [-2], [-2] # 处理结果 _dict = r points = eval(_dict.get("points")) split_lines = eval(_dict.get("split_lines")) bboxes = eval(_dict.get("bboxes")) outline_points = eval(_dict.get("outline_points")) lines = eval(_dict.get("lines")) # print("from_otr_interface len(bboxes)", len(bboxes)) if points is None: points = [] if split_lines is None: split_lines = [] if bboxes is None: bboxes = [] if outline_points is None: outline_points = [] if lines is None: lines = [] return points, split_lines, bboxes, outline_points, lines except Exception as e: log("from_otr_interface error!") print("from_otr_interface", traceback.print_exc()) return [-1], [-1], [-1], [-1], [-1] def from_otr_interface(image_stream, is_from_pdf=False, from_remote=FROM_REMOTE): log("into from_otr_interface") try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: if from_remote: log("from remote") retry_times_1 = 3 # 重试 while retry_times_1: # _ip = ip_pool("otr", _random=True) # _port = port_pool("otr", _random=True) # if _ip == interface_ip_list[1]: # _port = otr_port_list[0] ip_port = interface_pool_gunicorn("otr") # ip_port = from_schedule_interface("otr") if judge_error_code(ip_port): return ip_port _url = ip_port + "/otr" r = json.loads(request_post(_url, {"data": base64_stream, "is_from_pdf": is_from_pdf, "md5": _global.get("md5")}, time_out=60)) log("get interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: return r else: retry_times_1 -= 1 log("retry post otr_interface... left times " + str(retry_times_1)) continue if judge_error_code(r): return r break else: log("from local") log("otr_model " + str(globals().get("global_otr_model"))) if globals().get("global_otr_model") is None: print("=========== init otr model ===========") globals().update({"global_otr_model": OtrModels().get_model()}) log("init finish") r = otr(data=base64_stream, otr_model=globals().get("global_otr_model"), is_from_pdf=is_from_pdf) # r = otr(data=base64_stream, otr_model=None, is_from_pdf=is_from_pdf) except TimeoutError: return [-5] except requests.exceptions.ConnectionError as e: log("from_otr_interface") print("from_otr_interface", traceback.print_exc()) return [-2] # 处理结果 _dict = r list_line = eval(_dict.get("list_line")) return list_line except Exception as e: log("from_otr_interface error!") print("from_otr_interface", traceback.print_exc()) return [-1] def from_isr_interface(image_stream, from_remote=FROM_REMOTE): log("into from_isr_interface") # os.environ["CUDA_VISIBLE_DEVICES"] = "0" start_time = time.time() try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: if from_remote: retry_times_1 = 3 # 重试 while retry_times_1: ip_port = interface_pool_gunicorn("isr") if judge_error_code(ip_port): return ip_port _url = ip_port + "/isr" r = json.loads(request_post(_url, {"data": base64_stream, "md5": _global.get("md5")}, time_out=60)) log("get interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: return r else: retry_times_1 -= 1 log("retry post isr_interface... left times " + str(retry_times_1)) continue if judge_error_code(r): return r break else: if globals().get("global_isr_model") is None: print("=========== init isr model ===========") isr_yolo_model, isr_model = IsrModels().get_model() globals().update({"global_isr_yolo_model": isr_yolo_model}) globals().update({"global_isr_model": isr_model}) r = isr(data=base64_stream, isr_yolo_model=globals().get("global_isr_yolo_model"), isr_model=globals().get("global_isr_model")) except TimeoutError: return [-5] except requests.exceptions.ConnectionError as e: return [-2] _dict = r if from_remote: image_string = _dict.get("image") if judge_error_code(image_string): return image_string # [1]代表检测不到印章,直接返回 if isinstance(image_string, list) and image_string == [1]: return image_string image_base64 = image_string.encode("utf-8") image_bytes = base64.b64decode(image_base64) buffer = np.frombuffer(image_bytes, dtype=np.uint8) image_np = cv2.imdecode(buffer, 1) else: image_np = _dict.get("image") log("from_isr_interface cost time " + str(time.time()-start_time)) return image_np except Exception as e: log("from_isr_interface error!") traceback.print_exc() return [-11] finally: # os.environ["CUDA_VISIBLE_DEVICES"] = "-1" pass def from_idc_interface(image_stream, from_remote=FROM_REMOTE): log("into from_idc_interface") start_time = time.time() try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: if from_remote: retry_times_1 = 3 # 重试 while retry_times_1: ip_port = interface_pool_gunicorn("idc") if judge_error_code(ip_port): return ip_port _url = ip_port + "/idc" r = json.loads(request_post(_url, {"data": base64_stream, "md5": _global.get("md5")}, time_out=60)) log("get interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: return r else: retry_times_1 -= 1 log("retry post idc_interface... left times " + str(retry_times_1)) continue if judge_error_code(r): return r break else: if globals().get("global_idc_model") is None: print("=========== init idc model ===========") idc_model = IdcModels().get_model() globals().update({"global_idc_model": idc_model}) r = idc(data=base64_stream, model=globals().get("global_idc_model")) except TimeoutError: return [-5] except requests.exceptions.ConnectionError as e: return [-2] _dict = r angle = _dict.get("angle") log("from_idc_interface cost time " + str(time.time()-start_time)) return angle except Exception as e: log("from_idc_interface error!") traceback.print_exc() return [-11] def from_atc_interface(text, from_remote=FROM_REMOTE): log("into from_atc_interface") start_time = time.time() try: # 调用接口 try: if from_remote: retry_times_1 = 3 # 重试 while retry_times_1: ip_port = interface_pool_gunicorn("atc") if judge_error_code(ip_port): return ip_port _url = ip_port + "/atc" r = json.loads(request_post(_url, {"data": text, "md5": _global.get("md5")}, time_out=60)) log("get interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: return r else: retry_times_1 -= 1 log("retry post atc_interface... left times " + str(retry_times_1)) continue if judge_error_code(r): return r break else: if globals().get("global_atc_model") is None: print("=========== init atc model ===========") atc_model = AtcModels().get_model() globals().update({"global_atc_model": atc_model}) r = atc(data=text, model=globals().get("global_atc_model")) except TimeoutError: return [-5] except requests.exceptions.ConnectionError as e: return [-2] _dict = r classification = _dict.get("classification") log("from_atc_interface cost time " + str(time.time()-start_time)) return classification except Exception as e: log("from_atc_interface error!") traceback.print_exc() return [-11] def from_yolo_interface(image_stream, from_remote=FROM_REMOTE): log("into from_yolo_interface") start_time = time.time() try: base64_stream = base64.b64encode(image_stream) # 调用接口 try: if from_remote: retry_times_1 = 3 # 重试 while retry_times_1: ip_port = interface_pool_gunicorn("yolo") if judge_error_code(ip_port): return ip_port _url = ip_port + "/yolo" log('yolo _url ' + _url) r = json.loads(request_post(_url, {"data": base64_stream, "md5": _global.get("md5")}, time_out=60)) log("get interface return") if type(r) == list: # 接口连不上换个端口重试 if retry_times_1 <= 1: return r else: retry_times_1 -= 1 log("retry post yolo_interface... left times " + str(retry_times_1)) continue if judge_error_code(r): return r break else: if globals().get("global_yolo_predictor") is None: print("=========== init yolo model ===========") ROOT = os.path.abspath(os.path.dirname(__file__)) + '/../' model_path = ROOT + 'botr/yolov8/weights.pt' image_size = 640 device = torch.device("cuda" if torch.cuda.is_available() else "cpu") yolo_predictor = Predictor(image_size, device, model_path) globals().update({"global_yolo_predictor": yolo_predictor}) r = yolo(data=base64_stream, predictor=globals().get("global_yolo_predictor")) except TimeoutError: return [-5] except requests.exceptions.ConnectionError as e: return [-2] _dict = r b_table_list = _dict.get("b_table_list") log("from_yolo_interface cost time " + str(time.time()-start_time)) return b_table_list except Exception as e: log("from_yolo_interface error!") traceback.print_exc() return [-11] # def from_schedule_interface(interface_type): # try: # _ip = "http://" + get_intranet_ip() # _port = ip_port_dict.get(_ip).get("schedule")[0] # _url = _ip + ":" + _port + "/schedule" # data = {"interface_type": interface_type} # result = json.loads(request_post(_url, data, time_out=10)).get("data") # if judge_error_code(result): # return result # _ip, _port = result # log("from_schedule_interface " + _ip + " " + _port) # return _ip + ":" + _port # except requests.exceptions.ConnectionError as e: # log("from_schedule_interface ConnectionError") # return [-2] # except: # log("from_schedule_interface error!") # traceback.print_exc() # return [-1] def interface_pool(interface_type, use_gunicorn=True): ip_port_flag = _global.get("ip_port_flag") ip_port_dict = _global.get("ip_port") try: if use_gunicorn: _ip = "http://127.0.0.1" _port = ip_port_dict.get(_ip).get(interface_type)[0] ip_port = _ip + ":" + str(_port) log(ip_port) return ip_port # 负载均衡, 选取ip interface_load_list = [] for _ip in ip_port_flag.keys(): if ip_port_dict.get(_ip).get(interface_type): load_scale = ip_port_flag.get(_ip).get(interface_type) / len(ip_port_dict.get(_ip).get(interface_type)) interface_load_list.append([_ip, load_scale]) if not interface_load_list: raise NotFound interface_load_list.sort(key=lambda x: x[-1]) _ip = interface_load_list[0][0] # 负载均衡, 选取port ip_type_cnt = ip_port_flag.get(_ip).get(interface_type) ip_type_total = len(ip_port_dict.get(_ip).get(interface_type)) if ip_type_cnt == 0: ip_type_cnt = random.randint(0, ip_type_total-1) port_index = ip_type_cnt % ip_type_total _port = ip_port_dict.get(_ip).get(interface_type)[port_index] # 更新flag current_flag = ip_type_cnt if current_flag >= 10000: ip_port_flag[_ip][interface_type] = 0 else: ip_port_flag[_ip][interface_type] = current_flag + 1 _global.update({"ip_port_flag": ip_port_flag}) # log(str(_global.get("ip_port_flag"))) ip_port = _ip + ":" + str(_port) log(ip_port) return ip_port except NotFound: log("cannot read ip from config! checkout config") return [-2] except: traceback.print_exc() return [-1] def interface_pool_gunicorn(interface_type): ip_port_flag_dict = _global.get("ip_port_flag") ip_port_dict = _global.get("ip_port") try: if ip_port_dict is None or ip_port_flag_dict is None: print('_global', _global.get_dict()) raise NotFound # 负载均衡, 选取有该接口的ip min_cnt = 10000. interface_cnt = 0 _ip = None port_list = [] for key in ip_port_flag_dict.keys(): temp_port_list = get_args_from_config(ip_port_dict, key, interface_type) if not temp_port_list: continue interface_cnt = ip_port_flag_dict.get(key).get(interface_type) if interface_cnt is not None and interface_cnt / len(temp_port_list[0]) < min_cnt: _ip = key min_cnt = interface_cnt / len(temp_port_list[0]) port_list = temp_port_list[0] # 选取端口 if interface_type == "office": # 刚开始随机,后续求余 if min_cnt == 0: _port = port_list[random.randint(0, len(port_list)-1)] ip_port_flag_dict[_ip][interface_type] = int(_port[-2:]) else: _port = port_list[interface_cnt % len(port_list)] else: # 使用gunicorn则直接选第一个 _port = port_list[0] # 更新flag if ip_port_flag_dict.get(_ip).get(interface_type) >= 10000: ip_port_flag_dict[_ip][interface_type] = 0 else: ip_port_flag_dict[_ip][interface_type] += 1 _global.update({"ip_port_flag": ip_port_flag_dict}) ip_port = _ip + ":" + str(_port) log(interface_type) log(ip_port) return ip_port except NotFound: log("ip_port or ip_port_dict is None! checkout config") return [-2] except: traceback.print_exc() return [-1] def interface_pool_gunicorn_old(interface_type): ip_flag_list = _global.get("ip_flag") ip_port_flag_dict = _global.get("ip_port_flag") ip_port_dict = _global.get("ip_port") try: if ip_flag_list is None or ip_port_dict is None or ip_port_flag_dict is None: raise NotFound if interface_type == "office": # _ip = "http://127.0.0.1" _ip = get_using_ip() # 选取端口 port_list = ip_port_dict.get(_ip).get("MASTER").get(interface_type) ip_type_cnt = ip_port_flag_dict.get(_ip).get(interface_type) if ip_type_cnt == 0: _port = port_list[random.randint(0, len(port_list)-1)] else: _port = port_list[ip_type_cnt % len(port_list)] # 更新flag if ip_port_flag_dict.get(_ip).get(interface_type) >= 10000: ip_port_flag_dict[_ip][interface_type] = 0 else: ip_port_flag_dict[_ip][interface_type] += 1 _global.update({"ip_port_flag": ip_port_flag_dict}) else: # 负载均衡, 选取ip ip_flag_list.sort(key=lambda x: x[1]) if ip_flag_list[-1][1] == 0: ip_index = random.randint(0, len(ip_flag_list)-1) else: ip_index = 0 _ip = ip_flag_list[ip_index][0] if "master" in _ip: port_index = 1 else: port_index = 0 _ip = _ip.split("_")[0] # 选取端口, 使用gunicorn则直接选第一个 # _port = ip_port_dict.get(_ip).get("MASTER").get(interface_type)[0] log("_ip " + _ip) log("interface_type " + interface_type) port_list = get_args_from_config(ip_port_dict, _ip, interface_type) log("port_list" + str(port_list)) if port_index >= len(port_list): port_index = 0 _port = port_list[port_index][0] # # 选取端口, 使用gunicorn则直接选第一个 # _ip = _ip.split("_")[0] # port_list = get_args_from_config(ip_port_dict, _ip, interface_type) # if # print(port_list) # _port = port_list[0][0] # 更新flag if ip_flag_list[ip_index][1] >= 10000: ip_flag_list[ip_index][1] = 0 else: ip_flag_list[ip_index][1] += + 1 _global.update({"ip_flag": ip_flag_list}) ip_port = _ip + ":" + str(_port) log(ip_port) return ip_port except NotFound: log("ip_flag or ip_port_dict is None! checkout config") return [-2] except: traceback.print_exc() return [-1] # def share_memory_pool(args_dict): # np_data = args_dict.get("inputs") # _type = args_dict.get("model_type") # args_dict.update({"sm_shape": np_data.shape, "sm_dtype": str(np_data.dtype)}) # # if _type == 'ocr': # port_list = ocr_port_list # elif _type == 'otr': # port_list = otr_port_list # else: # log("type error! only support ocr otr") # raise Exception # # # 循环判断是否有空的share memory # empty_sm_list = None # sm_list_name = "" # while empty_sm_list is None: # for p in port_list: # sm_list_name = "sml_"+_type+"_"+str(p) # sm_list = get_share_memory_list(sm_list_name) # if sm_list[0] == "0": # lock.acquire(timeout=0.1) # if sm_list[0] == "0": # sm_list[0] = "1" # sm_list[-1] = "0" # empty_sm_list = sm_list # break # else: # continue # lock.release() # # log(str(os.getppid()) + " empty_sm_list " + sm_list_name) # # # numpy放入共享内存 # _time = time.time() # release_share_memory(get_share_memory("psm_" + str(os.getpid()))) # shm = to_share_memory(np_data) # log("data into share memory " + str(shm.name) + " " + str(time.time()-_time)) # # # 参数放入共享内存列表 # empty_sm_list[1] = args_dict.get("md5") # empty_sm_list[2] = args_dict.get("model_type") # empty_sm_list[3] = args_dict.get("predictor_type") # empty_sm_list[4] = args_dict.get("args") # empty_sm_list[5] = str(shm.name) # empty_sm_list[6] = str(args_dict.get("sm_shape")) # empty_sm_list[7] = args_dict.get("sm_dtype") # empty_sm_list[-1] = "1" # # log("empty_sm_list[7] " + empty_sm_list[7]) # close_share_memory_list(empty_sm_list) # # # 循环判断是否完成 # finish_sm_list = get_share_memory_list(sm_list_name) # while True: # if finish_sm_list[-1] == "0": # break # # # 读取共享内存 # _time = time.time() # sm_name = finish_sm_list[5] # sm_shape = finish_sm_list[6] # sm_shape = eval(sm_shape) # sm_dtype = finish_sm_list[7] # gpu_time = finish_sm_list[8] # sm_dtype = get_np_type(sm_dtype) # outputs = from_share_memory(sm_name, sm_shape, sm_dtype) # log(args_dict.get("model_type") + " " + args_dict.get("predictor_type") + " outputs " + str(outputs.shape)) # log("data from share memory " + sm_name + " " + str(time.time()-_time)) # # # 释放 # release_share_memory(get_share_memory(sm_name)) # # # 重置share memory list # finish_sm_list[-1] = "0" # finish_sm_list[0] = "0" # # close_share_memory_list(finish_sm_list) # return outputs, gpu_time # def interface_pool(interface_type): # try: # ip_port_dict = _global.get("ip_port") # ip_list = list(ip_port_dict.keys()) # _ip = random.choice(ip_list) # if interface_type != 'office': # _port = ip_port_dict.get(_ip).get(interface_type)[0] # else: # _port = random.choice(ip_port_dict.get(_ip).get(interface_type)) # log(_ip + ":" + _port) # return _ip + ":" + _port # except Exception as e: # traceback.print_exc() # return [-1] # def ip_pool(interface_type, _random=False): # ip_flag_name = interface_type + '_ip_flag' # ip_flag = globals().get(ip_flag_name) # if ip_flag is None: # if _random: # _r = random.randint(0, len(interface_ip_list)-1) # ip_flag = _r # globals().update({ip_flag_name: ip_flag}) # ip_index = _r # else: # ip_flag = 0 # globals().update({ip_flag_name: ip_flag}) # ip_index = 0 # else: # ip_index = ip_flag % len(interface_ip_list) # ip_flag += 1 # # if ip_flag >= 10000: # ip_flag = 0 # globals().update({ip_flag_name: ip_flag}) # # log("ip_pool " + interface_type + " " + str(ip_flag) + " " + str(interface_ip_list[ip_index])) # return interface_ip_list[ip_index] # # # def port_pool(interface_type, _random=False): # port_flag_name = interface_type + '_port_flag' # # port_flag = globals().get(port_flag_name) # if port_flag is None: # if _random: # if interface_type == "ocr": # _r = random.randint(0, len(ocr_port_list)-1) # elif interface_type == "otr": # _r = random.randint(0, len(otr_port_list)-1) # else: # _r = random.randint(0, len(soffice_port_list)-1) # port_flag = _r # globals().update({port_flag_name: port_flag}) # port_index = _r # else: # port_flag = 0 # globals().update({port_flag_name: port_flag}) # port_index = 0 # else: # if interface_type == "ocr": # port_index = port_flag % len(ocr_port_list) # elif interface_type == "otr": # port_index = port_flag % len(otr_port_list) # else: # port_index = port_flag % len(soffice_port_list) # port_flag += 1 # # if port_flag >= 10000: # port_flag = 0 # globals().update({port_flag_name: port_flag}) # # if interface_type == "ocr": # log("port_pool " + interface_type + " " + str(port_flag) + " " + ocr_port_list[port_index]) # return ocr_port_list[port_index] # elif interface_type == "otr": # log("port_pool " + interface_type + " " + str(port_flag) + " " + otr_port_list[port_index]) # return otr_port_list[port_index] # else: # log("port_pool " + interface_type + " " + str(port_flag) + " " + soffice_port_list[port_index]) # return soffice_port_list[port_index] if __name__ == "__main__": _global._init() set_flask_global() _img = cv2.imread(r"C:/Users/Administrator/Desktop/test_b_table/error11.png") _img_bytes = np2bytes(_img) b_list = from_yolo_interface(_img_bytes, from_remote=True) for l in b_list: for b in l: cv2.rectangle(_img, (int(b[0]), int(b[1])), (int(b[2]), int(b[3])), (0, 0, 255), 2) cv2.namedWindow('img', cv2.WINDOW_NORMAL) cv2.imshow('img', _img) cv2.waitKey(0)