# encoding=utf8
"""Convert an image into document objects (tables and sentences).

The pipeline combines several model services exposed by the project:
orientation correction (idc), seal removal (isr), text recognition (ocr),
table-line detection (otr) and borderless-table extraction (botr).
Functions in this module report failures as a list holding a negative error
code (for example ``[-1]``) instead of raising.
"""
import copy
import inspect
import io
import logging
import os
import re
import sys
import time
from glob import glob
import requests
import numpy as np
from PIL import Image
sys.path.append(os.path.dirname(__file__) + "/../")
from pdfminer.layout import LTLine
import traceback
import cv2
from isr.pre_process import count_red_pixel
from format_convert.utils import judge_error_code, add_div, LineTable, get_table_html, get_logger, log, \
    memory_decorator, pil_resize, np2bytes, ocr_cant_read
from format_convert.convert_need_interface import from_otr_interface, from_ocr_interface, from_gpu_interface_redis, \
    from_idc_interface, from_isr_interface
from format_convert.table_correct import get_rotated_image
from botr.extract_table import get_table


def _merge_textbox(textbox_list, in_objs):
    """Merge text boxes that lie on the same row and do not belong to a table.

    Two boxes are on the same row when both ``bbox[1]`` and ``bbox[3]`` differ
    by at most 5. The texts are concatenated in ``bbox[0]`` order into the
    first box, which is widened to cover both; the second box is removed.

    :param textbox_list: text boxes exposing ``bbox`` (mutable sequence) and
        ``text``; sorted and modified in place.
    :param in_objs: container of text boxes that are inside a table; these are
        never merged.
    :return: ``textbox_list`` after merging.
    """
    threshold = 5
    merged_away = []
    textbox_list.sort(key=lambda x: x.bbox[0])
    for k, tb1 in enumerate(textbox_list):
        if tb1 in in_objs or tb1 in merged_away:
            continue
        for tb2 in textbox_list[k + 1:]:
            if tb2 in in_objs:
                continue
            if abs(tb1.bbox[1] - tb2.bbox[1]) <= threshold \
                    and abs(tb1.bbox[3] - tb2.bbox[3]) <= threshold:
                if tb1.bbox[0] <= tb2.bbox[0]:
                    tb1.text = tb1.text + tb2.text
                else:
                    tb1.text = tb2.text + tb1.text
                tb1.bbox[0] = min(tb1.bbox[0], tb2.bbox[0])
                tb1.bbox[2] = max(tb1.bbox[2], tb2.bbox[2])
                merged_away.append(tb2)

    for _obj in merged_away:
        if _obj in textbox_list:
            textbox_list.remove(_obj)
    return textbox_list


