# encoding=utf8
import argparse
import base64
import io
import json
import pickle
import sys
import os
import threading
import numpy as np
import redis

sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import time
import traceback

os.environ['FLAGS_eager_delete_tensor_gb'] = '0'
from format_convert.utils import request_post, test_gpu, get_intranet_ip, log, get_md5_from_bytes, \
    to_share_memory, from_share_memory, get_np_type, namespace_to_dict, get_share_memory_list, get_ip_port, \
    release_share_memory, get_share_memory, close_share_memory_list
from flask import Flask, request
from format_convert import _global
from ocr.tools.infer import utility
from ocr.ppocr.utils.logging import get_logger

logger = get_logger()

# Service configuration
app = Flask(__name__)

ocr_model_dir = os.path.dirname(os.path.abspath(__file__)) + "/model/2.0/"
lock = threading.RLock()

ip_port_dict = get_ip_port()
ip = 'http://127.0.0.1'
ocr_port_list = ip_port_dict.get(ip).get("ocr")
otr_port_list = ip_port_dict.get(ip).get("otr")

# redis_db = redis.StrictRedis(host='127.0.0.1', port='6379',
#                              db=1, password='bidi123456', health_check_interval=300)
redis_db = None

# @app.route('/ocr', methods=['POST'])
def _ocr_gpu_flask():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})
        log("judge request.form " + str(time.time()-start_time))

        start_time1 = time.time()
        result = pickle.loads(base64.b64decode(request.form.get("data")))
        inputs = result.get("inputs")
        # decompress the numpy input batch (written with np.savez_compressed)
        decompressed_array = io.BytesIO()
        decompressed_array.write(inputs)
        decompressed_array.seek(0)
        inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
        log("inputs.shape " + str(inputs.shape))
        args = result.get("args")
        predictor_type = result.get("predictor_type")
        model_type = result.get("model_type")
        _md5 = result.get("md5")
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-start_time1))

        # fetch (or lazily create) the predictor for this type
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model "
                + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")

        # set the model input and run
        input_tensor.copy_from_cpu(inputs)
        with lock:
            start_time1 = time.time()
            predictor.run()
            gpu_time = round(float(time.time()-start_time1), 2)

        # fetch the model outputs
        outputs = []
        for output_tensor in output_tensors:
            output = output_tensor.copy_to_cpu()
            outputs.append(output)
        preds = outputs[0]

        # compress the numpy predictions for transport
        compressed_array = io.BytesIO()
        np.savez_compressed(compressed_array, preds)
        compressed_array.seek(0)
        preds = compressed_array.read()

        # release memory held by the predictor
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()

        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        return base64.b64encode(pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time}))
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return base64.b64encode(pickle.dumps({"preds": None, "gpu_time": 0., "elapse": finish_time}))
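

# --- Illustrative client for _ocr_gpu_flask (a sketch, not called by this
# module). It mirrors the wire format the handler above expects: the input
# batch is compressed with np.savez_compressed, the whole payload is pickled
# and base64-encoded into the "data" form field, and the response is decoded
# the same way in reverse. The URL and the use of `requests` are assumptions
# for illustration; the real callers live elsewhere in format_convert.
def _example_flask_ocr_client(inputs, args, predictor_type, model_type, _md5,
                              url="http://127.0.0.1:17000/ocr"):
    import requests  # assumed to be available in the caller's environment
    buf = io.BytesIO()
    np.savez_compressed(buf, inputs)  # the handler reads key 'arr_0'
    buf.seek(0)
    payload = {"inputs": buf.read(), "args": args, "md5": _md5,
               "predictor_type": predictor_type, "model_type": model_type}
    data = base64.b64encode(pickle.dumps(payload))
    resp = requests.post(url, data={"data": data})
    result = pickle.loads(base64.b64decode(resp.content))
    preds = result.get("preds")
    if preds is not None:
        # predictions come back npz-compressed as well
        preds = np.load(io.BytesIO(preds), allow_pickle=True)['arr_0']
    return preds, result.get("gpu_time"), result.get("elapse")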

def _ocr_gpu_redis():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    while True:
        start_time = time.time()
        try:
            # hot-poll the job queue; an empty list means nothing to do yet
            if redis_db.llen("producer_ocr") == 0:
                continue
            log("judge llen " + str(time.time()-start_time))

            _time = time.time()
            result = redis_db.lpop("producer_ocr")
            if result is None:
                continue
            result = pickle.loads(result)
            log("from producer_ocr time " + str(time.time() - _time))

            _time = time.time()
            inputs = result.get("inputs")
            # # decompress the numpy input batch
            # decompressed_array = io.BytesIO()
            # decompressed_array.write(inputs)
            # decompressed_array.seek(0)
            # inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
            # log("inputs.shape " + str(inputs.shape))
            # log("numpy decompress " + str(time.time()-_time))
            args = result.get("args")
            _uuid = result.get("uuid")
            predictor_type = result.get("predictor_type")
            model_type = result.get("model_type")
            _md5 = result.get("md5")
            _global.update({"md5": _md5})
            log("read data " + str(time.time()-_time))

            # fetch (or lazily create) the predictor for this type
            if globals().get(predictor_type) is None:
                start_time1 = time.time()
                log("=== init " + model_type + " " + predictor_type + " model ===")
                predictor, input_tensor, output_tensors = \
                    utility.create_predictor(args, predictor_type, logger)
                globals().update({predictor_type: {"predictor": predictor,
                                                   "input_tensor": input_tensor,
                                                   "output_tensors": output_tensors}})
                log("=== init " + model_type + " " + predictor_type + " model "
                    + str(round(time.time()-start_time1, 2)) + " ===")
            else:
                predictor = globals().get(predictor_type).get("predictor")
                input_tensor = globals().get(predictor_type).get("input_tensor")
                output_tensors = globals().get(predictor_type).get("output_tensors")

            # set the model input and run
            input_tensor.copy_from_cpu(inputs)
            start_time1 = time.time()
            predictor.run()
            gpu_time = round(float(time.time()-start_time1), 2)

            # fetch the model outputs
            _time = time.time()
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            preds = outputs[0]
            log("output_tensors " + str(time.time()-_time))

            # # compress the numpy predictions
            # _time = time.time()
            # compressed_array = io.BytesIO()
            # np.savez_compressed(compressed_array, preds)
            # compressed_array.seek(0)
            # preds = compressed_array.read()
            # log("numpy compress " + str(time.time()-_time))

            # write the result back to redis, keyed by the job uuid
            finish_time = round(float(time.time()-start_time), 2)
            _time = time.time()
            redis_db.hset("consumer_ocr", _uuid,
                          pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time}))
            log("to consumer_ocr " + str(time.time()-_time))

            # release memory held by the predictor
            predictor.clear_intermediate_tensor()
            predictor.try_shrink_memory()
            log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        except Exception:
            traceback.print_exc()
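

# --- Illustrative producer for _ocr_gpu_redis (a sketch, not called by this
# module). The consumer loop above LPOPs pickled jobs from "producer_ocr" and
# HSETs results into "consumer_ocr" keyed by the job's uuid, so a producer
# could look roughly like this. Whether the real producer uses rpush (FIFO
# with lpop) or lpush, and the polling interval, are assumptions.
def _example_redis_ocr_producer(db, inputs, args, predictor_type, model_type, _md5):
    import uuid
    _uuid = str(uuid.uuid1())
    job = {"inputs": inputs, "args": args, "uuid": _uuid, "md5": _md5,
           "predictor_type": predictor_type, "model_type": model_type}
    db.rpush("producer_ocr", pickle.dumps(job))
    # poll the result hash until the GPU consumer has written our uuid
    while True:
        raw = db.hget("consumer_ocr", _uuid)
        if raw is not None:
            db.hdel("consumer_ocr", _uuid)
            return pickle.loads(raw)
        time.sleep(0.01)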

# @app.route('/ocr', methods=['POST'])
def _ocr_gpu_flask_sm():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    gpu_time = 0.
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})
        log("judge request.form " + str(time.time()-start_time))

        _time = time.time()
        result = json.loads(request.form.get("data"))
        predictor_type = result.get("predictor_type")
        model_type = result.get("model_type")
        args = result.get("args")
        args = namespace_to_dict(args, reverse=True)
        _md5 = result.get("md5")
        sm_name = result.get("sm_name")
        sm_shape = result.get("sm_shape")
        sm_dtype = result.get("sm_dtype")
        sm_dtype = get_np_type(sm_dtype)
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-_time))

        # read the input batch from shared memory
        _time = time.time()
        if sm_name:
            inputs = from_share_memory(sm_name, sm_shape, sm_dtype)
        else:
            log("from_share_memory failed!")
            raise Exception("sm_name missing in request")
        log("data from share memory " + sm_name + " " + str(time.time()-_time))

        # fetch (or lazily create) the predictor for this type
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model "
                + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")

        _time = time.time()
        with lock:
            # set the model input
            input_tensor.copy_from_cpu(inputs)
            # run
            predictor.run()
            # fetch the model outputs
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            preds = outputs[0]
        gpu_time = round(float(time.time()-_time), 2)
        log("gpu_time " + str(gpu_time))

        _shape = preds.shape
        _dtype = str(preds.dtype)

        # wait until the previous output segment has been read (the reader
        # zeroes it when done), then unlink it
        _time = time.time()
        while True:
            shm = globals().get("shm")
            if shm is None:
                break
            last_shape = globals().get("last_shape")
            sm_data = np.ndarray(last_shape, dtype=sm_dtype, buffer=shm.buf)
            if (sm_data == np.zeros(last_shape)).all():
                try:
                    _time1 = time.time()
                    shm.close()
                    shm.unlink()
                    log("release share memory " + str(time.time()-_time1))
                except FileNotFoundError:
                    log("share memory " + shm.name + " not exists!")
                break
        log("wait for share memory being read " + str(time.time()-_time))

        # put the predictions into a fresh shared-memory segment
        _time = time.time()
        shm = to_share_memory(preds)
        globals().update({"shm": shm})
        globals().update({"last_shape": _shape})
        log("data into share memory " + str(shm.name) + " " + str(time.time()-_time))

        # release memory held by the predictor
        _time = time.time()
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()
        log("ocr shrink memory " + str(time.time()-_time))

        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        return json.dumps({"gpu_time": gpu_time, "elapse": finish_time,
                           "sm_name": shm.name, "sm_shape": _shape, "sm_dtype": _dtype})
    except Exception:
        # gpu_time is pre-initialised above, so this handler cannot NameError
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return json.dumps({"gpu_time": gpu_time, "elapse": finish_time,
                           "sm_name": None, "sm_shape": None, "sm_dtype": None})

def _ocr():
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    sm_list_name = "sml_ocr_" + str(port)
    try:
        # initialise and warm up the three predictors (det / cls / rec)
        for predictor_type in ["det", "cls", "rec"]:
            args = init_ocr_args()
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            # det is warmed with a full page, cls/rec with a batch of crops
            if predictor_type == "det":
                inputs = np.zeros((1, 3, 1024, 1024), dtype=np.float32)
            else:
                inputs = np.zeros((30, 3, 32, 64), dtype=np.float32)
            # init model by running once
            input_tensor.copy_from_cpu(inputs)
            predictor.run()
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            log("finish init predictor " + predictor_type)

        # wait until a caller publishes a new job in the shared list
        while True:
            try:
                full_sm_list = get_share_memory_list(sm_list_name=sm_list_name)
            except FileNotFoundError:
                full_sm_list = get_share_memory_list(sm_list_name=sm_list_name, list_size=10)
            try:
                if full_sm_list[0] == "1" and full_sm_list[-1] == "1":
                    log("full_sm_list[0] " + full_sm_list[0])
                    log("full_sm_list[-1] " + full_sm_list[-1])
                    log("full_sm_list[1] " + full_sm_list[1])
                    log("wait for " + str(time.time()-start_time))
                    break
            except ValueError:
                continue

        start_time = time.time()
        _time = time.time()
        _md5 = full_sm_list[1]
        model_type = full_sm_list[2]
        predictor_type = full_sm_list[3]
        args = full_sm_list[4]
        args = namespace_to_dict(eval(args), reverse=True)
        sm_name = full_sm_list[5]
        sm_shape = full_sm_list[6]
        sm_shape = eval(sm_shape)
        sm_dtype = full_sm_list[7]
        sm_dtype = get_np_type(sm_dtype)
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-_time))

        # read the input batch from shared memory
        _time = time.time()
        if sm_name:
            inputs = from_share_memory(sm_name, sm_shape, sm_dtype)
            log(predictor_type + " inputs shape " + str(inputs.shape))
        else:
            log("from_share_memory failed!")
            raise Exception("sm_name missing in shared list")
        log("data from share memory " + sm_name + " " + str(time.time()-_time))

        # fetch (or lazily create) the predictor for this type
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model "
                + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")

        _time = time.time()
        with lock:
            # set the model input
            input_tensor.copy_from_cpu(inputs)
            # run
            predictor.run()
            # fetch the model outputs
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            preds = outputs[0]
        gpu_time = round(float(time.time()-_time), 2)
        log("gpu_time " + str(gpu_time))

        # put the predictions into shared memory
        _time = time.time()
        # release any previous segment with the same name first
        release_share_memory(get_share_memory(sm_name))
        # write the result, publish its metadata, and flip the response flag
        shm = to_share_memory(preds, sm_name)
        full_sm_list[5] = shm.name
        full_sm_list[6] = str(preds.shape)
        full_sm_list[7] = str(preds.dtype)
        full_sm_list[8] = str(gpu_time)
        full_sm_list[-1] = "0"
        log("data into share memory " + str(shm.name) + " " + str(time.time()-_time))

        # detach from the shared list
        close_share_memory_list(full_sm_list)

        # release memory held by the predictor
        _time = time.time()
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()
        log("ocr shrink memory " + str(time.time()-_time))

        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        raise
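

# --- Illustrative caller for _ocr() (a sketch, not called by this module).
# The slot layout below is read off the code above rather than a documented
# contract: [0] request flag, [1] md5, [2] model_type, [3] predictor_type,
# [4] args dict as str, [5] shared-memory name, [6] shape as str,
# [7] dtype as str, [8] gpu_time, [-1] response flag. The polling interval
# and flag-reset details are assumptions.
def _example_sm_list_caller(inputs, args, predictor_type, _md5, _port):
    sml = get_share_memory_list(sm_list_name="sml_ocr_" + str(_port))
    shm = to_share_memory(inputs)          # publish the input batch
    sml[1] = _md5
    sml[2] = "ocr"
    sml[3] = predictor_type
    sml[4] = str(namespace_to_dict(args))  # _ocr() eval()s this back
    sml[5] = shm.name
    sml[6] = str(inputs.shape)
    sml[7] = str(inputs.dtype)
    sml[-1] = "1"
    sml[0] = "1"                           # signal: job ready
    while sml[-1] != "0":                  # _ocr() flips this when done
        time.sleep(0.01)
    preds = from_share_memory(sml[5], eval(sml[6]), get_np_type(sml[7])).copy()
    gpu_time = float(sml[8])
    sml[0] = "0"                           # reset the request flag
    close_share_memory_list(sml)
    return preds, gpu_time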

def init_ocr_args():
    return argparse.Namespace(
        use_gpu=True,
        ir_optim=True,
        use_tensorrt=False,
        gpu_mem=8000,
        image_dir='',
        det_algorithm='DB',
        det_model_dir=ocr_model_dir+"det",
        det_limit_side_len=1280,
        det_limit_type='max',
        det_db_thresh=0.1,
        # lower det_db_box_thresh if text lines are being missed
        det_db_box_thresh=0.1,
        # det_db_unclip_ratio controls how tightly boxes hug the text
        det_db_unclip_ratio=2.5,
        # whether to dilate the detected text regions
        use_dilation=False,
        det_east_score_thresh=0.8,
        det_east_cover_thresh=0.1,
        det_east_nms_thresh=0.2,
        rec_algorithm='CRNN',
        rec_model_dir=ocr_model_dir+"rec/ch",
        rec_image_shape="3, 32, 1000",
        rec_char_type='ch',
        rec_batch_num=30,
        max_text_length=128,
        rec_char_dict_path='./ppocr/utils/ppocr_keys_v1.txt',
        use_space_char=True,
        drop_score=0.5,
        cls_model_dir=ocr_model_dir+"cls",
        cls_image_shape="3, 32, 1000",
        label_list=['0', '180'],
        cls_batch_num=30,
        cls_thresh=0.9,
        enable_mkldnn=False,
        use_zero_copy_run=True,
        use_pdserving=False,
        lang='ch',
        det=True,
        rec=True,
        use_angle_cls=False)
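

# Usage sketch (the script filename is illustrative):
#   python ocr_gpu_interface.py <port> [gpu_index]
#   e.g. python ocr_gpu_interface.py 17000 0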
if __name__ == '__main__':
    if len(sys.argv) == 2:
        port = int(sys.argv[1])
        using_gpu_index = 0
    elif len(sys.argv) == 3:
        port = int(sys.argv[1])
        using_gpu_index = int(sys.argv[2])
    else:
        port = 17000
        using_gpu_index = 0

    _global._init()
    _global.update({"port": str(port)})
    globals().update({"port": str(port)})

    ip = get_intranet_ip()
    os.environ['CUDA_VISIBLE_DEVICES'] = str(using_gpu_index)

    # app.run(host='0.0.0.0', port=port, processes=1, threaded=False, debug=False)
    # app.run()
    # log("OCR running "+str(port))

    # _ocr() handles one shared-memory job per call; loop forever
    while True:
        _ocr()
|