123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import base64
- import json
- import multiprocessing as mp
- import socket
- import sys
- import os
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- import time
- import traceback
- from multiprocessing.context import Process
- import cv2
- import requests
- import logging
- import numpy as np
- os.environ['FLAGS_eager_delete_tensor_gb'] = '0'
- from format_convert.utils import request_post, test_gpu, get_intranet_ip, log, get_md5_from_bytes, bytes2np
- from flask import Flask, request
- from format_convert import _global
- # Interface configuration
- app = Flask(__name__)
- use_angle_cls = False
@app.route('/ocr', methods=['POST'])
def _ocr():
    """Handle ``POST /ocr``: run OCR on a base64-encoded image from the form.

    Form fields read:
        data: base64-encoded image bytes, handed to ``ocr``.
        md5: value stored in ``_global`` under the key ``"md5"``.
        only_rec: optional flag. Either an integer string or
            ``"true"``/``"false"`` (any letter case). Missing means ``0``.

    Returns:
        A JSON string. On success it is the serialized return value of
        ``ocr``. On failure both ``"text"`` and ``"bbox"`` hold a stringified
        one-element list with an error code: ``-9`` when the form is empty,
        ``-5`` on ``TimeoutError``, ``-1`` on any other exception.
    """
    _global._init()
    _global.update({"port": globals().get("port")})
    start_time = time.time()
    log("into ocr_interface _ocr")
    try:
        if not request.form:
            log("ocr no data!")
            return json.dumps({"text": str([-9]), "bbox": str([-9])})

        data = request.form.get("data")
        _md5 = request.form.get("md5")

        raw_only_rec = request.form.get("only_rec")
        if raw_only_rec is None:
            only_rec = 0
        else:
            # A client that posts a Python bool presumably delivers it as the
            # text "True"/"False", which int() rejects; accept that spelling
            # in addition to plain integers.
            flag = raw_only_rec.strip().lower()
            if flag in ("true", "false"):
                only_rec = int(flag == "true")
            else:
                only_rec = int(raw_only_rec)

        _global.update({"md5": _md5})

        # The model is created on first use and cached in module globals so
        # later requests reuse it.
        ocr_model = globals().get("global_ocr_model")
        if ocr_model is None:
            log("----------- init ocr_model ------------")
            ocr_model = OcrModels().get_model()
            globals().update({"global_ocr_model": ocr_model})

        text = ocr(data, ocr_model, only_rec)
        return json.dumps(text)
    except TimeoutError:
        return json.dumps({"text": str([-5]), "bbox": str([-5])})
    except Exception:
        # Catch Exception rather than everything, so SystemExit and
        # KeyboardInterrupt are not converted into a -1 response.
        traceback.print_exc()
        return json.dumps({"text": str([-1]), "bbox": str([-1])})
    finally:
        log("ocr interface finish time " + str(time.time()-start_time))
def ocr(data, ocr_model, only_rec=0):
    """Decode a base64 image payload and run it through ``picture2text``.

    Args:
        data: base64-encoded image content.
        ocr_model: model object forwarded unchanged to ``picture2text``.
        only_rec: flag forwarded unchanged to ``picture2text``.

    Returns:
        The dict produced by ``picture2text``, or a dict whose ``"text"`` and
        ``"bbox"`` are both ``str([-5])`` when a ``TimeoutError`` is raised.
    """
    log("into ocr_interface ocr")
    timeout_reply = {"text": str([-5]), "bbox": str([-5])}
    try:
        return picture2text(base64.b64decode(data), ocr_model, only_rec)
    except TimeoutError:
        return timeout_reply