def image_process(image_np, image_path, is_from_pdf=False, is_from_docx=False,
                  b_table_from_text=False, pdf_obj_list=None, pdf_layout_size=()):
    """Recognise the text and tables of an image and return document objects.

    :param image_np: image as a numpy array (``None`` yields ``[]``).
    :param image_path: path of the image file (not read by the current code).
    :param is_from_pdf: forwarded to the table-line model and table builder.
    :param is_from_docx: when true, idc/isr/otr service errors yield ``[]``
        instead of the error code.
    :param b_table_from_text: when true, skip isr/ocr and build text boxes from
        ``pdf_obj_list`` instead.
    :param pdf_obj_list: pdf text objects exposing ``bbox`` and ``get_text()``;
        only used when ``b_table_from_text`` is true. Their ``bbox`` is
        rewritten in place into image coordinates.
    :param pdf_layout_size: ``(width, height)`` of the pdf page layout.
    :return: list of ``_Table`` / ``_Sentence`` objects, ``[]`` for an unusable
        image, or an error-code list such as ``[-1]`` / ``[-8]``.
    """
    from format_convert.convert_tree import _Table, _Sentence

    # Avoid sharing one default list between calls
    if pdf_obj_list is None:
        pdf_obj_list = []

    def idc_process(_image_np):
        """Rotate the image by the angle predicted by the idc model."""
        h, w = get_best_predict_size2(_image_np, 1080)
        image_resize = pil_resize(_image_np, h, w)
        image_bytes = np2bytes(image_resize)
        angle = from_idc_interface(image_bytes)
        if judge_error_code(angle):
            if is_from_docx:
                return []
            else:
                return angle
        # Rotate the full-resolution image, not the resized copy
        image_pil = Image.fromarray(_image_np)
        _image_np = np.array(image_pil.rotate(angle, expand=1))
        return _image_np

    def isr_process(_image_np):
        """Remove seals with the isr model when enough red pixels are present."""
        log("isr_process image shape " + str(_image_np.shape))
        image_np_copy = copy.deepcopy(_image_np)
        _isr_time = time.time()
        # Only call the model when the red-pixel check passes
        if count_red_pixel(_image_np):
            image_bytes = np2bytes(_image_np)
            _image_np = from_isr_interface(image_bytes)
            if judge_error_code(_image_np):
                if is_from_docx:
                    return []
                else:
                    return _image_np
            # [1] means no seal was detected: fall back to the input image
            if isinstance(_image_np, list) and _image_np == [1]:
                log("no seals detected!")
                _image_np = image_np_copy
        log("isr total time "+str(time.time()-_isr_time))
        return _image_np

    def ocr_process(_image_np, _threshold=2048):
        """Run OCR and return ``(text_list, bbox_list)`` in input-image coordinates.

        On an OCR error the error-code list is returned in both positions.
        """
        log("ocr_process image shape " + str(_image_np.shape))
        # Oversized images are shrunk proportionally to avoid running out of
        # memory; smaller images are left untouched.
        h_ratio, w_ratio = 1, 1
        if _image_np.shape[0] > _threshold or _image_np.shape[1] > _threshold:
            origin_h, origin_w = _image_np.shape[:2]
            best_h, best_w = get_best_predict_size2(_image_np, _threshold)
            _image_np = pil_resize(_image_np, best_h, best_w)
            log("ocr_process image resize " + str(_image_np.shape))
            h_ratio, w_ratio = origin_h / best_h, origin_w / best_w

        image_bytes = np2bytes(_image_np)
        text_list, bbox_list = from_ocr_interface(image_bytes, is_table=True)
        if judge_error_code(text_list):
            return text_list, text_list

        # Map the boxes back to the size of the image that was passed in:
        # x scales with the width ratio, y with the height ratio.
        bbox_list = [[[int(p[0] * w_ratio), int(p[1] * h_ratio)] for p in point[:4]]
                     for point in bbox_list]

        # Drop watermark text: a text made only of Chinese characters is kept
        # only when its box is (nearly) axis-aligned.
        temp_text_list = []
        temp_bbox_list = []
        for text, bbox in zip(text_list, bbox_list):
            if len(re.findall('[\u4e00-\u9fa5]', text)) == len(text):
                level = abs(bbox[0][1] - bbox[1][1]) <= 2 and abs(bbox[2][1] - bbox[3][1]) <= 2
                upright = abs(bbox[0][0] - bbox[3][0]) <= 4 and abs(bbox[2][0] - bbox[1][0]) <= 4
                if not (level or upright):
                    continue
            temp_text_list.append(text)
            temp_bbox_list.append(bbox)
        return temp_text_list, temp_bbox_list

    def otr_process(_image_np):
        """Detect table lines with the otr model, in input-image coordinates."""
        log("otr_process image shape " + str(_image_np.shape))
        # The model needs the image resized to one of its supported sizes
        best_h, best_w = get_best_predict_size(_image_np)
        image_resize = pil_resize(_image_np, best_h, best_w)
        image_bytes = np2bytes(image_resize)
        list_line = from_otr_interface(image_bytes, is_from_pdf)
        if judge_error_code(list_line):
            if is_from_docx:
                return []
            else:
                return list_line

        # Scale the detected lines back to the original image size
        start_time = time.time()
        ratio = (_image_np.shape[0]/best_h, _image_np.shape[1]/best_w)
        for i, point in enumerate(list_line):
            list_line[i] = [int(point[0]*ratio[1]), int(point[1]*ratio[0]),
                            int(point[2]*ratio[1]), int(point[3]*ratio[0])]
        log("otr resize bbox recover " + str(time.time()-start_time))
        return list_line

    def botr_process(_image_np, table_list2, text_list2, box_list2, text_box_list2, obj_in_table_list2,
                     from_pdf=False, pdf_obj_list=None, pdf_layout_size=()):
        """Extract borderless tables.

        :return: ``(text_box_list, table_list, obj_in_table_list)`` where the
            last two come from ``get_table``.
        """
        if pdf_obj_list is None:
            pdf_obj_list = []
        if from_pdf:
            # Use the text extracted from the pdf instead of the OCR result
            h_ratio = _image_np.shape[0] / pdf_layout_size[1]
            w_ratio = _image_np.shape[1] / pdf_layout_size[0]
            pdf_text_list = []
            pdf_box_list = []
            for obj in pdf_obj_list:
                # pdf y coordinates are flipped relative to image coordinates
                # NOTE(review): obj.bbox is overwritten, so passing the same
                # objects twice would flip/scale them twice - confirm callers.
                obj.bbox = (obj.bbox[0], pdf_layout_size[1]-obj.bbox[1],
                            obj.bbox[2], pdf_layout_size[1]-obj.bbox[3])
                # Scale from page layout size to image size
                obj.bbox = (obj.bbox[0]*w_ratio, obj.bbox[1]*h_ratio,
                            obj.bbox[2]*w_ratio, obj.bbox[3]*h_ratio)

                # Drop watermark text: a single character in a wide box
                text = re.sub('[\n ]', '', obj.get_text())
                if len(text) == 1 and abs(obj.bbox[0] - obj.bbox[2]) >= 70:
                    continue

                pdf_box_list.append([[int(obj.bbox[0]), int(obj.bbox[3])],
                                     [],
                                     [int(obj.bbox[2]), int(obj.bbox[1])],
                                     []
                                     ])
                pdf_text_list.append(re.sub('[\n]', '', obj.get_text()))

            pdf_text_box_list = get_text_box_obj(pdf_text_list, pdf_box_list)
            text_list2 = pdf_text_list
            box_list2 = pdf_box_list
            text_box_list2 = pdf_text_box_list

        _text_box_list, _table_list, _obj_in_table_list = get_table(_image_np, table_list2, text_list2,
                                                                    box_list2, text_box_list2)

        # Saving the sample image is best effort and must never fail the conversion
        if _table_list:
            try:
                save_b_table(_image_np, text_box_list2, from_pdf)
            except Exception:
                log("save_b_table failed")
                traceback.print_exc()

        if from_pdf:
            text_box_list2 = []

        if _table_list and _text_box_list:
            # Build a new list so the caller's list is not modified in place
            text_box_list2 = list(set(text_box_list2 + _text_box_list))
        return text_box_list2, _table_list, _obj_in_table_list

    def table_process(list_line, list_text_boxes, _image_np):
        """Build bordered tables from detected lines and text boxes.

        :return: ``(text_boxes, tables, obj_in_table)``, or ``[-8]`` in all
            three positions when table recognition raises.
        """
        try:
            if list_line:
                from format_convert.convert_tree import TableLine  # noqa: F401
                list_lines = []
                for line in list_line:
                    list_lines.append(LTLine(1, (line[0], line[1]), (line[2], line[3])))

                lt = LineTable()
                tables, obj_in_table, _, connect_textbox_list = lt.recognize_table(list_text_boxes, list_lines,
                                                                                   sourceP_LB=False,
                                                                                   splited=False,
                                                                                   from_pdf=is_from_pdf)
                # Some text boxes span two cells and must be split first
                if connect_textbox_list:
                    list_text_boxes = table_textbox_split(_image_np, connect_textbox_list, list_text_boxes)
                    # Rebuild the tables from the new text boxes
                    tables, obj_in_table, _, connect_textbox_list = lt.recognize_table(list_text_boxes, list_lines,
                                                                                       sourceP_LB=False,
                                                                                       splited=True,
                                                                                       from_pdf=is_from_pdf)
                return list_text_boxes, tables, obj_in_table
            else:
                return list_text_boxes, [], set()
        except Exception:
            traceback.print_exc()
            return [-8], [-8], [-8]

    def get_text_box_obj(_text_list, _bbox_list):
        """Create ``TextBox`` objects from texts and 4-point boxes (points 0 and 2 are used)."""
        from format_convert.convert_tree import TextBox
        _text_box_list = []
        for bbox, b_text in zip(_bbox_list, _text_list):
            _text_box_list.append(TextBox([bbox[0][0], bbox[0][1],
                                           bbox[2][0], bbox[2][1]], b_text))
        return _text_box_list

    def save_b_table(image_np2, text_box_list2, from_pdf=False):
        """Dump the image of a page containing a borderless table to a fixed directory.

        Nothing is written when the directory does not exist or when the next
        file index exceeds 20000.
        """
        _start_time = time.time()
        _path = '/data/fangjiasheng/format_conversion_maxcompute/save_b_table'
        max_index = 20000
        if os.path.exists(_path):
            file_list = glob(_path + '/*')
            if file_list:
                # File names look like "<index>-<md5>.png"
                file_index_list = [int(re.split('[/.\\\\-]', x)[-3]) for x in file_list]
                index = max(file_index_list) + 1
            else:
                index = 0
            if index > max_index:
                return

            # md5 of the file being converted
            from format_convert import _global
            _md5 = _global.get("md5")

            _image_path = _path + '/' + str(index) + '-' + str(_md5) + '.png'
            cv2.imwrite(_image_path, image_np2)
            log('save b_table image success!')
        log('save_b_table cost: ' + str(time.time()-_start_time))

    def table_textbox_split(image_np2, connect_textbox_list, textbox_list):
        """Split text boxes that OCR recognised across two table cells.

        A box is split at a column of its crop where every pixel of channel 0
        is below 200 (a table line); each half is recognised again. Boxes with
        no such column, or with more than one, are left as they are.

        :return: ``textbox_list`` with split boxes replaced by their halves.
        """
        split_bbox_list = []
        split_text_list = []
        splited_textbox_list = []
        for textbox in connect_textbox_list:
            bbox = textbox.bbox
            bbox = [[bbox[0], bbox[1]], [], [bbox[2], bbox[3]], []]
            sub_image_np = image_np2[int(bbox[0][1]):int(bbox[2][1]), int(bbox[0][0]):int(bbox[2][0]), :]
            # An empty crop would make every column look like a table line
            if sub_image_np.shape[0] == 0:
                continue

            # Scan left to right for columns that are dark from top to bottom
            split_index_list = []
            for i in range(5, sub_image_np.shape[1]-5):
                if np.all(sub_image_np[:, i, 0] < 200):
                    split_index_list.append(i)

            # A line several pixels wide gives neighbouring columns: keep one
            # column (the first) for each group of columns within 5 pixels.
            deduped_index_list = []
            for index in split_index_list:
                if not deduped_index_list or index - deduped_index_list[-1] > 5:
                    deduped_index_list.append(index)
            split_index_list = deduped_index_list

            # Only exactly one separator is handled
            if len(split_index_list) != 1:
                print('len(split_index_list)', len(split_index_list), split_index_list)
                continue

            # Cut the crop at the separator and recognise each part again
            split_index_list.insert(0, 0)
            print('split_index_list1', split_index_list)
            for _i, index in enumerate(split_index_list):
                if _i == len(split_index_list) - 1:
                    split_image_np = sub_image_np[:, index:, :]
                    split_bbox_list.append([[bbox[0][0]+index, bbox[0][1]], [], [bbox[2][0], bbox[2][1]], []])
                else:
                    next_index = split_index_list[_i+1]
                    split_image_np = sub_image_np[:, index:next_index, :]
                    split_bbox_list.append([[bbox[0][0]+index, bbox[0][1]], [],
                                            [bbox[0][0]+next_index, bbox[2][1]], []])

                split_image_bytes = np2bytes(split_image_np)
                text_list2, bbox_list2 = from_ocr_interface(split_image_bytes, is_table=True, only_rec=True)
                print('text_list2', text_list2)
                print('bbox_list2', split_bbox_list)
                # An error or an empty result becomes an empty text
                if judge_error_code(text_list2) or not text_list2:
                    text2 = ''
                else:
                    text2 = text_list2[0]
                split_text_list.append(text2)
            splited_textbox_list.append(textbox)

        if split_text_list and split_bbox_list:
            split_textbox_list = get_text_box_obj(split_text_list, split_bbox_list)
            for tb in splited_textbox_list:
                if tb in textbox_list:
                    textbox_list.remove(tb)
            textbox_list += split_textbox_list
        return textbox_list

    log("into image_preprocess")
    try:
        if image_np is None:
            return []
        if image_np.shape[0] <= 20 or image_np.shape[1] <= 20:
            return []

        if not b_table_from_text:
            # Decide whether a long image has to be sliced
            slice_flag = need_image_slice(image_np)
            log("need_image_slice " + str(slice_flag) + " " + str(image_np.shape))
            idc_flag = False
            image_np_list = [image_np]
            if slice_flag:
                # Correct the orientation first
                image_np = idc_process(image_np)
                idc_flag = True
                if isinstance(image_np, list):
                    return image_np
                # Keep the corrected image even if it no longer needs slicing
                image_np_list = [image_np]

                # Check again after rotation
                if need_image_slice(image_np):
                    image_np_list = image_slice_new(image_np)
                    if len(image_np_list) < 1:
                        log("image_slice failed!")
                        image_np_list = [image_np]

            all_obj_list = []
            _add_y = 0
            for image_np in image_np_list:
                # Global resolution limit
                threshold = 2048
                if image_np.shape[0] > threshold or image_np.shape[1] > threshold:
                    h, w = get_best_predict_size2(image_np, threshold=threshold)
                    log("global image resize " + str(image_np.shape[:2]) + " -> " + str(h) + "," + str(w))
                    image_np = pil_resize(image_np, h, w)

                # Seal removal
                image_np = isr_process(image_np)
                if isinstance(image_np, list):
                    return image_np

                # Text recognition
                text_list, box_list = ocr_process(image_np)
                if judge_error_code(text_list):
                    return text_list

                # Retry with orientation correction when OCR cannot read the
                # image. Currently disabled by the trailing "and False".
                if ocr_cant_read(text_list, box_list) and not idc_flag and False:
                    image_np = idc_process(image_np)
                    if isinstance(image_np, list):
                        return image_np

                    text_list1, box_list_1 = ocr_process(image_np)
                    if judge_error_code(text_list1):
                        return text_list1

                    # Keep whichever result recognised more characters
                    if len("".join(text_list)) < len("".join(text_list1)):
                        text_list = text_list1
                        box_list = box_list_1

                # Table-line detection
                line_list = otr_process(image_np)
                if judge_error_code(line_list):
                    return line_list

                # TextBox objects
                text_box_list = get_text_box_obj(text_list, box_list)

                # Bordered tables
                text_box_list, table_list, obj_in_table_list = table_process(line_list, text_box_list, image_np)
                if judge_error_code(table_list):
                    return table_list

                # Borderless tables
                start_time = time.time()
                text_box_list, b_table_list, b_obj_in_table_list = botr_process(image_np, table_list,
                                                                                text_list, box_list,
                                                                                text_box_list,
                                                                                obj_in_table_list,
                                                                                b_table_from_text,
                                                                                pdf_obj_list,
                                                                                pdf_layout_size,
                                                                                )
                log('botr process cost: ' + str(time.time()-start_time))

                # Merge text boxes on the same row that are outside tables
                text_box_list = _merge_textbox(text_box_list, obj_in_table_list)

                # Build the output objects
                obj_list = []
                for table in table_list:
                    _table = _Table(table["table"], table["bbox"])
                    obj_list.append(_table)
                for table in b_table_list:
                    _table = _Table(table["table"], table["bbox"])
                    obj_list.append(_table)
                    _table.y += 10000
                for text_box in text_box_list:
                    if text_box not in obj_in_table_list:
                        obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))

                # With several slices, shift y so later slices follow earlier ones
                if len(image_np_list) > 1:
                    list_y = []
                    for obj in obj_list:
                        obj.y += _add_y
                        list_y.append(obj.y)
                    if len(list_y) > 0:
                        _add_y = max(list_y)

                all_obj_list += obj_list

        else:
            all_obj_list = []
            table_list = []
            text_list = []
            box_list = []
            text_box_list = []
            obj_in_table_list = set()

            # Table-line detection
            line_list = otr_process(image_np)
            if judge_error_code(line_list):
                return line_list

            # TextBox objects (empty here: the text comes from the pdf objects)
            text_box_list = get_text_box_obj(text_list, box_list)

            # Bordered tables
            text_box_list, table_list, obj_in_table_list = table_process(line_list, text_box_list, image_np)
            if judge_error_code(table_list):
                return table_list

            # Borderless tables
            start_time = time.time()
            text_box_list, table_list, obj_in_table_list = botr_process(image_np, table_list,
                                                                        text_list, box_list,
                                                                        text_box_list,
                                                                        obj_in_table_list,
                                                                        b_table_from_text,
                                                                        pdf_obj_list,
                                                                        pdf_layout_size,
                                                                        )
            log('botr process cost: ' + str(time.time()-start_time))

            # Merge text boxes on the same row that are outside tables
            text_box_list = _merge_textbox(text_box_list, obj_in_table_list)

            # Build the output objects
            obj_list = []
            for table in table_list:
                _table = _Table(table["table"], table["bbox"])
                obj_list.append(_table)
            for text_box in text_box_list:
                if text_box not in obj_in_table_list:
                    obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))

            all_obj_list += obj_list
        return all_obj_list

    except Exception:
        log("image_preprocess error")
        traceback.print_exc()
        return [-1]


