| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- # encoding=utf8
- import base64
- import io
- import json
- import pickle
- import sys
- import os
- import threading
- import zlib
- import numpy as np
- import redis
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- import time
- import traceback
- os.environ['FLAGS_eager_delete_tensor_gb'] = '0'
- from format_convert.utils import request_post, test_gpu, get_intranet_ip, log, get_md5_from_bytes
- from flask import Flask, request
- from format_convert import _global
- from ocr.tools.infer import utility
- from ocr.ppocr.utils.logging import get_logger
logger = get_logger()
# HTTP service configuration
app = Flask(__name__)
# Serializes GPU access across Flask worker threads (see _ocr_gpu_flask).
lock = threading.RLock()
# Redis connection backing the producer/consumer job queue used by
# _ocr_gpu_redis. NOTE(review): port is passed as the string '6379';
# this works (getaddrinfo accepts it) but an int would be conventional.
redis_db = redis.StrictRedis(host='127.0.0.1', port='6379',
                             db=1, password='bidi123456', health_check_interval=300)
# NOTE(review): route decorator is commented out — the Flask app currently
# exposes no /ocr endpoint; requests would 404 unless this is re-enabled.
# @app.route('/ocr', methods=['POST'])
def _ocr_gpu_flask():
    """Flask handler: run one OCR predictor forward pass on the GPU.

    Reads the POST form field "data" — a base64-encoded pickle with keys
    "inputs" (npz-compressed numpy array bytes), "args", "predictor_type",
    "model_type" and "md5" — runs the cached (or newly created) Paddle
    predictor for that predictor_type, and returns a base64-encoded pickle
    of {"preds": <npz bytes>, "gpu_time": float, "elapse": float}.

    If the request has no form data, returns a JSON error payload instead.
    On any exception, returns the pickled payload with "preds": None.
    """
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})
        log("judge request.form " + str(time.time()-start_time))
        start_time1 = time.time()
        # SECURITY NOTE(review): pickle.loads on request-supplied bytes is
        # arbitrary code execution if this port is reachable by untrusted
        # clients — confirm the service is intranet-only.
        result = pickle.loads(base64.b64decode(request.form.get("data")))
        # Unpack the payload
        inputs = result.get("inputs")
        # Decompress the numpy input (stored as npz bytes)
        decompressed_array = io.BytesIO()
        decompressed_array.write(inputs)
        decompressed_array.seek(0)
        inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
        log("inputs.shape" + str(inputs.shape))
        args = result.get("args")
        predictor_type = result.get("predictor_type")
        model_type = result.get("model_type")
        _md5 = result.get("md5")
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-start_time1))
        # Fetch the predictor for this type, creating and caching it in
        # globals() on first use.
        if globals().get(predictor_type) is None:
            start_time1 = time.time()
            log("=== init " + model_type + " " + predictor_type + " model ===")
            predictor, input_tensor, output_tensors = \
                utility.create_predictor(args, predictor_type, logger)
            globals().update({predictor_type: {"predictor": predictor,
                                               "input_tensor": input_tensor,
                                               "output_tensors": output_tensors}})
            log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")
        else:
            predictor = globals().get(predictor_type).get("predictor")
            input_tensor = globals().get(predictor_type).get("input_tensor")
            output_tensors = globals().get(predictor_type).get("output_tensors")
        # Set the model input and run; the lock serializes GPU access
        # across Flask worker threads.
        input_tensor.copy_from_cpu(inputs)
        with lock:
            start_time1 = time.time()
            predictor.run()
            gpu_time = round(float(time.time()-start_time1), 2)
        # Collect the model outputs (only the first tensor is returned)
        outputs = []
        for output_tensor in output_tensors:
            output = output_tensor.copy_to_cpu()
            outputs.append(output)
        preds = outputs[0]
        # Compress the prediction array back to npz bytes for transport
        compressed_array = io.BytesIO()
        np.savez_compressed(compressed_array, preds)
        compressed_array.seek(0)
        preds = compressed_array.read()
        # Release memory held by intermediate tensors
        predictor.clear_intermediate_tensor()
        predictor.try_shrink_memory()
        finish_time = round(float(time.time()-start_time), 2)
        log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        return base64.b64encode(pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time}))
    except Exception as e:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return base64.b64encode(pickle.dumps({"preds": None, "gpu_time": 0., "elapse": finish_time}))
