import logging
import os
import sys
import traceback

sys.path.append(os.path.dirname(__file__) + "/../")

from pdfminer.layout import LTLine
import cv2
from format_convert import get_memory_info
from format_convert.utils import judge_error_code, add_div, LineTable, get_table_html
from format_convert.table_correct import get_rotated_image
from format_convert.convert_need_interface import from_otr_interface, from_ocr_interface


def _get_cluster(t_list, b_list, axis, margin=5):
    """Snap nearly-equal y coordinates of OCR boxes to a shared value.

    The value clustered is ``bbox[axis][1]``. Pairs are sorted by that value
    and a pair joins the first cluster whose most recently added value lies
    within ``margin``; otherwise it opens a new cluster. Every member of a
    cluster then has ``bbox[axis][1]`` overwritten (in place) with the integer
    mean of the cluster.

    :param t_list: recognised texts, parallel to ``b_list``
    :param b_list: boxes; each ``bbox[axis]`` must be a mutable sequence
        (presumably an [x, y] corner point -- confirm against the OCR interface)
    :param axis: index into each bbox selecting which point is clustered
    :param margin: maximum distance to the cluster's last value to join it
    :return: (texts, bboxes) reordered cluster by cluster; the inputs are
        returned unchanged when there are no pairs
    """
    pairs = list(zip(t_list, b_list))
    if not pairs:
        return t_list, b_list
    pairs.sort(key=lambda pair: pair[1][axis][1])

    # Each cluster is [members, last_value]; last_value tracks the most
    # recently added member so a cluster can drift along a chain of boxes.
    clusters = []
    for text, bbox in pairs:
        value = bbox[axis][1]
        for cluster in clusters:
            if abs(cluster[1] - value) <= margin:
                cluster[0].append([text, bbox])
                cluster[1] = value
                break
        else:
            clusters.append([[[text, bbox]], value])

    new_text_list = []
    new_bbox_list = []
    for members, _ in clusters:
        # Compute the mean before any member is overwritten.
        center = int(sum(bbox[axis][1] for _, bbox in members) / len(members))
        for text, bbox in members:
            bbox[axis][1] = center
            new_text_list.append(text)
            new_bbox_list.append(bbox)
    return new_text_list, new_bbox_list


def _merge_textbox(textbox_list, in_objs, threshold=5):
    """Merge text boxes lying on the same line into the earliest one.

    Two boxes are on the same line when both ``bbox[1]`` and ``bbox[3]``
    differ by at most ``threshold``. The surviving box gets the concatenated
    text (ordered by ``bbox[0]``) and the union of the horizontal extent; the
    absorbed box is removed from ``textbox_list`` (mutated in place).

    Boxes contained in ``in_objs`` and boxes already absorbed are neither
    merge targets nor merge sources, so text is never copied out of a table
    cell or appended twice.

    :param textbox_list: list of objects exposing mutable ``bbox`` and ``text``
    :param in_objs: boxes that belong to a table and must be left untouched
    :param threshold: maximum allowed difference of ``bbox[1]`` / ``bbox[3]``
    :return: the same ``textbox_list`` object, after removals
    """
    absorbed = []
    for k, tb1 in enumerate(textbox_list):
        if tb1 in in_objs or tb1 in absorbed:
            continue
        for tb2 in textbox_list[k + 1:]:
            if tb2 in in_objs or tb2 in absorbed:
                continue
            same_line = (abs(tb1.bbox[1] - tb2.bbox[1]) <= threshold
                         and abs(tb1.bbox[3] - tb2.bbox[3]) <= threshold)
            if not same_line:
                continue
            # Keep reading order: the box starting further left comes first.
            if tb1.bbox[0] <= tb2.bbox[0]:
                tb1.text = tb1.text + tb2.text
            else:
                tb1.text = tb2.text + tb1.text
            tb1.bbox[0] = min(tb1.bbox[0], tb2.bbox[0])
            tb1.bbox[2] = max(tb1.bbox[2], tb2.bbox[2])
            absorbed.append(tb2)

    for _obj in absorbed:
        if _obj in textbox_list:
            textbox_list.remove(_obj)
    return textbox_list