def picture2text(img_data, ocr_model, only_rec=0):
    """Run the OCR model on raw image bytes and collect texts and boxes.

    Args:
        img_data: raw (already base64-decoded) image bytes.
        ocr_model: object exposing ``ocr(img, det=..., rec=..., cls=...)``.
        only_rec: when truthy, detection is skipped and only recognition runs.

    Returns:
        ``{"text": str(text_list), "bbox": str(bbox_list)}``. With
        ``only_rec`` set, ``text_list`` holds the single value
        ``results[0][0]`` and ``bbox_list`` is empty. Otherwise each result
        line contributes ``line[-1][0]`` as text and ``line[0]`` as its box.
        Any non-timeout failure yields empty lists.

    Raises:
        TimeoutError: propagated unchanged to the caller.
    """
    log("into ocr_interface picture2text")
    try:
        # Convert the binary stream into an np.ndarray (np.uint8, 8-bit pixels).
        img = bytes2np(img_data)

        # Predict; detection is turned off when only recognition is requested.
        results = ocr_model.ocr(img, det=not only_rec, rec=True, cls=use_angle_cls)

        if only_rec:
            text_list = [results[0][0]]
            bbox_list = []
        else:
            # Iterate over the recognition results, one entry per text line.
            text_list = []
            bbox_list = []
            for line in results:
                text_list.append(line[-1][0])
                bbox_list.append(line[0])
        return {"text": str(text_list), "bbox": str(bbox_list)}
    except TimeoutError:
        # Must precede the generic handler below, which would otherwise
        # swallow it. A bare raise keeps the original message and traceback.
        raise
    except Exception:
        log("picture2text error!")
        traceback.print_exc()
        return {"text": str([]), "bbox": str([])}
def get_best_predict_size(image_np):
    """Pick the candidate sizes closest to the image's height and width.

    Args:
        image_np: object whose ``shape[0]`` is treated as the height and
            ``shape[1]`` as the width.

    Returns:
        A ``(best_height, best_width)`` tuple, each taken from the fixed
        candidate list (multiples of 128 from 1280 down to 128).
    """
    candidates = (1280, 1152, 1024, 896, 768, 640, 512, 384, 256, 128)

    def nearest(extent):
        # min() returns the first of several equally close candidates; the
        # list is descending, so ties resolve to the larger size.
        return min(candidates, key=lambda size: abs(extent - size))

    rows = image_np.shape[0]
    cols = image_np.shape[1]
    return nearest(rows), nearest(cols)
class OcrModels:
    """Holder that builds a ``PaddleOCR`` instance and hands it out."""

    def __init__(self):
        """Create the PaddleOCR model with ``lang="ch"``.

        Raises:
            RuntimeError: if constructing ``PaddleOCR`` fails; the original
                exception is attached as the cause.
        """
        # Imported here rather than at module level; a failing import
        # propagates unchanged because it sits outside the try block.
        from ocr.paddleocr import PaddleOCR
        try:
            log('----------- init ocr model ---------------')
            self.ocr_model = PaddleOCR(use_angle_cls=use_angle_cls, lang="ch")
        except Exception as err:
            # print_exc() already writes the traceback and returns None, so
            # its result must not be passed to print().
            traceback.print_exc()
            raise RuntimeError("failed to initialize PaddleOCR model") from err

    def get_model(self):
        """Return the model object created in ``__init__``."""
        return self.ocr_model
def test_ocr_model(from_remote=True):
    """Smoke-test OCR on the local file ``error8.png`` and print the result.

    Args:
        from_remote: when true, post the image to the HTTP endpoint at
            ``http://127.0.0.1:17000/ocr``; otherwise build the model in this
            process and call ``ocr`` directly.
    """
    file_path = "error8.png"
    with open(file_path, "rb") as f:
        file_bytes = f.read()
    file_base64 = base64.b64encode(file_bytes)
    _md5 = get_md5_from_bytes(file_bytes)[0]

    # Use an integer flag: the /ocr handler parses this field with int(), and
    # a Python bool would presumably be transmitted as the text "False".
    only_rec = 0

    _global._init()
    _global.update({"port": 15010, "md5": _md5})

    if from_remote:
        file_json = {"data": file_base64, "md5": _md5, 'only_rec': only_rec}
        # _url = "http://192.168.2.102:17000/ocr"
        _url = "http://127.0.0.1:17000/ocr"
        print(json.loads(request_post(_url, file_json)))
    else:
        ocr_model = OcrModels().get_model()
        result = ocr(file_base64, ocr_model, only_rec=only_rec)
        print(result)
# Script entry point: run the smoke test in-process instead of over HTTP.
if __name__ == '__main__':
    test_ocr_model(from_remote=False)
|