"""Flask service exposing a PaddleOCR model over HTTP (POST /ocr).

The module can also be run as a script:

    python <this file> [port [gpu_index]]
"""
import base64
import json
import multiprocessing as mp
import socket
import sys
import os
# Make the parent directory importable so that ``format_convert`` resolves.
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import time
import traceback
from multiprocessing.context import Process
import cv2
import requests
import logging
import numpy as np
# Must be set before the project/model imports below are executed.
os.environ['FLAGS_eager_delete_tensor_gb'] = '0'
from format_convert.utils import request_post, test_gpu, get_intranet_ip, log, get_md5_from_bytes
from flask import Flask, request
from format_convert import _global


# Interface configuration
app = Flask(__name__)


@app.route('/ocr', methods=['POST'])
def _ocr():
    """Handle ``POST /ocr``.

    Reads the form fields ``data`` (base64-encoded image) and ``md5``,
    lazily creates the OCR model on first use (cached in module globals
    under ``global_ocr_model``) and returns the recognition result.

    Returns:
        A JSON string ``{"text": ..., "bbox": ...}`` where both values are
        stringified lists. Error sentinels used by this handler:
        ``[-9]`` when the request has no form data, ``[-5]`` on
        ``TimeoutError`` and ``[-1]`` on any other exception.
    """
    _global._init()
    _global.update({"port": globals().get("port")})
    start_time = time.time()

    log("into ocr_interface _ocr")
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})
        data = request.form.get("data")
        img_data = base64.b64decode(data)
        # _md5 = get_md5_from_bytes(img_data)[0]
        _md5 = request.form.get("md5")
        _global.update({"md5": _md5})

        # The model is expensive to build, so it is created once and reused.
        ocr_model = globals().get("global_ocr_model")
        if ocr_model is None:
            log("----------- init ocr_model ------------")
            ocr_model = OcrModels().get_model()
            globals().update({"global_ocr_model": ocr_model})
        text = picture2text(img_data, ocr_model)
        return json.dumps(text)
    except TimeoutError:
        return json.dumps({"text": str([-5]), "bbox": str([-5])})
    except Exception:
        # Top-level boundary of the request: report and answer with the
        # generic error sentinel instead of letting Flask return a 500.
        traceback.print_exc()
        return json.dumps({"text": str([-1]), "bbox": str([-1])})
    finally:
        log("ocr interface finish time " + str(time.time()-start_time))


def ocr(data, ocr_model):
    """Run OCR on a base64-encoded image without going through HTTP.

    Args:
        data: base64-encoded image (anything ``base64.b64decode`` accepts).
        ocr_model: object exposing ``ocr(image, det=, rec=, cls=)``.

    Returns:
        The dict produced by :func:`picture2text`.

    Raises:
        TimeoutError: propagated unchanged from the recognition step.
    """
    log("into ocr_interface ocr")
    try:
        img_data = base64.b64decode(data)
        text = picture2text(img_data, ocr_model)
        return text
    except TimeoutError:
        # Bare raise keeps the original message and traceback.
        raise


flag = 0


def picture2text(img_data, ocr_model):
    """Decode raw image bytes and run the OCR model on them.

    Args:
        img_data: encoded image bytes (e.g. the content of a PNG/JPEG file).
        ocr_model: object exposing ``ocr(image, det=, rec=, cls=)``; each
            returned item is read as ``item[0]`` (bbox) and ``item[-1][0]``
            (text).

    Returns:
        ``{"text": str(list_of_texts), "bbox": str(list_of_bboxes)}``.
        Both lists are empty when the image cannot be decoded or when any
        non-timeout error occurs.

    Raises:
        TimeoutError: propagated unchanged to the caller.
    """
    log("into ocr_interface picture2text")
    try:
        start_time = time.time()
        # Binary stream -> np.ndarray [np.uint8: 8-bit pixels]
        img = cv2.imdecode(np.frombuffer(img_data, np.uint8), cv2.IMREAD_COLOR)
        if img is None:
            # imdecode could not decode the buffer; same outcome as the
            # "src.empty()" branch below, without relying on the wording of
            # the OpenCV error message.
            log("ocr_interface picture2text image is empty!")
            return {"text": str([]), "bbox": str([])}

        # Swap the B and R channels (BGR <-> RGB)
        try:
            np_images = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
        except cv2.error as e:
            if "src.empty()" in str(e):
                log("ocr_interface picture2text image is empty!")
                return {"text": str([]), "bbox": str([])}
            # Any other OpenCV failure: surface the real error to the outer
            # handler rather than continuing with ``np_images`` unbound.
            raise

        # resize
        # cv2.imshow("before resize", np_images)
        # print("np_images.shape", np_images.shape)
        # best_h, best_w = get_best_predict_size(np_images)
        # np_images = cv2.resize(np_images, (best_w, best_h), interpolation=cv2.INTER_AREA)
        # cv2.imshow("after resize", np_images)
        # print("np_images.shape", np_images.shape)
        # cv2.waitKey(0)

        # Predict
        results = ocr_model.ocr(np_images, det=True, rec=True, cls=True)

        # Collect the text and bounding box of every recognized line
        text_list = []
        bbox_list = []
        for line in results:
            # print("ocr_interface line", line)
            text_list.append(line[-1][0])
            bbox_list.append(line[0])

        # Inspect bbox
        # img = np.zeros((np_images.shape[1], np_images.shape[0]), np.uint8)
        # img.fill(255)
        # for box in bbox_list:
        #     print(box)
        #     cv2.rectangle(img, (int(box[0][0]), int(box[0][1])),
        #                   (int(box[2][0]), int(box[2][1])), (0, 0, 255), 1)
        # cv2.imshow("bbox", img)
        # cv2.waitKey(0)

        # log("ocr model use time: " + str(time.time()-start_time))
        return {"text": str(text_list), "bbox": str(bbox_list)}

    except TimeoutError:
        raise
    except Exception:
        log("picture2text error!")
        # print_exc() writes the traceback itself and returns None, so it
        # must not be wrapped in print().
        traceback.print_exc()
        return {"text": str([]), "bbox": str([])}


