# encoding=utf8 import argparse import base64 import io import json import pickle import sys import os import threading import numpy as np import redis sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") import time import traceback os.environ['FLAGS_eager_delete_tensor_gb'] = '0' from format_convert.utils import request_post, test_gpu, get_intranet_ip, log, get_md5_from_bytes, \ to_share_memory, from_share_memory, get_np_type, namespace_to_dict, get_share_memory_list, get_ip_port, \ release_share_memory, get_share_memory, close_share_memory_list from flask import Flask, request from format_convert import _global from ocr.tools.infer import utility from ocr.ppocr.utils.logging import get_logger logger = get_logger() # 接口配置 app = Flask(__name__) ocr_model_dir = os.path.dirname(os.path.abspath(__file__)) + "/model/2.0/" lock = threading.RLock() ip_port_dict = get_ip_port() ip = 'http://127.0.0.1' ocr_port_list = ip_port_dict.get(ip).get("ocr") otr_port_list = ip_port_dict.get(ip).get("otr") # redis_db = redis.StrictRedis(host='127.0.0.1', port='6379', # db=1, password='bidi123456', health_check_interval=300) redis_db = None # @app.route('/ocr', methods=['POST']) def _ocr_gpu_flask(): start_time = time.time() log("into _ocr") _global._init() _global.update({"port": globals().get("port")}) log("into _ocr -> _global " + str(time.time()-start_time)) start_time = time.time() try: if not request.form: log("ocr no data!") return json.dumps({"text": str([-9]), "bbox": str([-9])}) log("judge request.form " + str(time.time()-start_time)) start_time1 = time.time() result = pickle.loads(base64.b64decode(request.form.get("data"))) # 解压 inputs = result.get("inputs") # 解压numpy decompressed_array = io.BytesIO() decompressed_array.write(inputs) decompressed_array.seek(0) inputs = np.load(decompressed_array, allow_pickle=True)['arr_0'] log("inputs.shape" + str(inputs.shape)) args = result.get("args") predictor_type = result.get("predictor_type") model_type = result.get("model_type") _md5 = result.get("md5") _global.update({"md5": _md5}) log("read data " + str(time.time()-start_time1)) # 获取对应predictor if globals().get(predictor_type) is None: start_time1 = time.time() log("=== init " + model_type + " " + predictor_type + " model ===") predictor, input_tensor, output_tensors = \ utility.create_predictor(args, predictor_type, logger) globals().update({predictor_type: {"predictor": predictor, "input_tensor": input_tensor, "output_tensors": output_tensors}}) log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===") else: predictor = globals().get(predictor_type).get("predictor") input_tensor = globals().get(predictor_type).get("input_tensor") output_tensors = globals().get(predictor_type).get("output_tensors") # 设置模型输入,运行 input_tensor.copy_from_cpu(inputs) with lock: start_time1 = time.time() predictor.run() gpu_time = round(float(time.time()-start_time1), 2) # 获取模型输出 outputs = [] for output_tensor in output_tensors: output = output_tensor.copy_to_cpu() outputs.append(output) preds = outputs[0] # 压缩numpy compressed_array = io.BytesIO() np.savez_compressed(compressed_array, preds) compressed_array.seek(0) preds = compressed_array.read() # 释放内存 predictor.clear_intermediate_tensor() predictor.try_shrink_memory() finish_time = round(float(time.time()-start_time), 2) log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time)) return base64.b64encode(pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time})) except Exception as e: finish_time = round(float(time.time()-start_time), 2) traceback.print_exc() return base64.b64encode(pickle.dumps({"preds": None, "gpu_time": 0., "elapse": finish_time})) def _ocr_gpu_redis(): start_time = time.time() log("into _ocr") _global._init() _global.update({"port": globals().get("port")}) log("into _ocr -> _global " + str(time.time()-start_time)) while True: start_time = time.time() try: if redis_db.llen("producer_ocr") == 0: continue log("judge llen " + str(time.time()-start_time)) _time = time.time() result = redis_db.lpop("producer_ocr") if result is None: continue result = pickle.loads(result) log("from producer_ocr time " + str(time.time() - _time)) _time = time.time() inputs = result.get("inputs") # # 解压numpy # decompressed_array = io.BytesIO() # decompressed_array.write(inputs) # decompressed_array.seek(0) # inputs = np.load(decompressed_array, allow_pickle=True)['arr_0'] # log("inputs.shape " + str(inputs.shape)) # log("numpy decompress " + str(time.time()-_time)) args = result.get("args") _uuid = result.get("uuid") predictor_type = result.get("predictor_type") model_type = result.get("model_type") _md5 = result.get("md5") _global.update({"md5": _md5}) log("read data " + str(time.time()-_time)) # 获取对应predictor if globals().get(predictor_type) is None: start_time1 = time.time() log("=== init " + model_type + " " + predictor_type + " model ===") predictor, input_tensor, output_tensors = \ utility.create_predictor(args, predictor_type, logger) globals().update({predictor_type: {"predictor": predictor, "input_tensor": input_tensor, "output_tensors": output_tensors}}) log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===") else: predictor = globals().get(predictor_type).get("predictor") input_tensor = globals().get(predictor_type).get("input_tensor") output_tensors = globals().get(predictor_type).get("output_tensors") # 设置模型输入,运行 input_tensor.copy_from_cpu(inputs) start_time1 = time.time() predictor.run() gpu_time = round(float(time.time()-start_time1), 2) # 获取模型输出 _time = time.time() outputs = [] for output_tensor in output_tensors: output = output_tensor.copy_to_cpu() outputs.append(output) preds = outputs[0] log("output_tensors " + str(time.time()-_time)) # # 压缩numpy # _time = time.time() # compressed_array = io.BytesIO() # np.savez_compressed(compressed_array, preds) # compressed_array.seek(0) # preds = compressed_array.read() # log("numpy compress " + str(time.time()-_time)) # 写入redis finish_time = round(float(time.time()-start_time), 2) _time = time.time() redis_db.hset("consumer_ocr", _uuid, pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time})) log("to consumer_ocr " + str(time.time()-_time)) # 释放内存 predictor.clear_intermediate_tensor() predictor.try_shrink_memory() log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time)) except Exception as e: traceback.print_exc() # @app.route('/ocr', methods=['POST']) def _ocr_gpu_flask_sm(): start_time = time.time() log("into _ocr") _global._init() _global.update({"port": globals().get("port")}) log("into _ocr -> _global " + str(time.time()-start_time)) start_time = time.time() try: if not request.form: log("ocr no data!") return json.dumps({"text": str([-9]), "bbox": str([-9])}) log("judge request.form " + str(time.time()-start_time)) _time = time.time() result = json.loads(request.form.get("data")) predictor_type = result.get("predictor_type") model_type = result.get("model_type") args = result.get("args") args = namespace_to_dict(args, reverse=True) _md5 = result.get("md5") sm_name = result.get("sm_name") sm_shape = result.get("sm_shape") sm_dtype = result.get("sm_dtype") sm_dtype = get_np_type(sm_dtype) _global.update({"md5": _md5}) log("read data " + str(time.time()-_time)) # 读取共享内存 _time = time.time() if sm_name: inputs = from_share_memory(sm_name, sm_shape, sm_dtype) else: log("from_share_memory failed!") raise Exception log("data from share memory " + sm_name + " " + str(time.time()-_time)) # 获取对应predictor if globals().get(predictor_type) is None: start_time1 = time.time() log("=== init " + model_type + " " + predictor_type + " model ===") predictor, input_tensor, output_tensors = \ utility.create_predictor(args, predictor_type, logger) globals().update({predictor_type: {"predictor": predictor, "input_tensor": input_tensor, "output_tensors": output_tensors}}) log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===") else: predictor = globals().get(predictor_type).get("predictor") input_tensor = globals().get(predictor_type).get("input_tensor") output_tensors = globals().get(predictor_type).get("output_tensors") _time = time.time() with lock: # 设置模型输入 input_tensor.copy_from_cpu(inputs) # 运行 predictor.run() # 获取模型输出 outputs = [] for output_tensor in output_tensors: output = output_tensor.copy_to_cpu() outputs.append(output) preds = outputs[0] gpu_time = round(float(time.time()-_time), 2) log("gpu_time " + str(gpu_time)) _shape = preds.shape _dtype = str(preds.dtype) # 判断前一个读取完 _time = time.time() while True: shm = globals().get("shm") if shm is None: break last_shape = globals().get("last_shape") sm_data = np.ndarray(last_shape, dtype=sm_dtype, buffer=shm.buf) if (sm_data == np.zeros(last_shape)).all(): try: _time1 = time.time() shm.close() shm.unlink() log("release share memory " + str(time.time()-_time1)) except FileNotFoundError: log("share memory " + shm.name + " not exists!") break log("wait for share memory being read " + str(time.time()-_time)) # 数据放入共享内存 _time = time.time() shm = to_share_memory(preds) globals().update({"shm": shm}) globals().update({"last_shape": _shape}) log("data into share memory " + str(shm.name) + " " + str(time.time()-_time)) # 释放内存 _time = time.time() predictor.clear_intermediate_tensor() predictor.try_shrink_memory() log("ocr shrink memory " + str(time.time()-_time)) finish_time = round(float(time.time()-start_time), 2) log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time)) return json.dumps({"gpu_time": gpu_time, "elapse": finish_time, "sm_name": shm.name, "sm_shape": _shape, "sm_dtype": _dtype}) except Exception as e: finish_time = round(float(time.time()-start_time), 2) traceback.print_exc() return json.dumps({"gpu_time": gpu_time, "elapse": finish_time, "sm_name": None, "sm_shape": None, "sm_dtype": None}) def _ocr(): start_time = time.time() log("into _ocr") _global._init() _global.update({"port": globals().get("port")}) log("into _ocr -> _global " + str(time.time()-start_time)) start_time = time.time() sm_list_name = "sml_ocr_"+str(port) try: # 初始化模型 for predictor_type in ["det", "cls", "rec"]: args = init_ocr_args() predictor, input_tensor, output_tensors = \ utility.create_predictor(args, predictor_type, logger) globals().update({predictor_type: {"predictor": predictor, "input_tensor": input_tensor, "output_tensors": output_tensors}}) if predictor == "det": inputs = np.zeros((1, 3, 1024, 1024), dtype=np.float32) else: inputs = np.zeros((30, 3, 32, 64), dtype=np.float32) # init model by running once input_tensor.copy_from_cpu(inputs) predictor.run() outputs = [] for output_tensor in output_tensors: output = output_tensor.copy_to_cpu() outputs.append(output) log("finish init predictor " + predictor_type) # 循环判断是否有新数据需处理 # full_sm_list = globals().get("sm_list") while True: try: full_sm_list = get_share_memory_list(sm_list_name=sm_list_name) except FileNotFoundError: full_sm_list = get_share_memory_list(sm_list_name=sm_list_name, list_size=10) try: if full_sm_list[0] == "1" and full_sm_list[-1] == "1": log("empty_sm_list[0] " + full_sm_list[0]) log("empty_sm_list[-1] " + full_sm_list[-1]) log("empty_sm_list[1] " + full_sm_list[1]) log("wait for " + str(time.time()-start_time)) break except ValueError: continue start_time = time.time() _time = time.time() _md5 = full_sm_list[1] model_type = full_sm_list[2] predictor_type = full_sm_list[3] args = full_sm_list[4] args = namespace_to_dict(eval(args), reverse=True) sm_name = full_sm_list[5] sm_shape = full_sm_list[6] sm_shape = eval(sm_shape) sm_dtype = full_sm_list[7] sm_dtype = get_np_type(sm_dtype) _global.update({"md5": _md5}) log("read data " + str(time.time()-_time)) # 读取共享内存 _time = time.time() if sm_name: inputs = from_share_memory(sm_name, sm_shape, sm_dtype) log(predictor_type + " inputs shape " + str(inputs.shape)) else: log("from_share_memory failed!") raise Exception log("data from share memory " + sm_name + " " + str(time.time()-_time)) # 获取对应predictor if globals().get(predictor_type) is None: start_time1 = time.time() log("=== init " + model_type + " " + predictor_type + " model ===") predictor, input_tensor, output_tensors = \ utility.create_predictor(args, predictor_type, logger) globals().update({predictor_type: {"predictor": predictor, "input_tensor": input_tensor, "output_tensors": output_tensors}}) log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===") else: predictor = globals().get(predictor_type).get("predictor") input_tensor = globals().get(predictor_type).get("input_tensor") output_tensors = globals().get(predictor_type).get("output_tensors") _time = time.time() with lock: # 设置模型输入 input_tensor.copy_from_cpu(inputs) # 运行 predictor.run() # 获取模型输出 outputs = [] for output_tensor in output_tensors: output = output_tensor.copy_to_cpu() outputs.append(output) preds = outputs[0] gpu_time = round(float(time.time()-_time), 2) log("gpu_time " + str(gpu_time)) # 数据放入共享内存 _time = time.time() # 先释放之前的同名share memory release_share_memory(get_share_memory(sm_name)) # 写入共享内存 shm = to_share_memory(preds, sm_name) full_sm_list[5] = shm.name full_sm_list[6] = str(preds.shape) full_sm_list[7] = str(preds.dtype) full_sm_list[8] = str(gpu_time) full_sm_list[-1] = "0" log("data into share memory " + str(shm.name) + " " + str(time.time()-_time)) # 释放共享内存 close_share_memory_list(full_sm_list) # 释放内存 _time = time.time() predictor.clear_intermediate_tensor() predictor.try_shrink_memory() log("ocr shrink memory " + str(time.time()-_time)) finish_time = round(float(time.time()-start_time), 2) log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time)) except Exception as e: finish_time = round(float(time.time()-start_time), 2) traceback.print_exc() raise def init_ocr_args(): return argparse.Namespace( use_gpu=True, ir_optim=True, use_tensorrt=False, gpu_mem=8000, image_dir='', det_algorithm='DB', det_model_dir=ocr_model_dir+"det", det_limit_side_len=1280, det_limit_type='max', det_db_thresh=0.1, # det_db_box_thresh 漏行 调小 det_db_box_thresh=0.1, # det_db_unclip_ratio 检测框的贴近程度 det_db_unclip_ratio=2.5, # 对文字膨胀操作 use_dilation=False, det_east_score_thresh=0.8, det_east_cover_thresh=0.1, det_east_nms_thresh=0.2, rec_algorithm='CRNN', rec_model_dir=ocr_model_dir+"rec/ch", rec_image_shape="3, 32, 1000", rec_char_type='ch', rec_batch_num=30, max_text_length=128, rec_char_dict_path='./ppocr/utils/ppocr_keys_v1.txt', use_space_char=True, drop_score=0.5, cls_model_dir=ocr_model_dir+"cls", cls_image_shape="3, 32, 1000", label_list=['0', '180'], cls_batch_num=30, cls_thresh=0.9, enable_mkldnn=False, use_zero_copy_run=True, use_pdserving=False, lang='ch', det=True, rec=True, use_angle_cls=False) if __name__ == '__main__': if len(sys.argv) == 2: port = int(sys.argv[1]) using_gpu_index = 0 elif len(sys.argv) == 3: port = int(sys.argv[1]) using_gpu_index = int(sys.argv[2]) else: port = 17000 using_gpu_index = 0 _global._init() _global.update({"port": str(port)}) globals().update({"port": str(port)}) ip = get_intranet_ip() os.environ['CUDA_VISIBLE_DEVICES'] = str(using_gpu_index) # app.run(host='0.0.0.0', port=port, processes=1, threaded=False, debug=False) # app.run() # log("OCR running "+str(port)) while True: _ocr()