def image_process(image_np, image_path, use_ocr=True):
    """Run table-line detection and OCR on an image and build document objects.

    Steps: deskew via ``get_rotated_image``, re-read the image from
    ``image_path``, write a resized copy next to it (``<name>_resize<ext>``),
    send the resized copy to the OTR interface and the original to the OCR
    interface, then let ``LineTable.recognize_table`` assemble tables.

    :param image_np: image array handed to ``get_rotated_image``
    :param image_path: path of the image file; also used to derive the path
        of the resized copy
    :param use_ocr: currently unused, kept for interface compatibility
    :return: list of ``_Table`` / ``_Sentence`` objects on success; ``[]`` when
        the image cannot be re-read; the error list returned by the OTR/OCR
        interface when ``judge_error_code`` flags it; ``[-8]`` when table
        construction raises; ``[-1]`` on rotation failure or any other error
    """
    # Imported lazily, as in the original code (presumably to avoid an import
    # cycle with convert_tree -- confirm).
    from format_convert.convert_tree import _Table, _Sentence

    logging.info("into image_preprocess")
    try:
        # Deskew the image; the result is written back to the original path.
        print("image_process", image_path)
        g_r_i = get_rotated_image(image_np, image_path)
        if g_r_i == [-1]:
            return [-1]

        # OTR needs a resized image, which is written to a separate path.
        image_np = cv2.imread(image_path)
        if image_np is None:
            return []
        best_h, best_w = get_best_predict_size(image_np)
        image_resize = cv2.resize(image_np, (best_w, best_h),
                                  interpolation=cv2.INTER_AREA)
        # splitext only splits the final extension, so dots elsewhere in the
        # path (e.g. "./img.png" or "/data/v1.2/img.png") are preserved.
        path_root, path_ext = os.path.splitext(image_path)
        image_resize_path = path_root + "_resize" + path_ext
        cv2.imwrite(image_resize_path, image_resize)

        # Call the OTR model interface.
        with open(image_resize_path, "rb") as f:
            image_bytes = f.read()
        list_line = from_otr_interface(image_bytes)
        if judge_error_code(list_line):
            return list_line

        # Scale the boxes found on the resized image back to the original size.
        ratio_h = image_np.shape[0] / best_h
        ratio_w = image_np.shape[1] / best_w
        list_line = [
            [int(point[0] * ratio_w), int(point[1] * ratio_h),
             int(point[2] * ratio_w), int(point[3] * ratio_h)]
            for point in list_line
        ]

        # Call the OCR model interface.
        with open(image_path, "rb") as f:
            image_bytes = f.read()
        text_list, bbox_list = from_ocr_interface(image_bytes, True)
        if judge_error_code(text_list):
            return text_list

        # Cluster the y coordinates of the text boxes (points 0 and 2 only).
        text_list, bbox_list = _get_cluster(text_list, bbox_list, 0)
        text_list, bbox_list = _get_cluster(text_list, bbox_list, 2)

        # Build the tables with the existing LineTable implementation.
        try:
            from format_convert.convert_tree import TextBox

            list_lines = [
                LTLine(1, (line[0], line[1]), (line[2], line[3]))
                for line in list_line
            ]
            list_text_boxes = [
                TextBox([bbox[0][0], bbox[0][1], bbox[2][0], bbox[2][1]], b_text)
                for b_text, bbox in zip(text_list, bbox_list)
            ]

            lt = LineTable()
            tables, obj_in_table, _ = lt.recognize_table(list_text_boxes,
                                                         list_lines, False)

            # Merge text boxes that sit on the same line.
            list_text_boxes = _merge_textbox(list_text_boxes, obj_in_table)

            obj_list = [_Table(table["table"], table["bbox"]) for table in tables]
            for text_box in list_text_boxes:
                if text_box not in obj_in_table:
                    obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))
            return obj_list
        except Exception:
            traceback.print_exc()
            return [-8]

    except Exception:
        logging.info("image_preprocess error")
        # print_exc() writes the traceback itself and returns None, so it must
        # not be wrapped in print().
        traceback.print_exc()
        return [-1]


@get_memory_info.memory_decorator
def picture2text(path, html=False):
    """Convert the image at ``path`` through ``image_process``.

    :param path: image file path
    :param html: when true, the result is passed through ``add_div``
    :return: ``[-3]`` if the image cannot be read, the error list from
        ``image_process`` if ``judge_error_code`` flags it, ``[-1]`` on any
        exception, otherwise a one-element list holding the result
    """
    logging.info("into picture2text")
    try:
        # Detect tables in the image.
        img = cv2.imread(path)
        if img is None:
            return [-3]

        text = image_process(img, path)
        if judge_error_code(text):
            return text
        if html:
            text = add_div(text)
        return [text]
    except Exception:
        logging.info("picture2text error!")
        # print_exc() returns None; call it directly instead of printing it.
        traceback.print_exc()
        return [-1]


def get_best_predict_size(image_np, times=64):
    """Pick the multiples of ``times`` closest to the image height and width.

    Candidates are ``i * times`` for ``i`` in 1..99 that do not exceed 3000.
    On a tie the larger candidate wins, because candidates are scanned in
    descending order and only a strictly smaller distance replaces the best.

    :param image_np: object whose ``shape[0]`` / ``shape[1]`` are height / width
    :param times: step between candidate sizes
    :return: (best_height, best_width)
    :raises IndexError: if no candidate exists (``times`` greater than 3000)
    """
    sizes = [i * times for i in range(1, 100) if i * times <= 3000]
    sizes.sort(reverse=True)

    def closest(length):
        best = sizes[0]
        for size in sizes[1:]:
            if abs(length - size) < abs(length - best):
                best = size
        return best

    return closest(image_np.shape[0]), closest(image_np.shape[1])


class ImageConvert:
    """Wrap a single image file into a ``_Document`` with one page."""

    def __init__(self, path, unique_type_dir):
        """Create the backing ``_Document`` for ``path``.

        :param path: image file path
        :param unique_type_dir: stored on the instance; not used in this class
        """
        from format_convert.convert_tree import _Document
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

    def init_package(self):
        """Read the image bytes into ``self.image``.

        On failure the document's ``error_code`` is set to ``[-3]``.
        """
        try:
            with open(self.path, "rb") as f:
                self.image = f.read()
        except Exception:
            logging.info("cannot open image!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Attach the image to the document as page 0.

        Does nothing further when ``init_package`` recorded an error code.
        """
        from format_convert.convert_tree import _Page, _Image

        self.init_package()
        if self._doc.error_code is not None:
            return

        _page = _Page(None, 0)
        _image = _Image(self.image, self.path)
        _page.add_child(_image)
        self._doc.add_child(_page)

    def get_html(self):
        """Convert the image and return the document HTML.

        :return: the document's ``error_code`` if one is set (``[-1]`` when
            ``convert`` raised), otherwise ``self._doc.get_html()``
        """
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()