12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155 |
- # encoding=utf8
- import copy
- import inspect
- import io
- import logging
- import os
- import re
- import sys
- import time
- from glob import glob
- import requests
- import numpy as np
- from PIL import Image
- sys.path.append(os.path.dirname(__file__) + "/../")
- from pdfminer.layout import LTLine
- import traceback
- import cv2
- from isr.pre_process import count_red_pixel
- from format_convert.utils import judge_error_code, add_div, LineTable, get_table_html, get_logger, log, \
- memory_decorator, pil_resize, np2bytes, ocr_cant_read
- from format_convert.convert_need_interface import from_otr_interface, from_ocr_interface, from_gpu_interface_redis, \
- from_idc_interface, from_isr_interface
- from format_convert.table_correct import get_rotated_image
- from botr.extract_table import get_table
def image_process(image_np, image_path, is_from_pdf=False, is_from_docx=False,
                  b_table_from_text=False, pdf_obj_list=None, pdf_layout_size=()):
    """
    Turn one image into a list of ``_Table`` / ``_Sentence`` objects.

    Pipeline when ``b_table_from_text`` is False: optional orientation
    correction and long-image slicing, global down-scaling to 2048px, seal
    removal (isr), OCR, ruled-table detection (otr), table building, and
    borderless-table detection (botr).
    When ``b_table_from_text`` is True no OCR is run; the text comes from
    ``pdf_obj_list`` and only the table steps are executed.

    :param image_np: image array (H, W, C as used by the slicing code below)
    :param image_path: not used by the current implementation; kept for callers
    :param is_from_pdf: forwarded to the otr interface and ``recognize_table``
    :param is_from_docx: when True, model-interface errors yield ``[]``
        instead of the error code
    :param b_table_from_text: take text boxes from ``pdf_obj_list`` instead of OCR
    :param pdf_obj_list: pdf text objects exposing ``bbox`` and ``get_text()``
    :param pdf_layout_size: (width, height) of the pdf page layout
    :return: list of objects, ``[]`` for missing/tiny images, an error-code
        list returned by one of the interfaces, or ``[-1]`` on any exception
    """
    from format_convert.convert_tree import _Table, _Sentence

    if pdf_obj_list is None:
        pdf_obj_list = []

    def merge_textbox(textbox_list, in_objs):
        """Merge text boxes outside tables that lie on the same line (top and bottom within 5px)."""
        delete_obj = []
        threshold = 5
        textbox_list.sort(key=lambda x: x.bbox[0])
        for k in range(len(textbox_list)):
            tb1 = textbox_list[k]
            if tb1 in in_objs or tb1 in delete_obj:
                continue
            for m in range(k + 1, len(textbox_list)):
                tb2 = textbox_list[m]
                if tb2 in in_objs:
                    continue
                if abs(tb1.bbox[1] - tb2.bbox[1]) <= threshold \
                        and abs(tb1.bbox[3] - tb2.bbox[3]) <= threshold:
                    # Concatenate in left-to-right order.
                    if tb1.bbox[0] <= tb2.bbox[0]:
                        tb1.text = tb1.text + tb2.text
                    else:
                        tb1.text = tb2.text + tb1.text
                    tb1.bbox[0] = min(tb1.bbox[0], tb2.bbox[0])
                    tb1.bbox[2] = max(tb1.bbox[2], tb2.bbox[2])
                    delete_obj.append(tb2)
        for _obj in delete_obj:
            if _obj in textbox_list:
                textbox_list.remove(_obj)
        return textbox_list

    def idc_process(_image_np):
        """Rotate the image by the angle returned by the idc (direction) model."""
        h, w = get_best_predict_size2(_image_np, 1080)
        image_resize = pil_resize(_image_np, h, w)
        image_bytes = np2bytes(image_resize)
        angle = from_idc_interface(image_bytes)
        if judge_error_code(angle):
            if is_from_docx:
                return []
            return angle
        # Rotate the full-resolution image, not the resized copy sent to the model.
        image_pil = Image.fromarray(_image_np)
        return np.array(image_pil.rotate(angle, expand=1))

    def isr_process(_image_np):
        """Remove seals with the isr model; the model is only called when count_red_pixel() is truthy."""
        log("isr_process image shape " + str(_image_np.shape))
        image_np_copy = copy.deepcopy(_image_np)
        _isr_time = time.time()
        if count_red_pixel(_image_np):
            image_bytes = np2bytes(_image_np)
            _image_np = from_isr_interface(image_bytes)
            if judge_error_code(_image_np):
                if is_from_docx:
                    return []
                return _image_np
            # [1] means no seal was detected: fall back to the untouched copy.
            if isinstance(_image_np, list) and _image_np == [1]:
                log("no seals detected!")
                _image_np = image_np_copy
        log("isr total time "+str(time.time()-_isr_time))
        return _image_np

    def ocr_process(_image_np, _threshold=2048):
        """Run OCR and return (text_list, bbox_list) in the coordinates of the image passed in."""
        log("ocr_process image shape " + str(_image_np.shape))
        # Large images are scaled down proportionally before OCR; small ones are left alone.
        origin_h, origin_w = _image_np.shape[:2]
        h_ratio, w_ratio = 1, 1
        if origin_h > _threshold or origin_w > _threshold:
            best_h, best_w = get_best_predict_size2(_image_np, _threshold)
            _image_np = pil_resize(_image_np, best_h, best_w)
            log("ocr_process image resize " + str(_image_np.shape))
            h_ratio, w_ratio = origin_h / best_h, origin_w / best_w

        image_bytes = np2bytes(_image_np)
        text_list, bbox_list = from_ocr_interface(image_bytes, is_table=True)
        if judge_error_code(text_list):
            return text_list, text_list

        # Map the boxes back to the input size: each point is [x, y], so x uses
        # the width ratio and y the height ratio (same convention as otr_process).
        for i, point in enumerate(bbox_list):
            bbox_list[i] = [[int(point[0][0]*w_ratio), int(point[0][1]*h_ratio)],
                            [int(point[1][0]*w_ratio), int(point[1][1]*h_ratio)],
                            [int(point[2][0]*w_ratio), int(point[2][1]*h_ratio)],
                            [int(point[3][0]*w_ratio), int(point[3][1]*h_ratio)]]

        # Watermark filter: text made only of Chinese characters is kept only
        # when its box is (almost) axis-aligned; all other text is always kept.
        temp_text_list = []
        temp_bbox_list = []
        for i in range(len(bbox_list)):
            bbox = bbox_list[i]
            text = text_list[i]
            if len(re.findall('[\u4e00-\u9fa5]', text)) == len(text):
                if (abs(bbox[0][1] - bbox[1][1]) <= 2 and abs(bbox[2][1] - bbox[3][1]) <= 2) \
                        or (abs(bbox[0][0] - bbox[3][0]) <= 4 and abs(bbox[2][0] - bbox[1][0]) <= 4):
                    temp_text_list.append(text)
                    temp_bbox_list.append(bbox)
            else:
                temp_text_list.append(text)
                temp_bbox_list.append(bbox)
        return temp_text_list, temp_bbox_list

    def otr_process(_image_np):
        """Detect table lines with the otr model and return them in input-image coordinates."""
        log("otr_process image shape " + str(_image_np.shape))
        # The otr model needs the image resized to one of its supported sizes.
        best_h, best_w = get_best_predict_size(_image_np)
        image_resize = pil_resize(_image_np, best_h, best_w)
        image_bytes = np2bytes(image_resize)
        list_line = from_otr_interface(image_bytes, is_from_pdf)
        if judge_error_code(list_line):
            if is_from_docx:
                return []
            return list_line

        # Scale the lines (x0, y0, x1, y1) back to the original size.
        start_time = time.time()
        ratio = (_image_np.shape[0]/best_h, _image_np.shape[1]/best_w)
        for i in range(len(list_line)):
            point = list_line[i]
            list_line[i] = [int(point[0]*ratio[1]), int(point[1]*ratio[0]),
                            int(point[2]*ratio[1]), int(point[3]*ratio[0])]
        log("otr resize bbox recover " + str(time.time()-start_time))
        return list_line

    def botr_process(_image_np, table_list2, text_list2, box_list2, text_box_list2, obj_in_table_list2,
                     from_pdf=False, pdf_obj_list=None, pdf_layout_size=()):
        """Detect borderless tables; returns (text_box_list, borderless_tables, objs_in_tables)."""
        if pdf_obj_list is None:
            pdf_obj_list = []
        if from_pdf:
            # Use the text extracted from the pdf instead of the OCR result.
            h_ratio = _image_np.shape[0] / pdf_layout_size[1]
            w_ratio = _image_np.shape[1] / pdf_layout_size[0]
            pdf_text_list = []
            pdf_box_list = []
            for obj in pdf_obj_list:
                # NOTE(review): obj.bbox is overwritten in place, so passing the
                # same pdf objects twice would transform them twice - confirm callers.
                # pdf coordinates are vertically flipped.
                obj.bbox = (obj.bbox[0], pdf_layout_size[1]-obj.bbox[1],
                            obj.bbox[2], pdf_layout_size[1]-obj.bbox[3])
                # Scale from page layout size to image size.
                obj.bbox = (obj.bbox[0]*w_ratio, obj.bbox[1]*h_ratio,
                            obj.bbox[2]*w_ratio, obj.bbox[3]*h_ratio)

                # Drop watermark characters: a single character at least 70px wide.
                text = re.sub('[\n ]', '', obj.get_text())
                if len(text) == 1 and abs(obj.bbox[0] - obj.bbox[2]) >= 70:
                    continue

                pdf_box_list.append([[int(obj.bbox[0]), int(obj.bbox[3])],
                                     [],
                                     [int(obj.bbox[2]), int(obj.bbox[1])],
                                     []
                                     ])
                pdf_text_list.append(re.sub('[\n]', '', obj.get_text()))

            pdf_text_box_list = get_text_box_obj(pdf_text_list, pdf_box_list)
            text_list2 = pdf_text_list
            box_list2 = pdf_box_list
            text_box_list2 = pdf_text_box_list

        _text_box_list, _table_list, _obj_in_table_list = get_table(_image_np, table_list2, text_list2, box_list2, text_box_list2)

        # Keep a sample image of every detected borderless table (best effort only).
        if _table_list:
            try:
                save_b_table(_image_np, text_box_list2, from_pdf)
            except Exception as e:
                # Saving a sample must never break the conversion itself.
                log("save_b_table failed: " + str(e))

        if from_pdf:
            text_box_list2 = []
            table_list2 = []

        if _table_list and _text_box_list:
            text_box_list2 += _text_box_list
            text_box_list2 = list(set(text_box_list2))
        return text_box_list2, _table_list, _obj_in_table_list

    def table_process(list_line, list_text_boxes, _image_np):
        """Build ruled tables from lines and text boxes; returns [-8] x 3 on any exception."""
        try:
            if not list_line:
                return list_text_boxes, [], set()

            # NOTE(review): TableLine is not referenced below - confirm the import is still needed.
            from format_convert.convert_tree import TableLine
            list_lines = [LTLine(1, (line[0], line[1]), (line[2], line[3])) for line in list_line]

            lt = LineTable()
            tables, obj_in_table, _, connect_textbox_list = lt.recognize_table(list_text_boxes, list_lines,
                                                                               sourceP_LB=False, splited=False,
                                                                               from_pdf=is_from_pdf)
            # Text boxes that span two cells must be split, then the tables rebuilt.
            if connect_textbox_list:
                list_text_boxes = table_textbox_split(_image_np, connect_textbox_list, list_text_boxes)
                tables, obj_in_table, _, connect_textbox_list = lt.recognize_table(list_text_boxes, list_lines,
                                                                                   sourceP_LB=False, splited=True,
                                                                                   from_pdf=is_from_pdf)
            return list_text_boxes, tables, obj_in_table
        except Exception:
            traceback.print_exc()
            return [-8], [-8], [-8]

    def get_text_box_obj(_text_list, _bbox_list):
        """Wrap each text and its box (points 0 and 2 are used as opposite corners) in a TextBox."""
        from format_convert.convert_tree import TextBox
        _text_box_list = []
        for i, bbox in enumerate(_bbox_list):
            b_text = _text_list[i]
            _text_box_list.append(TextBox([bbox[0][0], bbox[0][1],
                                           bbox[2][0], bbox[2][1]], b_text))
        return _text_box_list

    def save_b_table(image_np2, text_box_list2, from_pdf=False):
        """Write the image to the sample directory as '<index>-<md5>.png' if that directory exists."""
        _start_time = time.time()
        _path = '/data/fangjiasheng/format_conversion_maxcompute/save_b_table'
        max_index = 20000
        if os.path.exists(_path):
            file_list = glob(_path + '/*')
            if file_list:
                # The third-from-last token of '<dir>/<index>-<md5>.png' is the index.
                file_index_list = [int(re.split('[/.\\\\-]', x)[-3]) for x in file_list]
                index = max(file_index_list) + 1
            else:
                index = 0
            if index > max_index:
                return

            # md5 of the file currently being converted
            from format_convert import _global
            _md5 = _global.get("md5")

            _image_path = _path + '/' + str(index) + '-' + str(_md5) + '.png'
            cv2.imwrite(_image_path, image_np2)
            log('save b_table image success!')
        log('save_b_table cost: ' + str(time.time()-_start_time))

    def table_textbox_split(image_np2, connect_textbox_list, textbox_list):
        """
        Split text boxes in which OCR merged the text of two cells.

        A box is split only when exactly one vertical separator is found
        inside it; each half is then re-recognised by OCR.
        :return: textbox_list with the split boxes replaced by their halves
        """
        split_bbox_list = []
        split_text_list = []
        splited_textbox_list = []
        for textbox in connect_textbox_list:
            bbox = textbox.bbox
            bbox = [[bbox[0], bbox[1]], [], [bbox[2], bbox[3]], []]
            sub_image_np = image_np2[int(bbox[0][1]):int(bbox[2][1]), int(bbox[0][0]):int(bbox[2][0]), :]

            # Scan left to right (5px margin): a separator is a column whose
            # first-channel pixels are all below 200.
            split_index_list = []
            for i in range(5, sub_image_np.shape[1]-5):
                if np.where(sub_image_np[:, i, 0] < 200)[0].size >= sub_image_np.shape[0]:
                    split_index_list.append(i)

            # A line is usually several pixels wide: keep one index per run of
            # columns lying within 5px of the run's first column.
            deduped_index_list = []
            for index in split_index_list:
                if not deduped_index_list or index - deduped_index_list[-1] > 5:
                    deduped_index_list.append(index)
            split_index_list = deduped_index_list

            # No separator, or more than one: do not try to split.
            if len(split_index_list) == 0 or len(split_index_list) >= 2:
                print('len(split_index_list)', len(split_index_list), split_index_list)
                continue

            # Cut the crop at the separator and OCR every piece again.
            split_index_list.insert(0, 0)
            print('split_index_list1', split_index_list)
            for _i, index in enumerate(split_index_list):
                if _i == len(split_index_list) - 1:
                    split_image_np = sub_image_np[:, index:, :]
                    split_bbox_list.append([[bbox[0][0]+index, bbox[0][1]], [], [bbox[2][0], bbox[2][1]], []])
                else:
                    next_index = split_index_list[_i+1]
                    split_image_np = sub_image_np[:, index:next_index, :]
                    split_bbox_list.append([[bbox[0][0]+index, bbox[0][1]], [], [bbox[0][0]+next_index, bbox[2][1]], []])

                split_image_bytes = np2bytes(split_image_np)
                text_list2, bbox_list2 = from_ocr_interface(split_image_bytes, is_table=True, only_rec=True)
                print('text_list2', text_list2)
                print('bbox_list2', split_bbox_list)
                # An error code or an empty result both become empty text.
                if judge_error_code(text_list2) or not text_list2:
                    text2 = ''
                else:
                    text2 = text_list2[0]
                split_text_list.append(text2)
            splited_textbox_list.append(textbox)

        if split_text_list and split_bbox_list:
            split_textbox_list = get_text_box_obj(split_text_list, split_bbox_list)
            for tb in splited_textbox_list:
                if tb in textbox_list:
                    textbox_list.remove(tb)
            textbox_list += split_textbox_list
        return textbox_list

    log("into image_preprocess")
    try:
        if image_np is None:
            return []
        if image_np.shape[0] <= 20 or image_np.shape[1] <= 20:
            return []

        if not b_table_from_text:
            # Decide whether the image is a long one that has to be sliced.
            slice_flag = need_image_slice(image_np)
            log("need_image_slice " + str(slice_flag) + " " + str(image_np.shape))
            idc_flag = False
            image_np_list = [image_np]
            if slice_flag:
                # Correct the orientation first.
                image_np = idc_process(image_np)
                idc_flag = True
                if isinstance(image_np, list):
                    return image_np
                # Keep the rotated image even if it no longer needs slicing.
                image_np_list = [image_np]

                # Check again after the rotation.
                if need_image_slice(image_np):
                    image_np_list = image_slice_new(image_np)
                    if len(image_np_list) < 1:
                        log("image_slice failed!")
                        image_np_list = [image_np]

            all_obj_list = []
            _add_y = 0
            for image_np in image_np_list:
                # Global resolution limit.
                threshold = 2048
                if image_np.shape[0] > threshold or image_np.shape[1] > threshold:
                    h, w = get_best_predict_size2(image_np, threshold=threshold)
                    log("global image resize " + str(image_np.shape[:2]) + " -> " + str(h) + "," + str(w))
                    image_np = pil_resize(image_np, h, w)

                # Seal removal.
                image_np = isr_process(image_np)
                if isinstance(image_np, list):
                    return image_np

                # Text recognition.
                text_list, box_list = ocr_process(image_np)
                if judge_error_code(text_list):
                    return text_list

                # Retry with orientation correction when OCR output looks unreadable.
                # The trailing "and False" keeps this branch switched off.
                if ocr_cant_read(text_list, box_list) and not idc_flag and False:
                    image_np = idc_process(image_np)
                    if isinstance(image_np, list):
                        return image_np

                    text_list1, box_list_1 = ocr_process(image_np)
                    if judge_error_code(text_list1):
                        return text_list1

                    # Keep whichever attempt recognised more characters.
                    if len("".join(text_list)) < len("".join(text_list1)):
                        text_list = text_list1
                        box_list = box_list_1

                # Table line detection.
                line_list = otr_process(image_np)
                if judge_error_code(line_list):
                    return line_list

                text_box_list = get_text_box_obj(text_list, box_list)

                # Ruled tables.
                text_box_list, table_list, obj_in_table_list = table_process(line_list, text_box_list, image_np)
                if judge_error_code(table_list):
                    return table_list

                # Borderless tables.
                start_time = time.time()
                text_box_list, b_table_list, b_obj_in_table_list = botr_process(image_np, table_list,
                                                                                text_list, box_list,
                                                                                text_box_list,
                                                                                obj_in_table_list,
                                                                                b_table_from_text,
                                                                                pdf_obj_list,
                                                                                pdf_layout_size,
                                                                                )
                log('botr process cost: ' + str(time.time()-start_time))

                # Merge text boxes on the same line that are not inside a ruled table.
                text_box_list = merge_textbox(text_box_list, obj_in_table_list)

                # Build the output objects.
                obj_list = []
                for table in table_list:
                    obj_list.append(_Table(table["table"], table["bbox"]))
                for table in b_table_list:
                    _table = _Table(table["table"], table["bbox"])
                    obj_list.append(_table)
                    # Borderless tables get a y offset of 10000
                    # (presumably so they sort after everything else - confirm).
                    _table.y += 10000
                for text_box in text_box_list:
                    if text_box not in obj_in_table_list:
                        obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))

                # With several slices, shift y so each slice follows the previous one.
                if len(image_np_list) > 1:
                    list_y = []
                    for obj in obj_list:
                        obj.y += _add_y
                        list_y.append(obj.y)
                    if len(list_y) > 0:
                        _add_y = max(list_y)

                all_obj_list += obj_list

        else:
            all_obj_list = []
            table_list = []
            text_list = []
            box_list = []
            text_box_list = []
            obj_in_table_list = set()

            # Table line detection.
            line_list = otr_process(image_np)
            if judge_error_code(line_list):
                return line_list

            text_box_list = get_text_box_obj(text_list, box_list)

            # Ruled tables.
            text_box_list, table_list, obj_in_table_list = table_process(line_list, text_box_list, image_np)
            if judge_error_code(table_list):
                return table_list

            # Borderless tables; in this mode their result replaces the ruled-table result.
            start_time = time.time()
            text_box_list, table_list, obj_in_table_list = botr_process(image_np, table_list,
                                                                        text_list, box_list,
                                                                        text_box_list,
                                                                        obj_in_table_list,
                                                                        b_table_from_text,
                                                                        pdf_obj_list,
                                                                        pdf_layout_size,
                                                                        )
            log('botr process cost: ' + str(time.time()-start_time))

            # Merge text boxes on the same line that are not inside a table.
            text_box_list = merge_textbox(text_box_list, obj_in_table_list)

            # Build the output objects.
            obj_list = []
            for table in table_list:
                obj_list.append(_Table(table["table"], table["bbox"]))
            for text_box in text_box_list:
                if text_box not in obj_in_table_list:
                    obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))

            all_obj_list += obj_list

        return all_obj_list

    except Exception:
        log("image_preprocess error")
        traceback.print_exc()
        return [-1]