def _ocr_gpu_redis():
    """Worker loop: consume OCR inference jobs from redis, publish results.

    Pops pickled jobs from the "producer_ocr" list; each job carries
    "inputs" (numpy array), "args", "uuid", "predictor_type", "model_type"
    and "md5". Runs the cached (or newly created) Paddle predictor for the
    job's predictor_type and writes the pickled result
    {"preds", "gpu_time", "elapse"} into the "consumer_ocr" hash under the
    job's uuid. Runs forever; never raises (exceptions are logged).
    """
    start_time = time.time()
    log("into _ocr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _ocr -> _global " + str(time.time()-start_time))
    while True:
        start_time = time.time()
        try:
            if redis_db.llen("producer_ocr") == 0:
                # BUG FIX: the original `continue` busy-spun at 100% CPU
                # hammering redis with llen() while the queue was empty.
                time.sleep(0.01)
                continue
            log("judge llen " + str(time.time()-start_time))
            _time = time.time()
            result = redis_db.lpop("producer_ocr")
            if result is None:
                # Another consumer won the race for the element.
                continue
            # SECURITY NOTE(review): pickle.loads on queue payloads is safe
            # only if redis is written exclusively by trusted producers.
            result = pickle.loads(result)
            log("from producer_ocr time " + str(time.time() - _time))
            _time = time.time()
            inputs = result.get("inputs")
            args = result.get("args")
            _uuid = result.get("uuid")
            predictor_type = result.get("predictor_type")
            model_type = result.get("model_type")
            _md5 = result.get("md5")
            _global.update({"md5": _md5})
            log("read data " + str(time.time()-_time))
            # Fetch the predictor for this type, creating and caching it
            # in globals() on first use.
            if globals().get(predictor_type) is None:
                start_time1 = time.time()
                log("=== init " + model_type + " " + predictor_type + " model ===")
                predictor, input_tensor, output_tensors = \
                    utility.create_predictor(args, predictor_type, logger)
                globals().update({predictor_type: {"predictor": predictor,
                                                   "input_tensor": input_tensor,
                                                   "output_tensors": output_tensors}})
                log("=== init " + model_type + " " + predictor_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")
            else:
                predictor = globals().get(predictor_type).get("predictor")
                input_tensor = globals().get(predictor_type).get("input_tensor")
                output_tensors = globals().get(predictor_type).get("output_tensors")
            # Set the model input and run the forward pass.
            input_tensor.copy_from_cpu(inputs)
            start_time1 = time.time()
            predictor.run()
            gpu_time = round(float(time.time()-start_time1), 2)
            # Collect the model outputs (only the first tensor is kept).
            _time = time.time()
            outputs = []
            for output_tensor in output_tensors:
                output = output_tensor.copy_to_cpu()
                outputs.append(output)
            preds = outputs[0]
            log("output_tensors " + str(time.time()-_time))
            # Publish the result to redis, keyed by the job uuid.
            finish_time = round(float(time.time()-start_time), 2)
            _time = time.time()
            redis_db.hset("consumer_ocr", _uuid, pickle.dumps({"preds": preds, "gpu_time": gpu_time, "elapse": finish_time}))
            log("to consumer_ocr " + str(time.time()-_time))
            # Release memory held by intermediate tensors.
            predictor.clear_intermediate_tensor()
            predictor.try_shrink_memory()
            log("ocr model predict time - " + str(predictor_type) + " - " + str(gpu_time) + " " + str(finish_time))
        except Exception:
            traceback.print_exc()
            # Back off briefly so a persistent failure (e.g. redis
            # connection refused) does not become a hot error loop.
            time.sleep(0.1)
if __name__ == '__main__':
    # CLI: python <script>.py [port] [gpu_index]
    if len(sys.argv) == 2:
        port = int(sys.argv[1])
        using_gpu_index = 0
    elif len(sys.argv) == 3:
        port = int(sys.argv[1])
        using_gpu_index = int(sys.argv[2])
    else:
        port = 17000
        using_gpu_index = 0
    _global._init()
    _global.update({"port": str(port)})
    # The handlers read the port back out of globals() for logging.
    globals().update({"port": str(port)})
    ip = get_intranet_ip()
    # Pin this process to the chosen GPU before any predictor is created.
    os.environ['CUDA_VISIBLE_DEVICES'] = str(using_gpu_index)
    # BUG FIX: the original called app.run() with no arguments, which
    # bound 127.0.0.1:5000 and silently ignored the parsed port. Bind the
    # requested port on all interfaces, single-threaded as the commented
    # original intended (GPU predictor access is not re-entrant).
    app.run(host='0.0.0.0', port=port, threaded=False, debug=False)
|