# encoding=utf8 import copy import inspect import io import logging import os import sys import time import requests import numpy as np from PIL import Image sys.path.append(os.path.dirname(__file__) + "/../") from pdfminer.layout import LTLine import traceback import cv2 from isr.pre_process import count_red_pixel from format_convert.utils import judge_error_code, add_div, LineTable, get_table_html, get_logger, log, \ memory_decorator, pil_resize, np2bytes, ocr_cant_read from format_convert.convert_need_interface import from_otr_interface, from_ocr_interface, from_gpu_interface_redis, \ from_idc_interface, from_isr_interface from format_convert.table_correct import get_rotated_image def image_process(image_np, image_path, is_from_pdf=False, is_from_docx=False, use_ocr=True): from format_convert.convert_tree import _Table, _Sentence def get_cluster(t_list, b_list, axis): zip_list = list(zip(t_list, b_list)) if len(zip_list) == 0: return t_list, b_list if len(zip_list[0]) > 0: zip_list.sort(key=lambda x: x[1][axis][1]) cluster_list = [] margin = 5 for text, bbox in zip_list: _find = 0 for cluster in cluster_list: if abs(cluster[1] - bbox[axis][1]) <= margin: cluster[0].append([text, bbox]) cluster[1] = bbox[axis][1] _find = 1 break if not _find: cluster_list.append([[[text, bbox]], bbox[axis][1]]) new_text_list = [] new_bbox_list = [] for cluster in cluster_list: # print("=============convert_image") # print("cluster_list", cluster) center_y = 0 for text, bbox in cluster[0]: center_y += bbox[axis][1] center_y = int(center_y / len(cluster[0])) for text, bbox in cluster[0]: bbox[axis][1] = center_y new_text_list.append(text) new_bbox_list.append(bbox) # print("cluster_list", cluster) return new_text_list, new_bbox_list def merge_textbox(textbox_list, in_objs): delete_obj = [] threshold = 5 textbox_list.sort(key=lambda x:x.bbox[0]) for k in range(len(textbox_list)): tb1 = textbox_list[k] if tb1 not in in_objs and tb1 not in delete_obj: for m in range(k+1, len(textbox_list)): tb2 = textbox_list[m] if tb2 in in_objs: continue if abs(tb1.bbox[1]-tb2.bbox[1]) <= threshold \ and abs(tb1.bbox[3]-tb2.bbox[3]) <= threshold: if tb1.bbox[0] <= tb2.bbox[0]: tb1.text = tb1.text + tb2.text else: tb1.text = tb2.text + tb1.text tb1.bbox[0] = min(tb1.bbox[0], tb2.bbox[0]) tb1.bbox[2] = max(tb1.bbox[2], tb2.bbox[2]) delete_obj.append(tb2) for _obj in delete_obj: if _obj in textbox_list: textbox_list.remove(_obj) return textbox_list def idc_process(_image_np): # 图片倾斜校正,写入原来的图片路径 # print("image_process", image_path) # g_r_i = get_rotated_image(_image_np, image_path) # if judge_error_code(g_r_i): # if is_from_docx: # return [] # else: # return g_r_i # _image_np = cv2.imread(image_path) # if _image_np is None: # return [] # return _image_np # if _image_np is None: # return [] # idc模型实现图片倾斜校正 h, w = get_best_predict_size2(_image_np, 1080) image_resize = pil_resize(_image_np, h, w) # image_resize_path = image_path.split(".")[0] + "_resize_idc." + image_path.split(".")[-1] # cv2.imwrite(image_resize_path, image_resize) # with open(image_resize_path, "rb") as f: # image_bytes = f.read() image_bytes = np2bytes(image_resize) angle = from_idc_interface(image_bytes) if judge_error_code(angle): if is_from_docx: return [] else: return angle # 根据角度旋转 image_pil = Image.fromarray(_image_np) _image_np = np.array(image_pil.rotate(angle, expand=1)) # 写入 # idc_path = image_path.split(".")[0] + "_idc." + image_path.split(".")[-1] # cv2.imwrite(idc_path, image_np) return _image_np def isr_process(_image_np): image_np_copy = copy.deepcopy(_image_np) # isr模型去除印章 _isr_time = time.time() if count_red_pixel(_image_np): # 红色像素达到一定值才过模型 with open(image_path, "rb") as f: image_bytes = f.read() _image_np = from_isr_interface(image_bytes) if judge_error_code(_image_np): if is_from_docx: return [] else: return _image_np # [1]代表检测不到印章,直接返回 if isinstance(_image_np, list) and _image_np == [1]: log("no seals detected!") _image_np = image_np_copy else: isr_path = image_path.split(".")[0] + "_isr." + image_path.split(".")[-1] cv2.imwrite(isr_path, _image_np) log("isr total time "+str(time.time()-_isr_time)) return _image_np def ocr_process(_image_np): # ocr图片过大内存溢出,需resize start_time = time.time() # 调用ocr模型接口 image_bytes = np2bytes(_image_np) text_list, bbox_list = from_ocr_interface(image_bytes, is_table=True) if judge_error_code(text_list): return text_list, text_list for i in range(len(bbox_list)): point = bbox_list[i] bbox_list[i] = [[int(point[0][0]), int(point[0][1])], [int(point[1][0]), int(point[1][1])], [int(point[2][0]), int(point[2][1])], [int(point[3][0]), int(point[3][1])]] return text_list, bbox_list def otr_process(_image_np): # otr模型识别表格,需要图片resize成模型所需大小, 写入另一个路径 best_h, best_w = get_best_predict_size(_image_np) image_resize = pil_resize(_image_np, best_h, best_w) # image_resize_path = image_path.split(".")[0] + "_resize_otr." + image_path.split(".")[-1] # cv2.imwrite(image_resize_path, image_resize) # 调用otr模型接口 # with open(image_resize_path, "rb") as f: # image_bytes = f.read() image_bytes = np2bytes(image_resize) list_line = from_otr_interface(image_bytes, is_from_pdf) if judge_error_code(list_line): if is_from_docx: return [] else: return list_line # otr resize后得到的bbox根据比例还原 start_time = time.time() ratio = (_image_np.shape[0]/best_h, _image_np.shape[1]/best_w) for i in range(len(list_line)): point = list_line[i] list_line[i] = [int(point[0]*ratio[1]), int(point[1]*ratio[0]), int(point[2]*ratio[1]), int(point[3]*ratio[0])] log("otr resize bbox recover " + str(time.time()-start_time)) return list_line def table_process(list_line, text_list, bbox_list): # 调用现成方法形成表格 try: from format_convert.convert_tree import TableLine list_lines = [] for line in list_line: list_lines.append(LTLine(1, (line[0], line[1]), (line[2], line[3]))) from format_convert.convert_tree import TextBox list_text_boxes = [] for i in range(len(bbox_list)): bbox = bbox_list[i] b_text = text_list[i] list_text_boxes.append(TextBox([bbox[0][0], bbox[0][1], bbox[2][0], bbox[2][1]], b_text)) # for _textbox in list_text_boxes: # print("==",_textbox.get_text()) lt = LineTable() tables, obj_in_table, _ = lt.recognize_table(list_text_boxes, list_lines, False) # 合并同一行textbox list_text_boxes = merge_textbox(list_text_boxes, obj_in_table) return list_text_boxes, tables, obj_in_table except: traceback.print_exc() return [-8], [-8], [-8] log("into image_preprocess") try: if image_np is None: return [] # 整体分辨率限制 threshold = 2000 if image_np.shape[0] > threshold or image_np.shape[1] > threshold: h, w = get_best_predict_size2(image_np, threshold=threshold) log("global image resize " + str(image_np.shape[:2]) + " -> " + str(h) + "," + str(w)) image_np = pil_resize(image_np, h, w) # 印章去除 image_np = isr_process(image_np) if isinstance(image_np, list): return image_np # 文字识别 text_list, box_list = ocr_process(image_np) if judge_error_code(text_list): return text_list # 判断ocr识别是否正确 if ocr_cant_read(text_list, box_list): # 方向分类 image_np = idc_process(image_np) # cv2.imshow("idc_process", image_np) # cv2.waitKey(0) if isinstance(image_np, list): return image_np # 文字识别 text_list1, box_list_1 = ocr_process(image_np) if judge_error_code(text_list1): return text_list1 # 比较字数 # print("ocr process", len("".join(text_list)), len("".join(text_list1))) if len("".join(text_list)) < len("".join(text_list1)): text_list = text_list1 box_list = box_list_1 # 表格识别 line_list = otr_process(image_np) if judge_error_code(line_list): return line_list # 表格生成 text_box_list, table_list, obj_in_table_list = table_process(line_list, text_list, box_list) if judge_error_code(table_list): return table_list # 对象生成 obj_list = [] for table in table_list: obj_list.append(_Table(table["table"], table["bbox"])) for text_box in text_box_list: if text_box not in obj_in_table_list: obj_list.append(_Sentence(text_box.get_text(), text_box.bbox)) return obj_list except Exception as e: log("image_preprocess error") traceback.print_exc() return [-1] @memory_decorator def picture2text(path, html=False): log("into picture2text") try: # 判断图片中表格 img = cv2.imread(path) if img is None: return [-3] text = image_process(img, path) if judge_error_code(text): return text if html: text = add_div(text) return [text] except Exception as e: log("picture2text error!") print("picture2text", traceback.print_exc()) return [-1] def get_best_predict_size(image_np, times=64): sizes = [] for i in range(1, 100): if i*times <= 1300: sizes.append(i*times) sizes.sort(key=lambda x: x, reverse=True) min_len = 10000 best_height = sizes[0] for height in sizes: if abs(image_np.shape[0] - height) < min_len: min_len = abs(image_np.shape[0] - height) best_height = height min_len = 10000 best_width = sizes[0] for width in sizes: if abs(image_np.shape[1] - width) < min_len: min_len = abs(image_np.shape[1] - width) best_width = width return best_height, best_width def get_best_predict_size2(image_np, threshold=3000): h, w = image_np.shape[:2] scale = threshold / max(h, w) h = int(h * scale) w = int(w * scale) return h, w class ImageConvert: def __init__(self, path, unique_type_dir): from format_convert.convert_tree import _Document self._doc = _Document(path) self.path = path self.unique_type_dir = unique_type_dir def init_package(self): # 各个包初始化 try: with open(self.path, "rb") as f: self.image = f.read() except: log("cannot open image!") traceback.print_exc() self._doc.error_code = [-3] def convert(self): from format_convert.convert_tree import _Page, _Image self.init_package() if self._doc.error_code is not None: return _page = _Page(None, 0) _image = _Image(self.image, self.path) _page.add_child(_image) self._doc.add_child(_page) def get_html(self): try: self.convert() except: traceback.print_exc() self._doc.error_code = [-1] if self._doc.error_code is not None: return self._doc.error_code return self._doc.get_html() def image_process_old(image_np, image_path, is_from_pdf=False, is_from_docx=False, use_ocr=True): from format_convert.convert_tree import _Table, _Sentence def get_cluster(t_list, b_list, axis): zip_list = list(zip(t_list, b_list)) if len(zip_list) == 0: return t_list, b_list if len(zip_list[0]) > 0: zip_list.sort(key=lambda x: x[1][axis][1]) cluster_list = [] margin = 5 for text, bbox in zip_list: _find = 0 for cluster in cluster_list: if abs(cluster[1] - bbox[axis][1]) <= margin: cluster[0].append([text, bbox]) cluster[1] = bbox[axis][1] _find = 1 break if not _find: cluster_list.append([[[text, bbox]], bbox[axis][1]]) new_text_list = [] new_bbox_list = [] for cluster in cluster_list: # print("=============convert_image") # print("cluster_list", cluster) center_y = 0 for text, bbox in cluster[0]: center_y += bbox[axis][1] center_y = int(center_y / len(cluster[0])) for text, bbox in cluster[0]: bbox[axis][1] = center_y new_text_list.append(text) new_bbox_list.append(bbox) # print("cluster_list", cluster) return new_text_list, new_bbox_list def merge_textbox(textbox_list, in_objs): delete_obj = [] threshold = 5 textbox_list.sort(key=lambda x:x.bbox[0]) for k in range(len(textbox_list)): tb1 = textbox_list[k] if tb1 not in in_objs and tb1 not in delete_obj: for m in range(k+1, len(textbox_list)): tb2 = textbox_list[m] if tb2 in in_objs: continue if abs(tb1.bbox[1]-tb2.bbox[1]) <= threshold \ and abs(tb1.bbox[3]-tb2.bbox[3]) <= threshold: if tb1.bbox[0] <= tb2.bbox[0]: tb1.text = tb1.text + tb2.text else: tb1.text = tb2.text + tb1.text tb1.bbox[0] = min(tb1.bbox[0], tb2.bbox[0]) tb1.bbox[2] = max(tb1.bbox[2], tb2.bbox[2]) delete_obj.append(tb2) for _obj in delete_obj: if _obj in textbox_list: textbox_list.remove(_obj) return textbox_list log("into image_preprocess") try: if image_np is None: return [] # 整体分辨率限制 if image_np.shape[0] > 2000 or image_np.shape[1] > 2000: h, w = get_best_predict_size2(image_np, threshold=2000) log("global image resize " + str(image_np.shape[:2]) + " -> " + str(h) + "," + str(w)) image_np = pil_resize(image_np, h, w) # 图片倾斜校正,写入原来的图片路径 # print("image_process", image_path) g_r_i = get_rotated_image(image_np, image_path) if judge_error_code(g_r_i): if is_from_docx: return [] else: return g_r_i image_np = cv2.imread(image_path) image_np_copy = copy.deepcopy(image_np) if image_np is None: return [] # if image_np is None: # return [] # # # idc模型实现图片倾斜校正 # image_resize = pil_resize(image_np, 640, 640) # image_resize_path = image_path.split(".")[0] + "_resize_idc." + image_path.split(".")[-1] # cv2.imwrite(image_resize_path, image_resize) # # with open(image_resize_path, "rb") as f: # image_bytes = f.read() # angle = from_idc_interface(image_bytes) # if judge_error_code(angle): # if is_from_docx: # return [] # else: # return angle # # 根据角度旋转 # image_pil = Image.fromarray(image_np) # image_np = np.array(image_pil.rotate(angle, expand=1)) # # 写入 # idc_path = image_path.split(".")[0] + "_idc." + image_path.split(".")[-1] # cv2.imwrite(idc_path, image_np) # isr模型去除印章 _isr_time = time.time() if count_red_pixel(image_np): # 红色像素达到一定值才过模型 with open(image_path, "rb") as f: image_bytes = f.read() image_np = from_isr_interface(image_bytes) if judge_error_code(image_np): if is_from_docx: return [] else: return image_np # [1]代表检测不到印章,直接返回 if isinstance(image_np, list) and image_np == [1]: log("no seals detected!") image_np = image_np_copy else: isr_path = image_path.split(".")[0] + "_isr." + image_path.split(".")[-1] cv2.imwrite(isr_path, image_np) log("isr total time "+str(time.time()-_isr_time)) # otr模型识别表格,需要图片resize成模型所需大小, 写入另一个路径 best_h, best_w = get_best_predict_size(image_np) # image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA) image_resize = pil_resize(image_np, best_h, best_w) image_resize_path = image_path.split(".")[0] + "_resize_otr." + image_path.split(".")[-1] cv2.imwrite(image_resize_path, image_resize) # 调用otr模型接口 with open(image_resize_path, "rb") as f: image_bytes = f.read() list_line = from_otr_interface(image_bytes, is_from_pdf) if judge_error_code(list_line): return list_line # # 预处理 # if is_from_pdf: # prob = 0.2 # else: # prob = 0.5 # with open(image_resize_path, "rb") as f: # image_bytes = f.read() # img_new, inputs = table_preprocess(image_bytes, prob) # if type(img_new) is list and judge_error_code(img_new): # return img_new # log("img_new.shape " + str(img_new.shape)) # # # 调用模型运行接口 # _dict = {"inputs": inputs, "md5": _global.get("md5")} # result = from_gpu_interface(_dict, model_type="otr", predictor_type="") # if judge_error_code(result): # logging.error("from_gpu_interface failed! " + str(result)) # raise requests.exceptions.RequestException # # pred = result.get("preds") # gpu_time = result.get("gpu_time") # log("otr model predict time " + str(gpu_time)) # # # # 解压numpy # # decompressed_array = io.BytesIO() # # decompressed_array.write(pred) # # decompressed_array.seek(0) # # pred = np.load(decompressed_array, allow_pickle=True)['arr_0'] # # log("inputs.shape" + str(pred.shape)) # # 调用gpu共享内存处理 # _dict = {"inputs": inputs, "md5": _global.get("md5")} # result = from_gpu_share_memory(_dict, model_type="otr", predictor_type="") # if judge_error_code(result): # logging.error("from_gpu_interface failed! " + str(result)) # raise requests.exceptions.RequestException # # pred = result.get("preds") # gpu_time = result.get("gpu_time") # log("otr model predict time " + str(gpu_time)) # # # 后处理 # list_line = table_postprocess(img_new, pred, prob) # log("len(list_line) " + str(len(list_line))) # if judge_error_code(list_line): # return list_line # otr resize后得到的bbox根据比例还原 start_time = time.time() ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w) for i in range(len(list_line)): point = list_line[i] list_line[i] = [int(point[0]*ratio[1]), int(point[1]*ratio[0]), int(point[2]*ratio[1]), int(point[3]*ratio[0])] log("otr resize bbox recover " + str(time.time()-start_time)) # ocr图片过大内存溢出,需resize start_time = time.time() threshold = 3000 ocr_resize_flag = 0 if image_np.shape[0] >= threshold or image_np.shape[1] >= threshold: ocr_resize_flag = 1 best_h, best_w = get_best_predict_size2(image_np, threshold) # image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA) image_resize = pil_resize(image_np, best_h, best_w) image_resize_path = image_path.split(".")[0] + "_resize_ocr." + image_path.split(".")[-1] cv2.imwrite(image_resize_path, image_resize) log("ocr resize before " + str(time.time()-start_time)) # 调用ocr模型接口 with open(image_resize_path, "rb") as f: image_bytes = f.read() text_list, bbox_list = from_ocr_interface(image_bytes, is_table=True) if judge_error_code(text_list): return text_list # # PaddleOCR内部包括预处理,调用模型运行接口,后处理 # paddle_ocr = PaddleOCR(use_angle_cls=True, lang="ch") # results = paddle_ocr.ocr(image_resize, det=True, rec=True, cls=True) # # 循环每张图片识别结果 # text_list = [] # bbox_list = [] # for line in results: # # print("ocr_interface line", line) # text_list.append(line[-1][0]) # bbox_list.append(line[0]) # if len(text_list) == 0: # return [] # ocr resize后的bbox还原 if ocr_resize_flag: ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w) else: ratio = (1, 1) for i in range(len(bbox_list)): point = bbox_list[i] bbox_list[i] = [[int(point[0][0]*ratio[1]), int(point[0][1]*ratio[0])], [int(point[1][0]*ratio[1]), int(point[1][1]*ratio[0])], [int(point[2][0]*ratio[1]), int(point[2][1]*ratio[0])], [int(point[3][0]*ratio[1]), int(point[3][1]*ratio[0])]] # 调用现成方法形成表格 try: from format_convert.convert_tree import TableLine list_lines = [] for line in list_line: list_lines.append(LTLine(1, (line[0], line[1]), (line[2], line[3]))) from format_convert.convert_tree import TextBox list_text_boxes = [] for i in range(len(bbox_list)): bbox = bbox_list[i] b_text = text_list[i] list_text_boxes.append(TextBox([bbox[0][0], bbox[0][1], bbox[2][0], bbox[2][1]], b_text)) # for _textbox in list_text_boxes: # print("==",_textbox.get_text()) lt = LineTable() tables, obj_in_table, _ = lt.recognize_table(list_text_boxes, list_lines, False) # 合并同一行textbox list_text_boxes = merge_textbox(list_text_boxes, obj_in_table) obj_list = [] for table in tables: obj_list.append(_Table(table["table"], table["bbox"])) for text_box in list_text_boxes: if text_box not in obj_in_table: obj_list.append(_Sentence(text_box.get_text(), text_box.bbox)) return obj_list except: traceback.print_exc() return [-8] except Exception as e: log("image_preprocess error") traceback.print_exc() return [-1]