@memory_decorator
def picture2text(path, html=False):
    """
    Read the image at ``path`` and run ``image_process`` on it.

    :param path: path of the image file
    :param html: when True the result is passed through ``add_div``
    :return: ``[result]`` on success, ``[-3]`` when the image cannot be read,
        the error code returned by ``image_process``, or ``[-1]`` on any exception
    """
    log("into picture2text")
    try:
        img = cv2.imread(path)
        if img is None:
            return [-3]

        text = image_process(img, path)
        if judge_error_code(text):
            return text
        if html:
            text = add_div(text)
        return [text]
    except Exception:
        log("picture2text error!")
        # print_exc() writes the traceback itself and returns None, so it must
        # not be wrapped in print().
        traceback.print_exc()
        return [-1]
def get_best_predict_size(image_np, times=64):
    """
    Pick the model input size closest to the image size.

    Candidate sizes are the multiples ``i * times`` (i in 1..99) that do not
    exceed 1300. Height and width are chosen independently; on a tie the
    larger candidate wins, and when no candidate is closer than 10000 the
    largest candidate is returned.
    :return: (best_height, best_width)
    """
    candidates = [k * times for k in range(1, 100) if k * times <= 1300]
    candidates.sort(reverse=True)
    largest = candidates[0]

    def _closest(length):
        chosen, chosen_gap = largest, 10000
        for candidate in candidates:
            gap = abs(length - candidate)
            # Strict comparison: the first (larger) candidate wins a tie.
            if gap < chosen_gap:
                chosen, chosen_gap = candidate, gap
        return chosen

    return _closest(image_np.shape[0]), _closest(image_np.shape[1])