def _nearest_size(length, sizes):
    """Return the entry of *sizes* closest to *length* (first one on ties)."""
    return min(sizes, key=lambda size: abs(length - size))


def get_best_predict_size(image_np):
    """Pick the candidate height and width closest to the image's own size.

    Args:
        image_np: array whose ``shape[0]`` is the height and ``shape[1]``
            the width.

    Returns:
        ``(best_height, best_width)``, each chosen from a fixed list of
        multiples of 128 between 128 and 1280.
    """
    sizes = [1280, 1152, 1024, 896, 768, 640, 512, 384, 256, 128]
    best_height = _nearest_size(image_np.shape[0], sizes)
    best_width = _nearest_size(image_np.shape[1], sizes)
    return best_height, best_width


class OcrModels:
    """Thin holder around a ``PaddleOCR`` instance."""

    def __init__(self):
        """Build the PaddleOCR model.

        Raises:
            RuntimeError: if constructing ``PaddleOCR`` fails; the original
                exception is attached as ``__cause__``.
        """
        # Imported lazily so that importing this module stays cheap.
        from ocr.paddleocr import PaddleOCR
        try:
            self.ocr_model = PaddleOCR(use_angle_cls=True, lang="ch")
        except Exception as e:
            traceback.print_exc()
            raise RuntimeError("failed to initialise PaddleOCR") from e

    def get_model(self):
        """Return the wrapped OCR model instance."""
        return self.ocr_model


def test_ocr_model(from_remote=True):
    """Manual smoke test: OCR a hard-coded local image and print the result.

    Args:
        from_remote: when true, post the image to the local ``/ocr`` HTTP
            endpoint; otherwise build the model in-process and call
            :func:`ocr` directly.
    """
    file_path = "C:/Users/Administrator/Desktop/error2.png"
    with open(file_path, "rb") as f:
        file_bytes = f.read()
    file_base64 = base64.b64encode(file_bytes)
    _md5 = get_md5_from_bytes(file_bytes)[0]

    _global._init()
    _global.update({"port": 15010, "md5": _md5})

    if from_remote:
        file_json = {"data": file_base64, "md5": _md5}
        # _url = "http://192.168.2.102:17000/ocr"
        _url = "http://127.0.0.1:17000/ocr"
        print(json.loads(request_post(_url, file_json)))
    else:
        ocr_model = OcrModels().get_model()
        result = ocr(file_base64, ocr_model)
        print(result)


if __name__ == '__main__':
    # Usage: <script> [port [gpu_index]]; any other argument count falls
    # back to the defaults. The GPU index defaults to 0 when only the port
    # is given (it was previously left undefined in that case).
    if len(sys.argv) == 2:
        port = int(sys.argv[1])
        using_gpu_index = 0
    elif len(sys.argv) == 3:
        port = int(sys.argv[1])
        using_gpu_index = int(sys.argv[2])
    else:
        port = 17000
        using_gpu_index = 0

    _global._init()
    _global.update({"port": str(port)})
    globals().update({"port": str(port)})

    # ip = get_intranet_ip()
    # logging.basicConfig(level=logging.INFO,
    #                     format='%(asctime)s - %(name)s - %(levelname)s - '
    #                            + ip + ' - ' + str(port) + ' - %(message)s')

    os.environ['CUDA_VISIBLE_DEVICES'] = str(using_gpu_index)

    # NOTE(review): app.run() is called without host/port, so the parsed
    # ``port`` is not what Flask listens on, and the log line below is only
    # reached after the server stops. Confirm whether the commented-out call
    # is the intended one before changing it.
    # app.run(host='0.0.0.0', port=port, processes=1, threaded=False, debug=False)
    app.run()
    log("OCR running "+str(port))

    # test_ocr_model(False)
    #
    # log("OCR running")
    # file_path = "C:/Users/Administrator/Desktop/error9.jpg"
    # file_path = "error1.png"
    #
    # with open(file_path, "rb") as f:
    #     file_bytes = f.read()
    # file_base64 = base64.b64encode(file_bytes)
    #
    # ocr_model = OcrModels().get_model()
    # result = ocr(file_base64, ocr_model)
    # result = ocr(file_base64, ocr_model)
    # text_list = eval(result.get("text"))
    # box_list = eval(result.get("bbox"))
    #
    # new_list = []
    # for i in range(len(text_list)):
    #     new_list.append([text_list[i], box_list[i]])
    #
    # # print(new_list[0][1])
    # new_list.sort(key=lambda x: (x[1][1][0], x[1][0][0]))
    #
    # for t in new_list:
    #     print(t[0])