# encoding=utf8
import base64
import io
import json
import os
import pickle
import threading
import traceback
# os.environ['TF_XLA_FLAGS'] = '--tf_xla_cpu_global_jit'
# os.environ['CUDA_VISIBLE_DEVICES'] = "0"
import redis
import tensorflow as tf
try:
    # Cap TensorFlow GPU memory so this process does not claim the whole card
    gpus = tf.config.list_physical_devices('GPU')
    if len(gpus) > 0:
        tf.config.experimental.set_virtual_device_configuration(
            gpus[0],
            [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2000)])
except Exception:
    traceback.print_exc()
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import time
import logging
# from table_line import *
import cv2
import numpy as np
from flask import Flask, request
from format_convert.utils import request_post, judge_error_code, get_intranet_ip, log, get_md5_from_bytes, get_platform, \
    to_share_memory, from_share_memory, get_np_type, get_share_memory_list, release_share_memory, get_share_memory, \
    close_share_memory_list
from otr.table_line import table_net, table_line, table_preprocess, table_postprocess
from format_convert import _global


# Interface configuration
app = Flask(__name__)
lock = threading.RLock()
# redis_db = redis.StrictRedis(host='127.0.0.1', port='6379',
#                              db=1, password='bidi123456', health_check_interval=300)
redis_db = None


# @app.route('/otr', methods=['POST'])
def _otr_flask():
    start_time = time.time()
    log("into _otr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _otr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    try:
        if not request.form:
            log("otr no data!")
            return json.dumps({"list_line": str([-9])})
        log("judge request.form " + str(time.time()-start_time))

        start_time1 = time.time()
        # Deserialize the request payload
        result = pickle.loads(base64.b64decode(request.form.get("data")))
        inputs = result.get("inputs")
        # Decompress the numpy inputs
        decompressed_array = io.BytesIO()
        decompressed_array.write(inputs)
        decompressed_array.seek(0)
        inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
        log("inputs.shape " + str(inputs.shape))
        predictor_type = result.get("predictor_type")
        model_type = result.get("model_type")
        _md5 = result.get("md5")
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-start_time1))

        # Get (or lazily initialize) the model, cached in globals() under model_type
        model = globals().get(model_type)
        if model is None:
            start_time1 = time.time()
            log("=== init " + model_type + " model ===")
            model = OtrModels().get_model()
            globals().update({model_type: model})
            log("=== init " + model_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")

        # Run inference
        with lock:
            start_time1 = time.time()
            pred = model.predict(inputs)
            pred = pred[0]
            log("pred.shape " + str(pred.shape))

            # Compress the numpy prediction
            compressed_array = io.BytesIO()
            np.savez_compressed(compressed_array, pred)
            compressed_array.seek(0)
            pred = compressed_array.read()

        gpu_time = round(float(time.time()-start_time1), 2)
        finish_time = round(float(time.time()-start_time), 2)
        log("otr model predict time " + str(gpu_time) + " " + str(finish_time))
        return base64.b64encode(pickle.dumps({"preds": pred,
                                              "gpu_time": gpu_time,
                                              "elapse": finish_time}))
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return base64.b64encode(pickle.dumps({"preds": None,
                                              "gpu_time": 0.,
                                              "elapse": finish_time}))
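
# ---------------------------------------------------------------------------
# Illustrative client sketch for the pickle/base64 protocol that _otr_flask
# above expects (its /otr route is currently commented out in favour of
# _otr_flask_sm). This is a hedged example, not part of the service: the URL,
# the helper name `_example_flask_client` and the "otr_model" cache key are
# assumptions; only the payload layout (npz-compressed array inside a
# base64-encoded pickle) mirrors what the handler actually reads.
# ---------------------------------------------------------------------------
def _example_flask_client(image_batch, url="http://127.0.0.1:18000/otr"):
    import requests  # assumption: available on the client side

    # Compress the numpy batch the same way the handler decompresses it
    buf = io.BytesIO()
    np.savez_compressed(buf, image_batch)  # stored under 'arr_0'
    payload = base64.b64encode(pickle.dumps({"inputs": buf.getvalue(),
                                             "predictor_type": "otr",
                                             "model_type": "otr_model",  # assumed cache key
                                             "md5": ""}))
    resp = requests.post(url, data={"data": payload})
    result = pickle.loads(base64.b64decode(resp.content))
    preds = result.get("preds")
    if preds is not None:
        # The handler returns an npz-compressed array; unpack it
        preds = np.load(io.BytesIO(preds), allow_pickle=True)['arr_0']
    return preds, result.get("gpu_time"), result.get("elapse")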

def _otr_redis():
    start_time = time.time()
    log("into _otr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _otr -> _global " + str(time.time()-start_time))
    while True:
        start_time = time.time()
        try:
            if redis_db.llen("producer_otr") == 0:
                continue
            log("judge llen " + str(time.time()-start_time))

            _time = time.time()
            result = redis_db.lpop("producer_otr")
            if result is None:
                continue
            result = pickle.loads(result)
            log("from producer_otr time " + str(time.time() - _time))

            _time = time.time()
            inputs = result.get("inputs")
            # # Decompress numpy
            # decompressed_array = io.BytesIO()
            # decompressed_array.write(inputs)
            # decompressed_array.seek(0)
            # inputs = np.load(decompressed_array, allow_pickle=True)['arr_0']
            # log("inputs.shape " + str(inputs.shape))
            # log("numpy decompress " + str(time.time()-_time))
            predictor_type = result.get("predictor_type")
            _uuid = result.get("uuid")
            model_type = result.get("model_type")
            _md5 = result.get("md5")
            _global.update({"md5": _md5})
            log("read data " + str(time.time()-_time))

            # Get (or lazily initialize) the model
            model = globals().get(model_type)
            if model is None:
                start_time1 = time.time()
                log("=== init " + model_type + " model ===")
                model = OtrModels().get_model()
                globals().update({model_type: model})
                log("=== init " + model_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")

            # Run inference
            start_time1 = time.time()
            pred = model.predict(inputs)
            pred = pred[0]
            log("pred.shape " + str(pred.shape))

            # # Compress numpy
            # _time = time.time()
            # compressed_array = io.BytesIO()
            # np.savez_compressed(compressed_array, pred)
            # compressed_array.seek(0)
            # pred = compressed_array.read()
            # log("numpy compress " + str(time.time()-_time))

            # Write the result back to redis
            gpu_time = round(float(time.time()-start_time1), 2)
            finish_time = round(float(time.time()-start_time), 2)
            redis_db.hset("consumer_otr", _uuid,
                          pickle.dumps({"preds": pred,
                                        "gpu_time": gpu_time,
                                        "elapse": finish_time}))
            log("to consumer_otr " + str(time.time()-_time))
            log("otr model predict time " + str(gpu_time) + " " + str(finish_time))
        except Exception:
            traceback.print_exc()
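
# ---------------------------------------------------------------------------
# Illustrative producer sketch for the Redis path served by _otr_redis above.
# Hedged: not part of this service. The queue "producer_otr" and the hash
# "consumer_otr" mirror the consumer loop; the connection object, the 0.05s
# polling interval and the timeout are assumptions (redis_db itself is left
# as None above, with a sample StrictRedis configuration commented out).
# ---------------------------------------------------------------------------
def _example_redis_producer(inputs, redis_conn, timeout=60):
    import uuid
    _uuid = uuid.uuid1().hex
    redis_conn.rpush("producer_otr", pickle.dumps({
        "inputs": inputs,
        "predictor_type": "otr",    # read but not used by _otr_redis
        "model_type": "otr_model",  # assumed model cache key
        "uuid": _uuid,
        "md5": ""}))
    deadline = time.time() + timeout
    while time.time() < deadline:
        raw = redis_conn.hget("consumer_otr", _uuid)
        if raw is not None:
            redis_conn.hdel("consumer_otr", _uuid)  # clean up the hash entry
            return pickle.loads(raw)
        time.sleep(0.05)
    return None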

@app.route('/otr', methods=['POST'])
def _otr_flask_sm():
    start_time = time.time()
    log("into _otr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _otr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    try:
        if not request.form:
            log("otr no data!")
            return json.dumps({"list_line": str([-9])})
        log("judge request.form " + str(time.time()-start_time))

        _time = time.time()
        result = json.loads(request.form.get("data"))
        model_type = result.get("model_type")
        args = result.get("args")
        _md5 = result.get("md5")
        sm_name = result.get("sm_name")
        sm_shape = result.get("sm_shape")
        sm_dtype = result.get("sm_dtype")
        sm_dtype = get_np_type(sm_dtype)
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-_time))

        # Read inputs from shared memory
        _time = time.time()
        if sm_name:
            inputs = from_share_memory(sm_name, sm_shape, sm_dtype)
        else:
            log("from_share_memory failed!")
            raise Exception("empty sm_name")
        log("data from share memory " + sm_name + " " + str(time.time()-_time))

        # Get (or lazily initialize) the model
        model = globals().get(model_type)
        if model is None:
            start_time1 = time.time()
            log("=== init " + model_type + " model ===")
            model = OtrModels().get_model()
            globals().update({model_type: model})
            log("=== init " + model_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")

        # Run inference
        _time = time.time()
        with lock:
            pred = model.predict(inputs)
        pred = pred[0]
        _shape = pred.shape
        _dtype = str(pred.dtype)
        log("pred.shape " + str(pred.shape))
        gpu_time = round(float(time.time()-_time), 2)

        # Wait until the previous result has been read out of shared memory
        _time = time.time()
        while True:
            shm = globals().get("shm")
            if shm is None:
                break
            last_shape = globals().get("last_shape")
            # Match the dtype of the prediction actually stored in the block
            last_dtype = globals().get("last_dtype", sm_dtype)
            sm_data = np.ndarray(last_shape, dtype=last_dtype, buffer=shm.buf)
            # The reader zeroes the buffer once it has copied the data out
            if (sm_data == np.zeros(last_shape)).all():
                try:
                    _time1 = time.time()
                    shm.close()
                    shm.unlink()
                    log("release share memory " + str(time.time()-_time1))
                except FileNotFoundError:
                    log("share memory " + shm.name + " not exists!")
                break
        log("wait for share memory being read " + str(time.time()-_time))

        # Put the result into shared memory
        _time = time.time()
        shm = to_share_memory(pred)
        globals().update({"shm": shm})
        globals().update({"last_shape": _shape})
        globals().update({"last_dtype": pred.dtype})
        log("data into share memory " + str(shm.name) + " " + str(time.time()-_time))

        finish_time = round(float(time.time()-start_time), 2)
        log("otr model predict time " + str(gpu_time) + " " + str(finish_time))
        return json.dumps({"gpu_time": gpu_time,
                           "elapse": finish_time,
                           "sm_name": shm.name,
                           "sm_shape": _shape,
                           "sm_dtype": _dtype})
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        return json.dumps({"gpu_time": 0.,
                           "elapse": finish_time,
                           "sm_name": None,
                           "sm_shape": None,
                           "sm_dtype": None})


def _otr():
    start_time = time.time()
    log("into _otr")
    _global._init()
    _global.update({"port": globals().get("port")})
    log("into _otr -> _global " + str(time.time()-start_time))
    start_time = time.time()
    try:
        # Poll until there is new data to process
        while True:
            try:
                full_sm_list = get_share_memory_list(sm_list_name="sml_otr_"+str(globals().get("port")))
            except FileNotFoundError:
                full_sm_list = get_share_memory_list(sm_list_name="sml_otr_"+str(globals().get("port")), list_size=10)
            try:
                if full_sm_list[0] == "1" and full_sm_list[-1] == "1":
                    log("full_sm_list[0] " + full_sm_list[0])
                    log("full_sm_list[-1] " + full_sm_list[-1])
                    log("full_sm_list[1] " + full_sm_list[1])
                    log("wait for " + str(time.time()-start_time))
                    break
            except ValueError:
                continue

        start_time = time.time()
        _time = time.time()
        _md5 = full_sm_list[1]
        model_type = full_sm_list[2]
        sm_name = full_sm_list[5]
        sm_shape = full_sm_list[6]
        sm_shape = eval(sm_shape)
        sm_dtype = full_sm_list[7]
        sm_dtype = get_np_type(sm_dtype)
        _global.update({"md5": _md5})
        log("read data " + str(time.time()-_time))

        # Read inputs from shared memory
        _time = time.time()
        if sm_name:
            inputs = from_share_memory(sm_name, sm_shape, sm_dtype)
        else:
            log("from_share_memory failed!")
            raise Exception("empty sm_name")
        log("data from share memory " + sm_name + " " + str(time.time()-_time))

        # Get (or lazily initialize) the model
        model = globals().get(model_type)
        if model is None:
            start_time1 = time.time()
            log("=== init " + model_type + " model ===")
            model = OtrModels().get_model()
            globals().update({model_type: model})
            log("=== init " + model_type + " model " + str(round(time.time()-start_time1, 2)) + " ===")

        # Run inference
        _time = time.time()
        with lock:
            pred = model.predict(inputs)
        preds = pred[0]
        log("preds.shape " + str(preds.shape))
        gpu_time = round(float(time.time()-_time), 2)

        # Put the result into shared memory
        _time = time.time()
        # Release any previous share memory block with the same name first
        release_share_memory(get_share_memory(sm_name))
        # Write to shared memory and publish the result metadata
        shm = to_share_memory(preds, sm_name)
        full_sm_list[5] = shm.name
        full_sm_list[6] = str(preds.shape)
        full_sm_list[7] = str(preds.dtype)
        full_sm_list[8] = str(gpu_time)
        full_sm_list[-1] = "0"
        log("data into share memory " + str(shm.name) + " " + str(time.time()-_time))
        close_share_memory_list(full_sm_list)

        finish_time = round(float(time.time()-start_time), 2)
        log("otr model predict time " + str(gpu_time) + " " + str(finish_time))
    except Exception:
        finish_time = round(float(time.time()-start_time), 2)
        traceback.print_exc()
        raise


class OtrModels:
    def __init__(self):
        # Directory containing this python file
        _dir = os.path.dirname(os.path.abspath(__file__))
        model_path = _dir + "/models/table-line.h5"
        self.otr_model = table_net((None, None, 3), 2)
        self.otr_model.load_weights(model_path)

    def get_model(self):
        return self.otr_model
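
# ---------------------------------------------------------------------------
# Illustrative caller sketch for the share-memory-list handshake that _otr()
# serves. Hedged heavily: the slot layout ([1] md5, [2] model_type,
# [5] sm_name, [6] shape, [7] dtype, [8] gpu_time, [0]/[-1] flags) is
# inferred from the reads and writes in _otr() above; slots 3 and 4 are left
# alone because this file never touches them, and the polling interval,
# timeout and flag reset are assumptions.
# ---------------------------------------------------------------------------
def _example_sm_caller(inputs, port, timeout=60):
    sm_list = get_share_memory_list(sm_list_name="sml_otr_" + str(port))
    shm = to_share_memory(inputs)
    sm_list[1] = ""                    # md5 (assumed optional)
    sm_list[2] = "otr_model"           # assumed model cache key, see _otr()
    sm_list[5] = shm.name
    sm_list[6] = str(inputs.shape)
    sm_list[7] = str(inputs.dtype)
    sm_list[0] = "1"                   # mark a request as pending
    sm_list[-1] = "1"
    deadline = time.time() + timeout
    while time.time() < deadline:
        if sm_list[-1] == "0":         # _otr() flips this when the result is ready
            preds = from_share_memory(sm_list[5], eval(sm_list[6]),
                                      get_np_type(sm_list[7]))
            gpu_time = float(sm_list[8])
            sm_list[0] = "0"           # assumed: clear the busy flag for the next caller
            close_share_memory_list(sm_list)
            return preds, gpu_time
        time.sleep(0.05)
    close_share_memory_list(sm_list)
    return None, 0.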

if __name__ == '__main__':
    if len(sys.argv) == 2:
        port = int(sys.argv[1])
        using_gpu_index = 0
    elif len(sys.argv) == 3:
        port = int(sys.argv[1])
        using_gpu_index = int(sys.argv[2])
    else:
        port = 18000
        using_gpu_index = 0

    # _global._init()
    # _global.update({"port": str(port)})
    globals().update({"port": str(port)})

    # app.run(host='0.0.0.0', port=port, processes=1, threaded=False, debug=False)
    # app.run()
    # log("OTR running "+str(port))
    while True:
        _otr()
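
# Assumed launch pattern, inferred from the argv parsing above:
#   python <this_file>.py              # defaults to port 18000, GPU index 0
#   python <this_file>.py 18000        # explicit port
#   python <this_file>.py 18000 0      # explicit port and GPU index
# Note that using_gpu_index is parsed but not applied here; the GPU memory
# cap is set at import time via set_virtual_device_configuration.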