@memory_decorator
def picture2text(path, html=False):
    """Read the image at ``path`` and convert it with ``image_process``.

    :param path: image file path.
    :param html: when true, the result is passed through ``add_div``.
    :return: ``[result]`` on success, ``[-3]`` when the image cannot be read,
        ``[-1]`` on an unexpected error, or the error code from
        ``image_process``.
    """
    log("into picture2text")
    try:
        img = cv2.imread(path)
        if img is None:
            return [-3]

        text = image_process(img, path)
        if judge_error_code(text):
            return text
        if html:
            text = add_div(text)
        return [text]
    except Exception:
        log("picture2text error!")
        traceback.print_exc()
        return [-1]


def get_best_predict_size(image_np, times=64):
    """Return the ``(height, width)`` closest to the image among multiples of ``times``.

    Candidates are ``i * times`` for ``i`` in 1..99 that do not exceed 1300.
    On a tie the larger candidate wins.

    :raises ValueError: when no candidate exists (``times`` above 1300).
    """
    sizes = sorted((i * times for i in range(1, 100) if i * times <= 1300), reverse=True)
    if not sizes:
        raise ValueError("no candidate size for times=" + str(times))

    # min() returns the first minimum, i.e. the larger size on a tie
    best_height = min(sizes, key=lambda size: abs(image_np.shape[0] - size))
    best_width = min(sizes, key=lambda size: abs(image_np.shape[1] - size))
    return best_height, best_width


