123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- import base64
- import copy
- import json
- import os
- import time
- import sys
- import traceback
- from glob import glob
- os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- from format_convert import _global
- import cv2
- import numpy as np
- from PIL import Image
- from idc.model import direction_model
- from format_convert.utils import log, get_md5_from_bytes, request_post, np2pil, bytes2np, pil2np, pil_resize, np2bytes
- import tensorflow as tf
- from flask import Flask, request
# Height/width pair used as the model's input size when IdcModels builds the
# direction model.
image_shape = (640, 640)

# Switch TensorFlow to graph (non-eager) mode, then create the one shared
# session, bound to its own fresh graph. Model construction and prediction
# elsewhere in this module both enter this session and graph.
tf.compat.v1.disable_eager_execution()
sess = tf.compat.v1.Session(graph=tf.Graph())
def adjust_direction(image_np, model, if_return_angle=False):
    """Predict an image's direction class and return the angle or the rotated image.

    Args:
        image_np: Image array; it is wrapped into a batch of one and passed to
            ``model.predict``.
            NOTE(review): presumably already sized to ``image_shape`` with three
            channels, since the resize step below is disabled -- confirm with callers.
        model: Object exposing ``predict(X, batch_size=...)``; the index of the
            largest value in its first output row is taken as the class.
        if_return_angle: When true, return only the angle instead of rotating.

    Returns:
        If ``if_return_angle`` is true, the ``int`` angle
        ``360 - class_index * 90`` (class 0 therefore yields 360, not 0).
        Otherwise a NumPy array holding the input rotated by that angle with
        ``expand=1``.
    """
    # Four direction classes, 90 degrees apart.
    cls_num = 4

    # Build the batch. np.array() already copies the input, so the caller's
    # array is never aliased or modified and no extra deep copy is needed.
    # (Resizing to image_shape is intentionally left to the caller.)
    X = np.expand_dims(np.array(image_np), 0)

    # Predict inside the shared session/graph the model was built in.
    with sess.as_default():
        with sess.graph.as_default():
            pred = model.predict(X, batch_size=1)
    pred = pred.astype(np.float64)
    pred = np.argmax(pred[0])

    # Convert the class index into an angle; int() also turns the NumPy
    # integer into a plain Python int so the result is JSON-serialisable.
    angle = int(360 - pred*int((360/cls_num)))
    if if_return_angle:
        return angle

    # Rotate the untouched input by the predicted angle.
    image_pil = Image.fromarray(image_np)
    image_rotate = np.array(image_pil.rotate(angle, expand=1))
    return image_rotate
def idc(data, model):
    """Decode a base64 image and predict its direction angle.

    Args:
        data: Base64-encoded image bytes, as accepted by ``base64.b64decode``.
        model: Direction model, passed straight through to ``adjust_direction``.

    Returns:
        dict: ``{"angle": <int>}`` on success, ``{"angle": [-5]}`` when a
        ``TimeoutError`` is raised, and ``{"angle": [-1]}`` for any other
        error (the traceback is printed).
    """
    log("into idc_interface idc")
    try:
        img_data = base64.b64decode(data)
        img_np = bytes2np(img_data)
        angle = adjust_direction(img_np, model, if_return_angle=True)
        return {"angle": angle}
    except TimeoutError:
        return {"angle": [-5]}
    except Exception:
        # Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate to the caller.
        traceback.print_exc()
        return {"angle": [-1]}
# Interface configuration
app = Flask(__name__)