def get_best_predict_size2(image_np, threshold=3000):
    """
    Scale the image size so that its longer side becomes ``threshold``.

    The aspect ratio is kept (up to integer truncation). Each side is at
    least 1, so a very elongated image never yields a zero-sized target.
    :return: (height, width) as ints
    """
    h, w = image_np.shape[:2]
    scale = threshold / max(h, w)
    # int() truncates, which can reach 0 for extreme aspect ratios.
    h = max(1, int(h * scale))
    w = max(1, int(w * scale))
    return h, w
def image_slice(image_np):
    """
    Slice a tall image into pieces.

    Each piece ends at the next multiple of the image width and starts at
    the first "white" row (row average above 250 in every channel) found at
    or after the end of the previous piece.
    :return: list of image slices
    """
    row_means = np.average(image_np, axis=1)
    white_rows = {row for row, mean in enumerate(row_means) if (mean > 250).all()}

    width = image_np.shape[1]
    height = image_np.shape[0]

    pieces = []
    top = 0
    bottom = 0
    while bottom <= height:
        bottom += width
        # Move the start down to the next white row (or past the image end).
        while top not in white_rows and top <= height:
            top += 1
        pieces.append(image_np[top:bottom, ...])
        top = bottom
    log("image_slice into %d parts" % (len(pieces)))
    return pieces