def get_best_predict_size2(image_np, threshold=3000):
    """Scale ``(height, width)`` so that the longer side equals ``threshold``.

    The aspect ratio is kept (up to integer truncation). Each side is at least
    1 so that very elongated images never produce a zero dimension.
    """
    h, w = image_np.shape[:2]
    scale = threshold / max(h, w)
    h = max(1, int(h * scale))
    w = max(1, int(w * scale))
    return h, w


def image_slice(image_np):
    """Slice a tall image into pieces about one image-width high.

    Each piece starts at the next row whose average is above 250 on every
    channel, searched from the end of the previous piece.

    :return: list of image slices (views of ``image_np``).
    """
    row_avg = np.average(image_np, axis=1)
    set_white_line = {_i for _i, row in enumerate(row_avg) if (row > 250).all()}
    width = image_np.shape[1]
    height = image_np.shape[0]
    list_images = []
    _begin = 0
    _end = 0
    while _end <= height:
        _end += width
        # Move the start down to the next white row (or past the image)
        while _begin not in set_white_line and _begin <= height:
            _begin += 1
        list_images.append(image_np[_begin:_end, ...])
        _begin = _end
    log("image_slice into %d parts" % (len(list_images)))
    return list_images


def image_slice_new(image_np):
    """Slice a long image into pieces, cutting only on blank rows.

    Blank rows are rows without any dark pixel after the black border has
    been removed. Each cut is the blank row closest to one image-width below
    the previous cut: first the next blank row below that target, then the
    last blank row above it, each accepted within half a width. If neither is
    close enough, the next blank row below is used anyway so that no part of
    the image is dropped. A short tail (at most half a width) is appended to
    the previous piece.

    :param image_np: 3-channel image array.
    :return: list of image pieces in top-to-bottom order; ``[]`` for an empty
        image.
    """
    height, width = image_np.shape[:2]
    if height == 0 or width == 0:
        return []
    image_origin = copy.deepcopy(image_np)

    # Analyse the image without its black border, remembering the row offset
    # so that blank-row indices can be mapped back onto the original image.
    try:
        top, bottom, left, right = _black_border_bounds(image_np)
    except Exception:
        top, bottom, left, right = 0, height, 0, width
    if bottom <= top or right <= left:
        top, bottom, left, right = 0, height, 0, width
    image_np = image_np[top:bottom, left:right, :]

    # Grayscale -> inverted binary -> one dilation to thicken the content
    image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
    ret, binary = cv2.threshold(image_np, 125, 255, cv2.THRESH_BINARY_INV)
    kernal = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    dilation = cv2.dilate(binary, kernal, iterations=1)

    # Rows whose average is zero contain no content
    width_avg = np.average(np.float32(dilation), axis=1)
    zero_index = np.where(width_avg == 0.)[0] + int(top)

    max_distance = int(width / 2)
    image_list = []
    last_h = 0
    # Every iteration either finishes or moves last_h down by at least one row
    while True:
        h = last_h + width
        zero_h_after = zero_index[zero_index >= h]
        if zero_h_after.shape[0] == 0:
            # Last piece
            last_image = image_origin[last_h:, :, :]
            if image_list and last_image.shape[0] <= max_distance:
                image_list[-1] = np.concatenate([image_list[-1], last_image], axis=0)
            elif last_image.shape[0] > 0:
                image_list.append(last_image)
            break

        cut_h = int(zero_h_after[0])
        if cut_h - h > max_distance:
            # The blank row below is far away: prefer a close one above
            zero_h_before = zero_index[zero_index <= h]
            if zero_h_before.shape[0] > 0 and h - int(zero_h_before[-1]) <= max_distance:
                cut_h = int(zero_h_before[-1])
        image_list.append(image_origin[last_h:cut_h, :, :])
        last_h = cut_h

    log("image_slice into %d parts" % (len(image_list)))
    return image_list