@app.route('/idc', methods=['POST'])
def _idc():
    """Handle ``POST /idc``: predict the direction angle of an uploaded image.

    Reads the form fields ``data`` (base64-encoded image) and ``md5``. The
    direction model is built on the first request and cached in the module
    globals under ``global_idc_model``.

    Returns:
        str: A JSON document. On success ``{"angle": <int>}``. On failure
        ``"angle"`` holds a stringified error list: ``"[-9]"`` when the form
        is empty, ``"[-5]"`` on ``TimeoutError``, ``"[-1]"`` on any other
        error (the traceback is printed).
    """
    _global._init()
    _global.update({"port": globals().get("port")})
    start_time = time.time()
    log("into idc_interface _idc")
    try:
        if not request.form:
            log("idc no data!")
            # Report the error under "angle" like every other path of this
            # endpoint. The "text"/"bbox" keys are kept only for clients that
            # already inspect the previous payload.
            return json.dumps({"angle": str([-9]),
                               "text": str([-9]),
                               "bbox": str([-9])})

        data = request.form.get("data")
        log("idc_interface get data time" + str(time.time()-start_time))
        img_data = base64.b64decode(data)
        img_np = bytes2np(img_data)

        _md5 = request.form.get("md5")
        _global.update({"md5": _md5})

        # Lazily create the model once and reuse it for later requests.
        idc_model = globals().get("global_idc_model")
        if idc_model is None:
            print("=========== init idc model ===========")
            idc_model = IdcModels().get_model()
            globals().update({"global_idc_model": idc_model})

        angle = adjust_direction(img_np, idc_model, if_return_angle=True)
        return json.dumps({"angle": angle})
    except TimeoutError:
        return json.dumps({"angle": str([-5])})
    except Exception:
        # Catch Exception rather than using a bare ``except`` so that
        # KeyboardInterrupt / SystemExit are not swallowed by the handler.
        traceback.print_exc()
        return json.dumps({"angle": str([-1])})
    finally:
        log("idc interface finish time " + str(time.time()-start_time))
class IdcModels:
    """Holder for the direction model.

    Construction builds the model inside the shared session/graph and loads
    its weights from ``models/model.h5`` located next to this Python file.
    """

    def __init__(self):
        # Directory that contains this Python file.
        base_dir = os.path.abspath(os.path.dirname(__file__))
        weights_path = base_dir + "/models/model.h5"

        # Input is image_shape (two dimensions) plus 3 channels; 4 outputs.
        input_dims = (image_shape[0], image_shape[1], 3)

        with sess.as_default(), sess.graph.as_default():
            self.model = direction_model(input_shape=input_dims,
                                         output_shape=4)
            self.model.load_weights(weights_path)

    def get_model(self):
        """Return the model created in ``__init__``."""
        return self.model
def test_idc_model(from_remote=False):
    """Manual smoke test over every file in the hard-coded test image folder.

    Each readable image is resized to 640x640 and encoded to base64. It is
    then either posted to the HTTP service (``from_remote=True``) or run
    through ``idc`` in-process. In the local case a successful prediction is
    shown rotated in an OpenCV window (press a key to continue); an error
    result is printed.

    Args:
        from_remote: When true, send the request over HTTP instead of
            predicting locally; no local model is loaded in that case.
    """
    # Only the in-process path needs the model, so skip building it (and
    # loading its weights) when the request is sent to a remote service.
    idc_model = None if from_remote else IdcModels().get_model()

    paths = glob("C:/Users/Administrator/Desktop/test_image/*")
    for file_path in paths:
        img_np = cv2.imread(file_path)
        if img_np is None:
            # cv2.imread returns None for files it cannot decode; skip them
            # instead of crashing in the resize step.
            print("skip unreadable image: " + str(file_path))
            continue

        img_np = pil_resize(img_np, 640, 640)
        print(img_np.shape)

        file_bytes = np2bytes(img_np)
        file_base64 = base64.b64encode(file_bytes)
        _md5 = get_md5_from_bytes(file_bytes)[0]

        _global._init()
        _global.update({"port": 15010, "md5": _md5})

        if from_remote:
            file_json = {"data": file_base64, "md5": _md5}
            # NOTE(review): this targets ".../ocr" on port 17000, while the
            # route defined in this module is "/idc" -- confirm the URL.
            _url = "http://127.0.0.1:17000/ocr"
            print(json.loads(request_post(_url, file_json)))
            continue

        result = idc(file_base64, idc_model)
        angle = result.get("angle")
        if isinstance(angle, list):
            # A list means an error code such as [-1] or [-5].
            print(result)
            continue

        # Show the image rotated by the predicted angle.
        img = Image.fromarray(img_np)
        img = np.array(img.rotate(angle, expand=1))
        print(img.shape)
        cv2.namedWindow('img', cv2.WINDOW_NORMAL | cv2.WINDOW_KEEPRATIO)
        cv2.imshow("img", img)
        cv2.waitKey(0)
# Script entry point: run the manual smoke test with in-process prediction
# (no HTTP request).
if __name__ == "__main__":
    test_idc_model(from_remote=False)
|