"""Image direction classification (idc) HTTP interface.

Exposes a Flask route ``/idc`` that receives a base64-encoded image, runs a
4-class direction model on a text region of the image and returns the angle
(in degrees) by which the image should be rotated.
"""
import base64
import copy
import json
import os
import time
import sys
import traceback
from glob import glob

# Hide all CUDA devices from this process; must be set before TensorFlow
# is imported to take effect.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# Make the project root importable when this file is run as a script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from format_convert.max_compute_config import max_compute
import tensorflow as tf

MAX_COMPUTE = max_compute

if not MAX_COMPUTE:
    # TensorFlow memory settings.
    try:
        gpus = tf.config.list_physical_devices('GPU')
        if gpus:
            tf.config.experimental.set_virtual_device_configuration(
                gpus[0],
                [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])
    except Exception:
        traceback.print_exc()
        # Alternative when using several GPUs: call
        # tf.config.experimental.set_memory_growth(gpu, True) for each GPU.
        #
        # NOTE(review): the original indentation was lost; the statements
        # below use the TF1-style API and are reconstructed as the fallback
        # path taken when the tf.config calls above fail -- confirm.
        os.environ['CUDA_CACHE_MAXSIZE'] = str(2147483648)
        os.environ['CUDA_CACHE_DISABLE'] = str(0)
        gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.05)
        sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))

from format_convert import _global
import cv2
import numpy as np
from PIL import Image
from idc.model import direction_model
from format_convert.utils import log, get_md5_from_bytes, request_post, np2pil, bytes2np, pil2np, pil_resize, np2bytes
import tensorflow as tf
from flask import Flask, request
from idc.pre_process import get_text_region, get_best_predict_size2


tf.compat.v1.disable_eager_execution()
# Session (with its own graph) in which the model is built and queried.
# This rebinds any ``sess`` created by the memory-settings block above.
sess = tf.compat.v1.Session(graph=tf.Graph())

# (height, width) of the model input.
image_shape = (192, 192)


def adjust_direction(image_np, model, if_return_angle=False):
    """Predict the direction of an image and return the angle or the rotated image.

    Args:
        image_np: image array; passed to ``get_text_region`` and, when a
            rotated image is requested, to ``PIL.Image.fromarray``.
        model: object with a ``predict(X, batch_size=...)`` method returning
            per-class scores for the 4 direction classes.
        if_return_angle: when True return the angle only, otherwise return
            the input image rotated by that angle.

    Returns:
        ``None`` when ``get_text_region`` yields no result list. Otherwise
        the angle ``360 - predicted_class * 90`` as an ``int`` (one of 360,
        270, 180, 90), or the rotated image as a numpy array.
    """
    # Four directions.
    cls_num = 4

    # Keep an untouched copy for the final rotation. It is taken before
    # get_text_region runs (which may or may not modify its input) and is
    # skipped entirely when only the angle is requested.
    origin_image = None if if_return_angle else copy.deepcopy(image_np)

    # Find a suitable text region to feed the model.
    result_list, image_np = get_text_region(image_np, image_shape)
    if not result_list:
        return None

    # Normalise to an (H, W, 1) array of the model's input size.
    if len(image_np.shape) < 3:
        image_np = np.expand_dims(image_np, axis=-1)
    if image_np.shape[0] != image_shape[0] or image_np.shape[1] != image_shape[1]:
        image_np = pil_resize(image_np, image_shape[0], image_shape[1])
    # Only the first channel is used.
    image_np = np.expand_dims(image_np[:, :, 0], axis=-1)
    X = np.expand_dims(np.array(image_np), 0)

    # Predict inside the session/graph the model was built in.
    with sess.as_default():
        with sess.graph.as_default():
            pred = model.predict(X, batch_size=1)
    pred = pred.astype(np.float64)
    pred = np.argmax(pred[0])

    # Convert the class index into an angle.
    angle = int(360 - pred * int(360 / cls_num))

    if if_return_angle:
        return angle

    # Rotate the original image by the predicted angle.
    image_pil = Image.fromarray(origin_image)
    return np.array(image_pil.rotate(angle, expand=1))