def need_image_slice(image_np):
    """Return True when the image is at least twice as high as wide and at least 100 wide."""
    h, w = image_np.shape[:2]
    # Checking the width first also avoids dividing by zero
    return bool(w >= 100 and h / w >= 2.)


def _black_border_bounds(img_np, threshold=100):
    """Locate the non-black area using the centre column and the centre row.

    :return: ``(first_row, last_row, first_col, last_col)`` of pixels brighter
        than ``threshold`` on the centre column / centre row.
    :raises IndexError: when no pixel on the centre column or row is brighter
        than ``threshold``.
    """
    gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
    h, w = gray.shape[:2]
    # Cannot handle images where more than half of the area is black
    center_col = gray[:, int(1/2*w)]
    center_row = gray[int(1/2*h), :]
    bright_rows = np.argwhere(center_col > threshold)
    bright_cols = np.argwhere(center_row > threshold)
    return bright_rows[0, 0], bright_rows[-1, 0], bright_cols[0, 0], bright_cols[-1, 0]


def remove_black_border(img_np):
    """Crop away the black border; return the image unchanged when that fails."""
    try:
        first_row, last_row, first_col, last_col = _black_border_bounds(img_np)
        return img_np[first_row:last_row, first_col:last_col, :]
    except Exception:
        return img_np