def image_slice_new(image_np):
    """
    Slice a long image at blank rows.

    Blank rows are rows with no dark pixel (value < 125 before a 3x3
    dilation). Starting from the previous cut, the target cut is one image
    width further down; the first blank row after the target is used when it
    is within half a width, otherwise the last blank row before the target
    (same tolerance). Whatever remains after the last cut is returned as the
    final piece, or merged into the previous piece when it is at most half a
    width high, so no part of the image is dropped.
    :return: list of image slices taken from the original image
    """
    height, width = image_np.shape[:2]
    image_origin = copy.deepcopy(image_np)

    # NOTE(review): the blank rows are located on the border-cropped image but
    # applied to the uncropped original; if a border is actually removed the
    # row indices are shifted by the crop offset - confirm this is acceptable.
    image_np = remove_black_border(image_np)

    # Gray -> inverted binary -> one dilation so that content stands out.
    image_np = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
    _, binary = cv2.threshold(image_np, 125, 255, cv2.THRESH_BINARY_INV)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
    dilation = cv2.dilate(binary, kernel, iterations=1)

    # Rows whose average is 0 contain no content.
    width_avg = np.average(np.float32(dilation), axis=1)
    zero_index = np.where(width_avg == 0.)[0]

    # A cut may be at most this far away from the target row.
    max_distance = int(width / 2)

    image_list = []
    last_h = 0
    while last_h < height:
        h = last_h + width
        zero_h_after = zero_index[zero_index >= h]
        if zero_h_after.shape[0] == 0:
            # No blank row at or below the target: the rest is the last piece.
            break

        cut_h = int(zero_h_after[0])
        if cut_h - h > max_distance:
            # Too far below the target: look above it instead.
            zero_h_before = zero_index[zero_index <= h]
            if zero_h_before.shape[0] == 0 or h - int(zero_h_before[-1]) > max_distance:
                # No usable blank row near the target: stop cutting.
                break
            cut_h = int(zero_h_before[-1])

        image_list.append(image_origin[last_h:cut_h, :, :])
        last_h = cut_h

    # Keep whatever is left below the last cut.
    last_image = image_origin[last_h:, :, :]
    if last_image.shape[0] > 0:
        if image_list and last_image.shape[0] <= max_distance:
            image_list[-1] = np.concatenate([image_list[-1], last_image], axis=0)
        else:
            image_list.append(last_image)

    log("image_slice into %d parts" % (len(image_list)))
    return image_list