def idc(data, model):
    """Decode a base64 image and return its predicted rotation angle.

    Args:
        data: base64-encoded image bytes.
        model: direction model passed on to ``adjust_direction``.

    Returns:
        ``{"angle": <int>}`` on success (0 when ``adjust_direction`` returns
        ``None``), ``{"angle": [-5]}`` on ``TimeoutError`` and
        ``{"angle": [-1]}`` on any other exception.
    """
    log("into idc_interface idc")
    try:
        img_data = base64.b64decode(data)
        img_np = bytes2np(img_data)
        angle = adjust_direction(img_np, model, if_return_angle=True)
        if angle is None:
            angle = 0
        log("idc angle " + str(angle))
        return {"angle": angle}
    except TimeoutError:
        return {"angle": [-5]}
    except Exception:
        traceback.print_exc()
        return {"angle": [-1]}


# Interface configuration.
app = Flask(__name__)


@app.route('/idc', methods=['POST'])
def _idc():
    """Flask handler for ``POST /idc``.

    Reads the form fields ``data`` (base64 image) and ``md5`` and returns a
    JSON string ``{"angle": ...}``. Error codes are returned as the strings
    ``"[-9]"`` (empty form), ``"[-5]"`` (timeout) and ``"[-1]"`` (other
    exception raised in this handler).
    """
    _global._init()
    _global.update({"port": globals().get("port")})
    start_time = time.time()

    log("into idc_interface _idc")
    try:
        if not request.form:
            log("idc no data!")
            return json.dumps({"angle": str([-9])})
        data = request.form.get("data")
        log("idc_interface get data time" + str(time.time()-start_time))

        _md5 = request.form.get("md5")
        _global.update({"md5": _md5})

        # Build the model on first use and cache it in module globals.
        idc_model = globals().get("global_idc_model")
        if idc_model is None:
            print("=========== init idc model ===========")
            idc_model = IdcModels().get_model()
            globals().update({"global_idc_model": idc_model})

        angle = idc(data, idc_model).get("angle")
        return json.dumps({"angle": angle})
    except TimeoutError:
        return json.dumps({"angle": str([-5])})
    except Exception:
        traceback.print_exc()
        return json.dumps({"angle": str([-1])})
    finally:
        log("idc interface finish time " + str(time.time()-start_time))


class IdcModels:
    """Builds the direction model and loads its weights from ``models/cnn.h5``."""

    def __init__(self):
        # Directory containing this python file.
        _dir = os.path.abspath(os.path.dirname(__file__))

        # Weights of the direction model.
        model_path = _dir + "/models/cnn.h5"
        # Build and load inside the module-level session/graph so that
        # adjust_direction can later predict in the same graph.
        with sess.as_default():
            with sess.graph.as_default():
                self.model = direction_model(input_shape=(image_shape[0], image_shape[1], 1),
                                             output_shape=4)
                self.model.load_weights(model_path)

    def get_model(self):
        """Return the loaded model."""
        return self.model


def test_idc_model(from_remote=False):
    """Manual smoke test over the images in a hard-coded local folder.

    Args:
        from_remote: when True post each image to a local HTTP service and
            print the JSON reply; otherwise call ``idc`` in-process and
            print the angle and the shape of the rotated image.
    """
    idc_model = IdcModels().get_model()
    paths = glob("C:/Users/Administrator/Desktop/test_image/*")
    for file_path in paths:
        img_np = cv2.imread(file_path)
        h, w = get_best_predict_size2(img_np, threshold=1080)
        img_np = pil_resize(img_np, h, w)

        file_bytes = np2bytes(img_np)
        file_base64 = base64.b64encode(file_bytes)
        _md5 = get_md5_from_bytes(file_bytes)[0]

        _global._init()
        _global.update({"port": 15010, "md5": _md5})

        if from_remote:
            file_json = {"data": file_base64, "md5": _md5}
            # NOTE(review): this posts to an "/ocr" route on port 17000, not
            # to the "/idc" route defined in this file -- confirm intended.
            _url = "http://127.0.0.1:17000/ocr"
            print(json.loads(request_post(_url, file_json)))
        else:
            result = idc(file_base64, idc_model)
            if isinstance(result.get("angle"), list):
                # A list-valued angle is an error code from idc().
                print(result)
            else:
                angle = result.get("angle")
                img = Image.fromarray(img_np)
                img = np.array(img.rotate(angle, expand=1))
                print("angle", angle)
                print(img.shape)


if __name__ == "__main__":
    test_idc_model()