class ImageConvert:
    """Wrap an image file into a ``_Document`` holding one page with one image."""

    def __init__(self, path, unique_type_dir):
        from format_convert.convert_tree import _Document
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

    def init_package(self):
        """Read the image bytes; set error code ``[-3]`` when the file cannot be read."""
        try:
            with open(self.path, "rb") as f:
                self.image = f.read()
        except Exception:
            log("cannot open image!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Build the document tree: one page containing the image."""
        from format_convert.convert_tree import _Page, _Image
        self.init_package()
        if self._doc.error_code is not None:
            return

        _page = _Page(None, 0)
        _image = _Image(self.image, self.path)
        _page.add_child(_image)
        self._doc.add_child(_page)

    def get_html(self):
        """Convert and return the document html, or the error code on failure."""
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()


def image_process_old(image_np, image_path, is_from_pdf=False, is_from_docx=False, use_ocr=True):
    """Legacy image pipeline that exchanges intermediate images through files.

    Intermediate files are written next to ``image_path`` with the suffixes
    ``_isr``, ``_resize_otr`` and ``_resize_ocr``.

    :param image_np: image array (``None`` yields ``[]``).
    :param image_path: image file path; re-read after skew correction.
    :param is_from_pdf: forwarded to the table-line model.
    :param is_from_docx: when true, rotation/isr errors yield ``[]``.
    :param use_ocr: unused.
    :return: list of ``_Table`` / ``_Sentence`` objects, ``[]``, or an
        error-code list such as ``[-1]`` / ``[-8]``.
    """
    from format_convert.convert_tree import _Table, _Sentence

    log("into image_preprocess")
    try:
        if image_np is None:
            return []

        # Global resolution limit
        if image_np.shape[0] > 2000 or image_np.shape[1] > 2000:
            h, w = get_best_predict_size2(image_np, threshold=2000)
            log("global image resize " + str(image_np.shape[:2]) + " -> " + str(h) + "," + str(w))
            image_np = pil_resize(image_np, h, w)

        # Skew correction; the image is then re-read from image_path
        g_r_i = get_rotated_image(image_np, image_path)
        if judge_error_code(g_r_i):
            if is_from_docx:
                return []
            else:
                return g_r_i
        image_np = cv2.imread(image_path)
        image_np_copy = copy.deepcopy(image_np)
        if image_np is None:
            return []

        # Seal removal with the isr model
        _isr_time = time.time()
        if count_red_pixel(image_np):
            # Only call the model when the red-pixel check passes
            with open(image_path, "rb") as f:
                image_bytes = f.read()
            image_np = from_isr_interface(image_bytes)
            if judge_error_code(image_np):
                if is_from_docx:
                    return []
                else:
                    return image_np
            # [1] means no seal was detected: keep the original image
            if isinstance(image_np, list) and image_np == [1]:
                log("no seals detected!")
                image_np = image_np_copy
            else:
                isr_path = image_path.split(".")[0] + "_isr." + image_path.split(".")[-1]
                cv2.imwrite(isr_path, image_np)
        log("isr total time "+str(time.time()-_isr_time))

        # The otr model needs the image resized; it is written to another path
        best_h, best_w = get_best_predict_size(image_np)
        image_resize = pil_resize(image_np, best_h, best_w)
        image_resize_path = image_path.split(".")[0] + "_resize_otr." + image_path.split(".")[-1]
        cv2.imwrite(image_resize_path, image_resize)

        # Call the otr model
        with open(image_resize_path, "rb") as f:
            image_bytes = f.read()
        list_line = from_otr_interface(image_bytes, is_from_pdf)
        if judge_error_code(list_line):
            return list_line

        # Scale the detected lines back to the original image size
        start_time = time.time()
        ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w)
        for i, point in enumerate(list_line):
            list_line[i] = [int(point[0]*ratio[1]), int(point[1]*ratio[0]),
                            int(point[2]*ratio[1]), int(point[3]*ratio[0])]
        log("otr resize bbox recover " + str(time.time()-start_time))

        # Oversized images are resized before OCR to avoid running out of memory
        # NOTE(review): when no resize happens, image_resize_path still points
        # at the otr-resized file while ratio is (1, 1) - confirm whether OCR
        # was meant to run on the full-size image here.
        start_time = time.time()
        threshold = 3000
        ocr_resize_flag = 0
        if image_np.shape[0] >= threshold or image_np.shape[1] >= threshold:
            ocr_resize_flag = 1
            best_h, best_w = get_best_predict_size2(image_np, threshold)
            image_resize = pil_resize(image_np, best_h, best_w)
            log("ocr_process image resize " + str(image_resize.shape))
            image_resize_path = image_path.split(".")[0] + "_resize_ocr." + image_path.split(".")[-1]
            cv2.imwrite(image_resize_path, image_resize)
        log("ocr resize before " + str(time.time()-start_time))

        # Call the ocr model
        with open(image_resize_path, "rb") as f:
            image_bytes = f.read()
        text_list, bbox_list = from_ocr_interface(image_bytes, is_table=True)
        if judge_error_code(text_list):
            return text_list

        # Scale the OCR boxes back to the original image size
        if ocr_resize_flag:
            ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w)
        else:
            ratio = (1, 1)
        for i, point in enumerate(bbox_list):
            bbox_list[i] = [[int(point[0][0]*ratio[1]), int(point[0][1]*ratio[0])],
                            [int(point[1][0]*ratio[1]), int(point[1][1]*ratio[0])],
                            [int(point[2][0]*ratio[1]), int(point[2][1]*ratio[0])],
                            [int(point[3][0]*ratio[1]), int(point[3][1]*ratio[0])]]

        # Build the tables with the existing table recogniser
        try:
            from format_convert.convert_tree import TableLine  # noqa: F401
            list_lines = []
            for line in list_line:
                list_lines.append(LTLine(1, (line[0], line[1]), (line[2], line[3])))
            from format_convert.convert_tree import TextBox
            list_text_boxes = []
            for i in range(len(bbox_list)):
                bbox = bbox_list[i]
                b_text = text_list[i]
                list_text_boxes.append(TextBox([bbox[0][0], bbox[0][1],
                                                bbox[2][0], bbox[2][1]], b_text))

            lt = LineTable()
            # NOTE(review): image_process unpacks four values from
            # recognize_table; this three-value unpack may no longer match.
            tables, obj_in_table, _ = lt.recognize_table(list_text_boxes, list_lines, False)

            # Merge text boxes on the same row
            list_text_boxes = _merge_textbox(list_text_boxes, obj_in_table)

            obj_list = []
            for table in tables:
                obj_list.append(_Table(table["table"], table["bbox"]))
            for text_box in list_text_boxes:
                if text_box not in obj_in_table:
                    obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))
            return obj_list
        except Exception:
            traceback.print_exc()
            return [-8]

    except Exception:
        log("image_preprocess error")
        traceback.print_exc()
        return [-1]


if __name__ == "__main__":
    image_slice_new(cv2.imread("C:/Users/Administrator/Desktop/test_image/error28.jpg"))