def need_image_slice(image_np):
    """
    Tell whether the image is a long image that should be sliced.

    :return: True when height is at least twice the width and the width is
        at least 100 pixels, otherwise False
    """
    rows, cols = image_np.shape[:2]
    return rows / cols >= 2. and cols >= 100
def remove_black_border(img_np):
    """
    Crop dark borders from a 3-channel image.

    Only the centre column and the centre row are inspected: the crop keeps
    the rows between the first and last bright pixel (gray value > 100) of
    the centre column and the columns between the first and last bright
    pixel of the centre row, both ends included. It cannot handle images
    where the dark area covers more than half of the picture.
    :return: the cropped image, or the input unchanged when cropping fails
    """
    try:
        threshold = 100
        gray = cv2.cvtColor(img_np, cv2.COLOR_RGB2GRAY)
        h, w = gray.shape[:2]

        center_col = gray[:, int(1/2*w)]
        center_row = gray[int(1/2*h), :]
        bright_rows = np.argwhere(center_col > threshold)
        bright_cols = np.argwhere(center_row > threshold)

        top, bottom = bright_rows[0, 0], bright_rows[-1, 0]
        left, right = bright_cols[0, 0], bright_cols[-1, 0]
        # Slice ends are exclusive, so add 1 to keep the last bright row/column.
        return img_np[top:bottom + 1, left:right + 1, :]
    except Exception:
        # Best effort: e.g. no bright pixel on the centre lines.
        return img_np
class ImageConvert:
    """Wrap a single image file into a ``_Document`` tree and render it to html."""

    def __init__(self, path, unique_type_dir):
        """
        :param path: path of the image file
        :param unique_type_dir: stored on the instance; not used inside this class
        """
        from format_convert.convert_tree import _Document
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

    def init_package(self):
        """Read the image bytes into ``self.image``; set error code [-3] on failure."""
        try:
            with open(self.path, "rb") as f:
                self.image = f.read()
        except Exception:
            log("cannot open image!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Build the document tree: one page holding one image."""
        from format_convert.convert_tree import _Page, _Image

        self.init_package()
        if self._doc.error_code is not None:
            return

        _page = _Page(None, 0)
        _image = _Image(self.image, self.path)
        _page.add_child(_image)
        self._doc.add_child(_page)

    def get_html(self):
        """
        Convert the image and return the html of the document.

        :return: the document's html, or the error-code list when the
            conversion failed ([-1] for an unexpected exception)
        """
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()
- def image_process_old(image_np, image_path, is_from_pdf=False, is_from_docx=False, use_ocr=True):
- from format_convert.convert_tree import _Table, _Sentence
- def get_cluster(t_list, b_list, axis):
- zip_list = list(zip(t_list, b_list))
- if len(zip_list) == 0:
- return t_list, b_list
- if len(zip_list[0]) > 0:
- zip_list.sort(key=lambda x: x[1][axis][1])
- cluster_list = []
- margin = 5
- for text, bbox in zip_list:
- _find = 0
- for cluster in cluster_list:
- if abs(cluster[1] - bbox[axis][1]) <= margin:
- cluster[0].append([text, bbox])
- cluster[1] = bbox[axis][1]
- _find = 1
- break
- if not _find:
- cluster_list.append([[[text, bbox]], bbox[axis][1]])
- new_text_list = []
- new_bbox_list = []
- for cluster in cluster_list:
- # print("=============convert_image")
- # print("cluster_list", cluster)
- center_y = 0
- for text, bbox in cluster[0]:
- center_y += bbox[axis][1]
- center_y = int(center_y / len(cluster[0]))
- for text, bbox in cluster[0]:
- bbox[axis][1] = center_y
- new_text_list.append(text)
- new_bbox_list.append(bbox)
- # print("cluster_list", cluster)
- return new_text_list, new_bbox_list
- def merge_textbox(textbox_list, in_objs):
- delete_obj = []
- threshold = 5
- textbox_list.sort(key=lambda x:x.bbox[0])
- for k in range(len(textbox_list)):
- tb1 = textbox_list[k]
- if tb1 not in in_objs and tb1 not in delete_obj:
- for m in range(k+1, len(textbox_list)):
- tb2 = textbox_list[m]
- if tb2 in in_objs:
- continue
- if abs(tb1.bbox[1]-tb2.bbox[1]) <= threshold \
- and abs(tb1.bbox[3]-tb2.bbox[3]) <= threshold:
- if tb1.bbox[0] <= tb2.bbox[0]:
- tb1.text = tb1.text + tb2.text
- else:
- tb1.text = tb2.text + tb1.text
- tb1.bbox[0] = min(tb1.bbox[0], tb2.bbox[0])
- tb1.bbox[2] = max(tb1.bbox[2], tb2.bbox[2])
- delete_obj.append(tb2)
- for _obj in delete_obj:
- if _obj in textbox_list:
- textbox_list.remove(_obj)
- return textbox_list
- log("into image_preprocess")
- try:
- if image_np is None:
- return []
- # 整体分辨率限制
- if image_np.shape[0] > 2000 or image_np.shape[1] > 2000:
- h, w = get_best_predict_size2(image_np, threshold=2000)
- log("global image resize " + str(image_np.shape[:2]) + " -> " + str(h) + "," + str(w))
- image_np = pil_resize(image_np, h, w)
- # 图片倾斜校正,写入原来的图片路径
- # print("image_process", image_path)
- g_r_i = get_rotated_image(image_np, image_path)
- if judge_error_code(g_r_i):
- if is_from_docx:
- return []
- else:
- return g_r_i
- image_np = cv2.imread(image_path)
- image_np_copy = copy.deepcopy(image_np)
- if image_np is None:
- return []
- # if image_np is None:
- # return []
- #
- # # idc模型实现图片倾斜校正
- # image_resize = pil_resize(image_np, 640, 640)
- # image_resize_path = image_path.split(".")[0] + "_resize_idc." + image_path.split(".")[-1]
- # cv2.imwrite(image_resize_path, image_resize)
- #
- # with open(image_resize_path, "rb") as f:
- # image_bytes = f.read()
- # angle = from_idc_interface(image_bytes)
- # if judge_error_code(angle):
- # if is_from_docx:
- # return []
- # else:
- # return angle
- # # 根据角度旋转
- # image_pil = Image.fromarray(image_np)
- # image_np = np.array(image_pil.rotate(angle, expand=1))
- # # 写入
- # idc_path = image_path.split(".")[0] + "_idc." + image_path.split(".")[-1]
- # cv2.imwrite(idc_path, image_np)
- # isr模型去除印章
- _isr_time = time.time()
- if count_red_pixel(image_np):
- # 红色像素达到一定值才过模型
- with open(image_path, "rb") as f:
- image_bytes = f.read()
- image_np = from_isr_interface(image_bytes)
- if judge_error_code(image_np):
- if is_from_docx:
- return []
- else:
- return image_np
- # [1]代表检测不到印章,直接返回
- if isinstance(image_np, list) and image_np == [1]:
- log("no seals detected!")
- image_np = image_np_copy
- else:
- isr_path = image_path.split(".")[0] + "_isr." + image_path.split(".")[-1]
- cv2.imwrite(isr_path, image_np)
- log("isr total time "+str(time.time()-_isr_time))
- # otr模型识别表格,需要图片resize成模型所需大小, 写入另一个路径
- best_h, best_w = get_best_predict_size(image_np)
- # image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA)
- image_resize = pil_resize(image_np, best_h, best_w)
- image_resize_path = image_path.split(".")[0] + "_resize_otr." + image_path.split(".")[-1]
- cv2.imwrite(image_resize_path, image_resize)
- # 调用otr模型接口
- with open(image_resize_path, "rb") as f:
- image_bytes = f.read()
- list_line = from_otr_interface(image_bytes, is_from_pdf)
- if judge_error_code(list_line):
- return list_line
- # # 预处理
- # if is_from_pdf:
- # prob = 0.2
- # else:
- # prob = 0.5
- # with open(image_resize_path, "rb") as f:
- # image_bytes = f.read()
- # img_new, inputs = table_preprocess(image_bytes, prob)
- # if type(img_new) is list and judge_error_code(img_new):
- # return img_new
- # log("img_new.shape " + str(img_new.shape))
- #
- # # 调用模型运行接口
- # _dict = {"inputs": inputs, "md5": _global.get("md5")}
- # result = from_gpu_interface(_dict, model_type="otr", predictor_type="")
- # if judge_error_code(result):
- # logging.error("from_gpu_interface failed! " + str(result))
- # raise requests.exceptions.RequestException
- #
- # pred = result.get("preds")
- # gpu_time = result.get("gpu_time")
- # log("otr model predict time " + str(gpu_time))
- #
- # # # 解压numpy
- # # decompressed_array = io.BytesIO()
- # # decompressed_array.write(pred)
- # # decompressed_array.seek(0)
- # # pred = np.load(decompressed_array, allow_pickle=True)['arr_0']
- # # log("inputs.shape" + str(pred.shape))
- #
- # 调用gpu共享内存处理
- # _dict = {"inputs": inputs, "md5": _global.get("md5")}
- # result = from_gpu_share_memory(_dict, model_type="otr", predictor_type="")
- # if judge_error_code(result):
- # logging.error("from_gpu_interface failed! " + str(result))
- # raise requests.exceptions.RequestException
- #
- # pred = result.get("preds")
- # gpu_time = result.get("gpu_time")
- # log("otr model predict time " + str(gpu_time))
- #
- # # 后处理
- # list_line = table_postprocess(img_new, pred, prob)
- # log("len(list_line) " + str(len(list_line)))
- # if judge_error_code(list_line):
- # return list_line
- # otr resize后得到的bbox根据比例还原
- start_time = time.time()
- ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w)
- for i in range(len(list_line)):
- point = list_line[i]
- list_line[i] = [int(point[0]*ratio[1]), int(point[1]*ratio[0]),
- int(point[2]*ratio[1]), int(point[3]*ratio[0])]
- log("otr resize bbox recover " + str(time.time()-start_time))
- # ocr图片过大内存溢出,需resize
- start_time = time.time()
- threshold = 3000
- ocr_resize_flag = 0
- if image_np.shape[0] >= threshold or image_np.shape[1] >= threshold:
- ocr_resize_flag = 1
- best_h, best_w = get_best_predict_size2(image_np, threshold)
- # image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA)
- image_resize = pil_resize(image_np, best_h, best_w)
- log("ocr_process image resize " + str(image_resize.shape))
- image_resize_path = image_path.split(".")[0] + "_resize_ocr." + image_path.split(".")[-1]
- cv2.imwrite(image_resize_path, image_resize)
- log("ocr resize before " + str(time.time()-start_time))
- # 调用ocr模型接口
- with open(image_resize_path, "rb") as f:
- image_bytes = f.read()
- text_list, bbox_list = from_ocr_interface(image_bytes, is_table=True)
- if judge_error_code(text_list):
- return text_list
- # # PaddleOCR内部包括预处理,调用模型运行接口,后处理
- # paddle_ocr = PaddleOCR(use_angle_cls=True, lang="ch")
- # results = paddle_ocr.ocr(image_resize, det=True, rec=True, cls=True)
- # # 循环每张图片识别结果
- # text_list = []
- # bbox_list = []
- # for line in results:
- # # print("ocr_interface line", line)
- # text_list.append(line[-1][0])
- # bbox_list.append(line[0])
- # if len(text_list) == 0:
- # return []
- # ocr resize后的bbox还原
- if ocr_resize_flag:
- ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w)
- else:
- ratio = (1, 1)
- for i in range(len(bbox_list)):
- point = bbox_list[i]
- bbox_list[i] = [[int(point[0][0]*ratio[1]), int(point[0][1]*ratio[0])],
- [int(point[1][0]*ratio[1]), int(point[1][1]*ratio[0])],
- [int(point[2][0]*ratio[1]), int(point[2][1]*ratio[0])],
- [int(point[3][0]*ratio[1]), int(point[3][1]*ratio[0])]]
- # 调用现成方法形成表格
- try:
- from format_convert.convert_tree import TableLine
- list_lines = []
- for line in list_line:
- list_lines.append(LTLine(1, (line[0], line[1]), (line[2], line[3])))
- from format_convert.convert_tree import TextBox
- list_text_boxes = []
- for i in range(len(bbox_list)):
- bbox = bbox_list[i]
- b_text = text_list[i]
- list_text_boxes.append(TextBox([bbox[0][0], bbox[0][1],
- bbox[2][0], bbox[2][1]], b_text))
- # for _textbox in list_text_boxes:
- # print("==",_textbox.get_text())
- lt = LineTable()
- tables, obj_in_table, _ = lt.recognize_table(list_text_boxes, list_lines, False)
- # 合并同一行textbox
- list_text_boxes = merge_textbox(list_text_boxes, obj_in_table)
- obj_list = []
- for table in tables:
- obj_list.append(_Table(table["table"], table["bbox"]))
- for text_box in list_text_boxes:
- if text_box not in obj_in_table:
- obj_list.append(_Sentence(text_box.get_text(), text_box.bbox))
- return obj_list
- except:
- traceback.print_exc()
- return [-8]
- except Exception as e:
- log("image_preprocess error")
- traceback.print_exc()
- return [-1]
if __name__ == "__main__":
    # Manual smoke test: run image_slice_new on a single image read from disk.
    # The image path may be supplied as the first command-line argument; when
    # no argument is given, the original developer sample path is used.
    import sys

    _test_image_path = (
        sys.argv[1] if len(sys.argv) > 1
        else "C:/Users/Administrator/Desktop/test_image/error28.jpg"
    )
    _test_image = cv2.imread(_test_image_path)
    # cv2.imread does not raise when the file is missing or cannot be decoded;
    # it returns None. Stop here with a clear message instead of handing None
    # to image_slice_new.
    if _test_image is None:
        raise SystemExit("cannot read image: " + _test_image_path)
    image_slice_new(_test_image)
|