# -*- coding:utf-8 -*- import argparse import copy import hashlib import inspect import json import os import pickle import socket import subprocess import sys from io import BytesIO from subprocess import Popen from shapely.geometry import LineString import cv2 import requests from PIL import Image sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") import difflib import logging import mimetypes import platform import re import traceback import filetype from bs4 import BeautifulSoup import yaml from pdfminer.layout import * from format_convert import _global from functools import wraps import psutil import time import numpy as np from format_convert.judge_platform import get_platform from config.interface_list import INTERFACES if get_platform() == "Linux": import resource import math from shapely.geometry import Polygon config_file_path = os.path.dirname(os.path.abspath(__file__)) + "/../config/interface_new.yml" def has_intersection(poly1, poly2): """ 判断两个四边形是否有交集。 参数: poly1, poly2: list of tuples, 每个tuple表示一个顶点的(x, y)坐标。 例如: [(x1, y1), (x2, y2), (x3, y3), (x4, y4)] 返回: bool: 如果两个四边形有交集则返回True,否则返回False。 """ # 创建Shapely多边形对象 polygon1 = Polygon(poly1) polygon2 = Polygon(poly2) # 使用intersects方法判断是否有交集 return polygon1.intersects(polygon2) def judge_error_code(_list, code=[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14, -15, -16, -17, -18, -19, -20, -21, -22]): """ [0] : continue [-1]: 逻辑处理错误 [-2]: 接口调用错误 [-3]: 文件格式错误,无法打开 [-4]: 各类文件调用第三方包读取超时 [-5]: 整个转换过程超时 [-6]: 阿里云UDF队列超时 [-7]: 文件需密码,无法打开 [-8]: 调用现成接口报错 [-9]: 接口接收数据为空 [-10]: 长图分割报错 [-11]: 新接口idc、isr、atc报错 [-12]: 表格跨页连接报错 [-13]: pdf表格线处理报错 [-14]: 指定页码报错 [-15]: office转换接口未运行 [-16]: idc方向分类错误导致ocr读取乱码 [-17]: tika接口报错 [-18]: 新的swf处理报错 [-19]: 动态获取端口报错 [-20]: requests请求超时 [-21]: requests请求返回错误状态码 [-22]: requests请求拒绝连接 """ for c in code: if isinstance(_list, list) and _list == [c]: return True return False def add_div(text): if text == "" or text is None: return text # if get_platform() == "Windows": # print("add_div", text) if re.findall("
", text): return text text = "
" + text + "\n" text = re.sub("\n", "
", text) # text += "
" if text[-5:] == "
": # print("add_div has cut", text[-30:]) text = text[:-5] return text def get_platform(): sys = platform.system() return sys def get_html_p(html_path): log("into get_html_p") try: with open(html_path, "r") as ff: html_str = ff.read() soup = BeautifulSoup(html_str, 'lxml') text = "" for p in soup.find_all("p"): p_text = p.text p_text = p_text.strip() if p.string != "": text += p_text text += "\n" return text except Exception as e: log("get_html_p error!") return [-1] def string_similarity(str1, str2): # 去掉
和回车 str1 = re.sub("
", "", str1) str1 = re.sub("
", "", str1) str1 = re.sub("\n", "", str1) str2 = re.sub("
", "", str2) str2 = re.sub("
", "", str2) str2 = re.sub("\n", "", str2) # print("********************************") # print("str1", str1) # print("********************************") # print("str2", str2) # print("********************************") score = difflib.SequenceMatcher(None, str1, str2).ratio() print("string_similarity", score) return score def get_sequential_data(text_list, bbox_list, html=False): logging.info("into get_sequential_data") try: text = "" order_list = [] for i in range(len(text_list)): length_start = bbox_list[i][0][0] length_end = bbox_list[i][1][0] height_start = bbox_list[i][0][1] height_end = bbox_list[i][-1][1] # print([length_start, length_end, height_start, height_end]) order_list.append([text_list[i], length_start, length_end, height_start, height_end]) # text = text + infomation['text'] + "\n" if get_platform() == "Windows": print("get_sequential_data", order_list) if not order_list: if get_platform() == "Windows": print("get_sequential_data", "no order list") return "" # 根据bbox的坐标对输出排序 order_list.sort(key=lambda x: (x[3], x[1], x[0])) # 根据bbox分行分列 # col_list = [] # height_end = int((order_list[0][4] + order_list[0][3]) / 2) # for i in range(len(order_list)): # if height_end - threshold <= order_list[i][3] <= height_end + threshold: # col_list.append(order_list[i]) # else: # row_list.append(col_list) # col_list = [] # height_end = int((order_list[i][4] + order_list[i][3]) / 2) # col_list.append(order_list[i]) # if i == len(order_list) - 1: # row_list.append(col_list) row_list = [] used_box = [] threshold = 5 for box in order_list: if box in used_box: continue height_center = (box[4] + box[3]) / 2 row = [] for box2 in order_list: if box2 in used_box: continue height_center2 = (box2[4] + box2[3]) / 2 if height_center - threshold <= height_center2 <= height_center + threshold: if box2 not in row: row.append(box2) used_box.append(box2) row.sort(key=lambda x: x[0]) row_list.append(row) for row in row_list: if not row: continue if len(row) <= 1: text = text + row[0][0] + "\n" else: sub_text = "" row.sort(key=lambda x: x[1]) for col in row: sub_text = sub_text + col[0] + " " sub_text = sub_text + "\n" text += sub_text if html: text = "
" + text text = re.sub("\n", "
\n
", text) text += "
" # if text[-5:] == "
": # text = text[:-5] return text except Exception as e: logging.info("get_sequential_data error!") print("get_sequential_data", traceback.print_exc()) return [-1] def rename_inner_files(root_path): try: logging.info("into rename_inner_files") # 获取解压文件夹下所有文件+文件夹,不带根路径 path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: p = os.path.join(root, name) + os.sep if get_platform() == "Windows": root_path = slash_replace(root_path) p = slash_replace(p) p = re.sub(root_path, "", p) root_path = slash_replace(root_path, True) p = slash_replace(p, True) else: p = re.sub(root_path, "", p) path_list.append(p) for name in files: p = os.path.join(root, name) if get_platform() == "Windows": root_path = slash_replace(root_path) p = slash_replace(p) p = re.sub(root_path, "", p) root_path = slash_replace(root_path, True) p = slash_replace(p, True) else: p = re.sub(root_path, "", p) path_list.append(p) # 按路径长度排序 path_list.sort(key=lambda x: len(x), reverse=True) # 循环改名 for old_path in path_list: # 按路径分隔符分割 ss = old_path.split(os.sep) # 判断是否文件夹 is_dir = 0 file_type = "" if os.path.isdir(root_path + old_path): ss = ss[:-1] is_dir = 1 else: if "." in old_path: file_type = "." + old_path.split(".")[-1] else: file_type = "" # 最后一级需要用hash改名 new_path = "" # new_path = re.sub(ss[-1], str(hash(ss[-1])), old_path) + file_type current_level = 0 for s in ss: # 路径拼接 if current_level < len(ss) - 1: new_path += s + os.sep else: new_path += str(hash(s)) + file_type current_level += 1 new_ab_path = root_path + new_path old_ab_path = root_path + old_path os.rename(old_ab_path, new_ab_path) # 重新获取解压文件夹下所有文件+文件夹 new_path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: new_path_list.append(os.path.join(root, name) + os.sep) for name in files: new_path_list.append(os.path.join(root, name)) return new_path_list except: traceback.print_exc() return [-1] def judge_format(path): guess1 = mimetypes.guess_type(path) _type = None if guess1[0]: _type = guess1[0] else: guess2 = filetype.guess(path) if guess2: _type = guess2.mime if _type == "application/pdf": return "pdf" if _type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": return "docx" if _type == "application/x-zip-compressed" or _type == "application/zip": return "zip" if _type == "application/x-rar-compressed" or _type == "application/rar": return "rar" if _type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": return "xlsx" if _type == "application/msword": return "doc" if _type == "image/png": return "png" if _type == "image/jpeg": return "jpg" # 猜不到,返回None return None def draw_lines_plt(bboxes): import matplotlib.pyplot as plt plt.figure() for bbox in bboxes: x = [bbox[0], bbox[2]] y = [bbox[1], bbox[3]] plt.plot(x, y) plt.show() def slash_replace(_str, reverse=False): if reverse: _str = eval(repr(_str).replace('/', '\\\\')) else: _str = eval(repr(_str).replace('\\\\', '/')) return _str class LineTable: def recognize_table(self, list_textbox, list_line, sourceP_LB=False, splited=False, from_pdf=False, is_reverse=False, show=0): self.list_line = list_line self.list_crosspoints = self.recognize_crosspoints(list_line) self.from_pdf = from_pdf self.splited = splited self.connect_bbox_list = [] self.is_reverse = is_reverse self.show = show if self.show: # 展示原始表格及文字 self._plot(list_line, list_textbox, title='list_line,list_textbox') # 聚类 cluster_crosspoints = [] for _point in self.list_crosspoints: cluster_crosspoints.append({"lines": _point.get("lines"), "points": [_point]}) while 1: _find = False new_cluster_crosspoints = [] for l_point in cluster_crosspoints: _flag = False for l_n_point in new_cluster_crosspoints: line1 = l_point.get("lines") line2 = l_n_point.get("lines") if len(line1 & line2) > 0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines": l_point.get("lines"), "points": l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break # need to sort to deal with the inner tables for clu_cp in cluster_crosspoints: points = clu_cp["points"] list_p = np.array([p["point"] for p in points]) max_x = max(list_p[..., 0]) min_x = min(list_p[..., 0]) max_y = max(list_p[..., 1]) min_y = min(list_p[..., 1]) _area = (max_y - min_y) * (max_x - min_x) clu_cp["area"] = _area cluster_crosspoints.sort(key=lambda x: x["area"]) list_l_rect = [] for table_crosspoint in cluster_crosspoints: list_rect = self.crosspoint2rect(table_crosspoint.get("points")) list_l_rect.append(list_rect) if self.show: # 打印单元格 for list_rect in list_l_rect: for rect in list_rect: print('rect', rect) self._plot([], [], list_rect, title='list_l_rect') in_objs = set() list_tables = [] for l_rect in list_l_rect: _ta = self.rect2table(list_textbox, l_rect, in_objs, sourceP_LB=sourceP_LB) if self.connect_bbox_list: return [], [], [], self.connect_bbox_list if _ta: list_tables.append(_ta) if self.show: # 打印最终表格 for table in list_tables: table = table.get('table') for row in table: print('------ row ------') for col in row: print('col', col) return list_tables, in_objs, list_l_rect, [] # def recognize_table_by_rect(self, list_textbox, list_rect, margin=2): # # dump_margin = 5 # list_rect_tmp = [] # # 去重 # for _rect in list_rect: # if (_rect.bbox[3] - _rect.bbox[1] < 10) or (abs(_rect.bbox[2] - _rect.bbox[0]) < 5): # continue # _find = False # for _tmp in list_rect_tmp: # for i in range(4): # if abs(_rect.bbox[i] - _tmp.bbox[i]) < dump_margin: # pass # else: # _find = False # break # if i == 3: # _find = True # if _find: # break # if not _find: # list_rect_tmp.append(_rect) # # # print("=====",len(list_rect),len(list_rect_tmp)) # # print(list_rect_tmp) # # from matplotlib import pyplot as plt # # plt.figure() # # for _rect in list_rect_tmp: # # x0,y0,x1,y1 = _rect.bbox # # plt.boxplot(_rect.bbox) # # plt.show() # # cluster_rect = [] # for _rect in list_rect: # _find = False # for cr in cluster_rect: # for cr_rect in cr: # if abs((cr_rect.bbox[2] - cr_rect.bbox[0] + _rect.bbox[2] - _rect.bbox[0]) - ( # max(cr_rect.bbox[2], _rect.bbox[2]) - min(cr_rect.bbox[0], _rect.bbox[0]))) < margin: # _find = True # cr.append(_rect) # break # elif abs((cr_rect.bbox[3] - cr_rect.bbox[1] + _rect.bbox[3] - _rect.bbox[1]) - ( # max(cr_rect.bbox[3], _rect.bbox[3]) - min(cr_rect.bbox[1], _rect.bbox[1]))) < margin: # _find = True # cr.append(_rect) # break # if _find: # break # if not _find: # cluster_rect.append([_rect]) # # list_l_rect = cluster_rect # # in_objs = set() # list_tables = [] # for l_rect in list_l_rect: # _ta = self.rect2table(list_textbox, l_rect, in_objs) # if _ta: # list_tables.append(_ta) # return list_tables, in_objs, list_l_rect def recognize_crosspoints(self, list_line, fixLine=True): list_crosspoints = [] # print("lines num",len(list_line)) def getMaxPoints(list_x, margin=5, reverse=False): clust_x = [] for _x in list_x: _find = False for cx in clust_x: if abs(cx[0] - _x) < margin: _find = True cx.append(_x) break if not _find: clust_x.append([_x]) clust_x.sort(key=lambda x: x, reverse=reverse) return clust_x[0][0], len(clust_x[0]) for _i in range(len(list_line)): for _j in range(len(list_line)): line1 = list_line[_i].__dict__.get("bbox") line2 = list_line[_j].__dict__.get("bbox") exists, point = self.cross_point(line1, line2) if exists: list_crosspoints.append(point) if fixLine: # 聚类 cluster_crosspoints = [] for _point in list_crosspoints: cluster_crosspoints.append({"lines": _point.get("lines"), "points": [_point]}) while 1: _find = False new_cluster_crosspoints = [] for l_point in cluster_crosspoints: _flag = False for l_n_point in new_cluster_crosspoints: line1 = l_point.get("lines") line2 = l_n_point.get("lines") if len(line1 & line2) > 0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines": l_point.get("lines"), "points": l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break list_crosspoints = [] for list_cp in cluster_crosspoints: points = list_cp.get("points") l_lines = [] for p in points: l_lines.extend(p.get("p_lines")) l_lines = list(set(l_lines)) l_lines.sort(key=lambda x: x[0]) min_x, _count = getMaxPoints([l[0] for l in l_lines], reverse=False) if _count <= 2: min_x = None min_y, _count = getMaxPoints([l[1] for l in l_lines], reverse=False) if _count < 2: min_y = None max_x, _count = getMaxPoints([l[2] for l in l_lines], reverse=True) if _count <= 2: max_x = None max_y, _count = getMaxPoints([l[3] for l in l_lines], reverse=True) if _count <= 2: max_y = None if min_x and min_y and max_x and max_y: points.sort(key=lambda x: x["point"][0]) if abs(min_x - points[0]["point"][0]) > 30: _line = LTLine(1, (min_x, min_y), (min_x, max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====",_line.bbox) if abs(max_x - points[-1]["point"][0]) > 30: _line = LTLine(1, (max_x, min_y), (max_x, max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====1",_line.bbox) points.sort(key=lambda x: x["point"][1]) if abs(min_y - points[0]["point"][1]) > 30: _line = LTLine(1, (min_x, min_y), (max_x, min_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====2",_line.bbox) if abs(max_y - points[-1]["point"][1]) > 30: _line = LTLine(1, (min_x, max_y), (max_x, max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====2",_line.bbox) for _i in range(len(l_lines)): for _j in range(len(l_lines)): line1 = l_lines[_i] line2 = l_lines[_j] exists, point = self.cross_point(line1, line2) if exists: list_crosspoints.append(point) # from matplotlib import pyplot as plt # plt.figure() # for _line in l_lines: # x0,y0,x1,y1 = _line # plt.plot([x0,x1],[y0,y1]) # for point in list_crosspoints: # plt.scatter(point.get("point")[0],point.get("point")[1]) # plt.show() # print(list_crosspoints) # print("points num",len(list_crosspoints)) return list_crosspoints # def recognize_rect(self, _page): # list_line = [] # for _obj in _page._objs: # if isinstance(_obj, (LTLine)): # list_line.append(_obj) # list_crosspoints = self.recognize_crosspoints(list_line) # # # 聚类 # cluster_crosspoints = [] # for _point in list_crosspoints: # cluster_crosspoints.append({"lines": _point.get("lines"), "points": [_point]}) # while 1: # _find = False # new_cluster_crosspoints = [] # for l_point in cluster_crosspoints: # _flag = False # for l_n_point in new_cluster_crosspoints: # line1 = l_point.get("lines") # line2 = l_n_point.get("lines") # if len(line1 & line2) > 0: # _find = True # _flag = True # l_n_point["lines"] = line1.union(line2) # l_n_point["points"].extend(l_point["points"]) # if not _flag: # new_cluster_crosspoints.append({"lines": l_point.get("lines"), "points": l_point.get("points")}) # cluster_crosspoints = new_cluster_crosspoints # if not _find: # break # # print(len(cluster_crosspoints)) # # list_l_rect = [] # for table_crosspoint in cluster_crosspoints: # list_rect = self.crosspoint2rect(table_crosspoint.get("points")) # list_l_rect.append(list_rect) # # return list_l_rect def crosspoint2rect(self, list_crosspoint, margin=10): dict_line_points = {} for _point in list_crosspoint: lines = list(_point.get("lines")) for _line in lines: if _line not in dict_line_points: dict_line_points[_line] = {"direct": None, "points": []} dict_line_points[_line]["points"].append(_point) # 排序 for k, v in dict_line_points.items(): list_x = [] list_y = [] for _p in v["points"]: list_x.append(_p.get("point")[0]) list_y.append(_p.get("point")[1]) if max(list_x) - min(list_x) > max(list_y) - min(list_y): v.get("points").sort(key=lambda x: x.get("point")[0]) v["direct"] = "row" else: v.get("points").sort(key=lambda x: x.get("point")[1]) v["direct"] = "column" list_rect = [] for _point in list_crosspoint: if _point["buttom"] >= margin and _point["right"] >= margin: lines = list(_point.get("lines")) _line = lines[0] if dict_line_points[_line]["direct"] == "column": _line = lines[1] next_point = None for p1 in dict_line_points[_line]["points"]: if p1["buttom"] >= margin and p1["point"][0] > _point["point"][0]: next_point = p1 break if not next_point: continue lines = list(next_point.get("lines")) _line = lines[0] if dict_line_points[_line]["direct"] == "row": _line = lines[1] final_point = None for p1 in dict_line_points[_line]["points"]: if p1["left"] >= margin and p1["point"][1] > next_point["point"][1]: final_point = p1 break if not final_point: continue _r = LTRect(1, (_point["point"][0], _point["point"][1], final_point["point"][0], final_point["point"][1])) list_rect.append(_r) tmp_rect = [] set_bbox = set() for _r in list_rect: _bbox = "%.2f-%.2f-%.2f-%.2f" % _r.bbox width = _r.bbox[2] - _r.bbox[0] height = _r.bbox[3] - _r.bbox[1] if width <= margin or height <= margin: continue if _bbox not in set_bbox: tmp_rect.append(_r) set_bbox.add(_bbox) list_rect = tmp_rect # _l = [x.get('point') for x in list_crosspoint] # _l.sort(key=lambda x: (x[0], x[1])) # print('list_crosspoint', _l) # print('list_rect', list_rect) # import cv2 # import numpy as np # import random # img = np.zeros(shape=(1000,1000),dtype=np.uint8) # img += 255 # # color = [] # for rect in list_rect: # color += 10 # x0,y0,x1,y1 = rect.bbox # x0 *= 10/18 # y0 *= 10/18 # x1 *= 10/18 # y1 *= 10/18 # print(rect.bbox) # cv2.rectangle(img, (int(x0),int(y0)),(int(x1),int(y1)), (color%255, (color+10)%255, (color+20)%255), 3) # cv2.imshow("bbox", img) # cv2.waitKey(0) return list_rect def cross_point(self, line1, line2, segment=True, margin=2): point_is_exist = False x = y = 0 x1, y1, x2, y2 = line1 x3, y3, x4, y4 = line2 if (x2 - x1) == 0: k1 = None b1 = 0 else: k1 = (y2 - y1) * 1.0 / (x2 - x1) # 计算k1,由于点均为整数,需要进行浮点数转化 b1 = y1 * 1.0 - x1 * k1 * 1.0 # 整型转浮点型是关键 if (x4 - x3) == 0: # L2直线斜率不存在 k2 = None b2 = 0 else: k2 = (y4 - y3) * 1.0 / (x4 - x3) # 斜率存在 b2 = y3 * 1.0 - x3 * k2 * 1.0 if k1 is None: if not k2 is None: x = x1 y = k2 * x1 + b2 point_is_exist = True elif k2 is None: x = x3 y = k1 * x3 + b1 elif not k2 == k1: x = (b2 - b1) * 1.0 / (k1 - k2) y = k1 * x * 1.0 + b1 * 1.0 point_is_exist = True left = 0 right = 0 top = 0 buttom = 0 if point_is_exist: if segment: if x >= (min(x1, x2) - margin) and x <= (max(x1, x2) + margin) and y >= ( min(y1, y2) - margin) and y <= (max(y1, y2) + margin): if x >= (min(x3, x4) - margin) and x <= (max(x3, x4) + margin) and y >= ( min(y3, y4) - margin) and y <= (max(y3, y4) + margin): point_is_exist = True left = abs(min(x1, x3) - x) right = abs(max(x2, x4) - x) top = abs(min(y1, y3) - y) buttom = abs(max(y2, y4) - y) else: point_is_exist = False else: point_is_exist = False line1_key = "%.2f-%.2f-%.2f-%.2f" % (x1, y1, x2, y2) line2_key = "%.2f-%.2f-%.2f-%.2f" % (x3, y3, x4, y4) return point_is_exist, {"point": [x, y], "left": left, "right": right, "top": top, "buttom": buttom, "lines": set([line1_key, line2_key]), "p_lines": [line1, line2]} # def unionTable(self, list_table, fixspan=True, margin=2): # set_x = set() # set_y = set() # # list_cell = [] # for _t in list_table: # for _line in _t: # list_cell.extend(_line) # # clusters_rects = [] # # 根据y1聚类 # set_id = set() # list_cell_dump = [] # for _cell in list_cell: # _id = id(_cell) # if _id in set_id: # continue # set_id.add(_id) # list_cell_dump.append(_cell) # list_cell = list_cell_dump # list_cell.sort(key=lambda x: x.get("bbox")[3]) # for _rect in list_cell: # _y0 = _rect.get("bbox")[3] # _find = False # for l_cr in clusters_rects: # if abs(l_cr[0].get("bbox")[3] - _y0) < 2: # _find = True # l_cr.append(_rect) # break # if not _find: # clusters_rects.append([_rect]) # # clusters_rects.sort(key=lambda x: x[0].get("bbox")[3], reverse=True) # for l_cr in clusters_rects: # l_cr.sort(key=lambda x: x.get("bbox")[0]) # # # print("=============:") # # for l_r in clusters_rects: # # print(len(l_r)) # # for _line in clusters_rects: # for _rect in _line: # (x0, y0, x1, y1) = _rect.get("bbox") # set_x.add(x0) # set_x.add(x1) # set_y.add(y0) # set_y.add(y1) # if len(set_x) == 0 or len(set_y) == 0: # return # list_x = list(set_x) # list_y = list(set_y) # # list_x.sort(key=lambda x: x) # list_y.sort(key=lambda x: x, reverse=True) # _table = [] # line_i = 0 # for _line in clusters_rects: # # table_line = [] # cell_i = 0 # for _rect in _line: # (x0, y0, x1, y1) = _rect.get("bbox") # _cell = {"bbox": (x0, y0, x1, y1), "rect": _rect.get("rect"), # "rowspan": self.getspan(list_y, y0, y1, margin), # "columnspan": self.getspan(list_x, x0, x1, margin), "text": _rect.get("text", "")} # table_line.append(_cell) # # cell_i += 1 # line_i += 1 # _table.append(table_line) # # # print("=====================>>") # # for _line in _table: # # for _cell in _line: # # print(_cell,end="\t") # # print("\n") # # print("=====================>>") # # # print(_table) # if fixspan: # for _line in _table: # extend_line = [] # for c_i in range(len(_line)): # _cell = _line[c_i] # if _cell.get("columnspan") > 1: # _cospan = _cell.get("columnspan") # _cell["columnspan"] = 1 # for i in range(1, _cospan): # extend_line.append({"index": c_i + 1, "cell": _cell}) # extend_line.sort(key=lambda x: x["index"], reverse=True) # for _el in extend_line: # _line.insert(_el["index"], _el["cell"]) # for l_i in range(len(_table)): # _line = _table[l_i] # for c_i in range(len(_line)): # _cell = _line[c_i] # if _cell.get("rowspan") > 1: # _rospan = _cell.get("rowspan") # _cell["rowspan"] = 1 # for i in range(1, _rospan): # _table[l_i + i].insert(c_i, _cell) # # table_bbox = (_table[0][0].get("bbox")[0], _table[0][0].get("bbox")[1], _table[-1][-1].get("bbox")[2], # _table[-1][-1].get("bbox")[3]) # # ta = {"bbox": table_bbox, "table": _table} # return ta # 获取点阵 def getSpanLocation(self, _list, x0, x1, margin): list_location = [] (x0, x1) = (min(x0, x1), max(x0, x1)) for _x in _list: if _x >= (x0 - margin) and _x <= (x1 + margin): list_location.append(_x) return list_location def fixSpan(self, _table, list_x, list_y, sourceP_LB): # with open('table.pickle', 'wb') as f: # pickle.dump(_table, f) def checkPosition(_line, _position, bbox, margin=5): # check y if len(_line) > 0: _bbox = _line[0].get("bbox") # check if has lap if min(_bbox[1], _bbox[3]) > max(bbox[1], bbox[3]) or max(_bbox[1], _bbox[3]) < min(bbox[1], bbox[3]): # if abs(min(_bbox[1],_bbox[3])-min(bbox[1],bbox[3]))>margin or abs(max(_bbox[1],_bbox[3])-max(bbox[1],bbox[3]))>margin: # print(_bbox) # print(bbox) # print("check position y false", _bbox, bbox) return False # check x if _position <= len(_line) - 1: after_bbox = _line[_position].get("bbox") # the insert bbox.x1 should not less then the after bbox.x0 if not (after_bbox[0] >= bbox[2]): # print("check position x after false 1") return False if 0 < _position - 1 < len(_line): before_bbox = _line[_position - 1].get("bbox") # the insert bbox.x1 should less equal than the first bbox.x0 if not (bbox[0] >= before_bbox[2]): # print("check position x before false 2") return False return True # 拓展columnspan的数据 for _line in _table: c_i = 0 while c_i < len(_line): _cell = _line[c_i] if _cell.get("columnspan") > 1: x0, y0, x1, y1 = _cell.get("bbox") _cospan = _cell.get("columnspan") locations = self.getSpanLocation(list_x, x0, x1, 10) if len(locations) == _cospan + 1: _cell["bbox"] = (x0, y0, locations[1], y1) _cell["columnspan"] = 1 # len(locations)==_colspan+1 for i in range(1, _cospan): n_cell = {} n_cell.update(_cell) n_cell["bbox"] = (locations[i], y0, locations[i + 1], y1) c_i += 1 # check the position if checkPosition(_line, c_i, n_cell["bbox"]): _line.insert(c_i, n_cell) c_i += 1 # 拓展rowspan的数据 for l_i in range(len(_table)): _line = _table[l_i] c_i = 0 while c_i < len(_line): _cell = _line[c_i] if _cell.get("rowspan") > 1: # print('_cell', _cell) x0, y0, x1, y1 = _cell.get("bbox") _rospan = _cell.get("rowspan") locations = self.getSpanLocation(list_y, y0, y1, 10) # print('locations', locations) if len(locations) == _rospan + 1: if self.is_reverse: _cell["bbox"] = (x0, locations[-2], x1, y0) else: _cell["bbox"] = (x0, y0, x1, locations[1]) _cell["rowspan"] = 1 # print('_cell1', _cell) for i in range(1, _rospan): n_cell = {} n_cell.update(_cell) # if not self.is_reverse: if l_i + i <= len(_table) - 1: # print(len(_table),l_i+i) n_cell["bbox"] = (x0, locations[i], x1, locations[i + 1]) # print('n_cell', n_cell) if checkPosition(_table[l_i + i], c_i, n_cell["bbox"]): # print('n_cell1', n_cell) _table[l_i + i].insert(c_i, n_cell) # else: # if l_i - i >= 0: # # print(len(_table),l_i+i) # n_cell["bbox"] = (x0, locations[i], x1, locations[i + 1]) # print('n_cell', n_cell) # if checkPosition(_table[l_i - i], c_i, n_cell["bbox"]): # print('n_cell1', n_cell) # _table[l_i - i].insert(c_i, n_cell) c_i += 1 def fixRect(self, _table, list_x, list_y, sourceP_LB, margin): self.fixSpan(_table, list_x, list_y, sourceP_LB) # for line_i in range(len(_table)): # for cell_i in range(len(_table[line_i])): # _cell = _table[line_i][cell_i] # print(line_i,cell_i,_cell["bbox"],_cell["text"]) for _line in _table: _line.sort(key=lambda x: x.get('bbox')[0]) # print('_line', _line) extend_line = [] for c_i in range(len(_line)): c_cell = _line[c_i] # first cell missing if c_i == 0 and c_cell["bbox"][0] != list_x[0]: # print('c_cell', c_cell) # print('list_x', list_x) _bbox = (list_x[0], c_cell["bbox"][1], c_cell["bbox"][0], c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1, _bbox), "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index": c_i, "cell": _cell}) # cell in the median missing if c_i < len(_line) - 1: n_cell = _line[c_i + 1] _bbox = c_cell["bbox"] n_bbox = n_cell["bbox"] if _bbox[0] == n_bbox[0] and _bbox[2] == n_bbox[2]: continue else: if abs(_bbox[2] - n_bbox[0]) > margin: _bbox = (_bbox[2], _bbox[1], n_bbox[0], _bbox[3]) _cell = {"bbox": _bbox, "rect": LTRect(1, _bbox), "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index": c_i + 1, "cell": _cell}) # last cell missing if c_i == len(_line) - 1: if abs(c_cell["bbox"][2] - list_x[-1]) > margin: _bbox = (c_cell["bbox"][2], c_cell["bbox"][1], list_x[-1], c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1, _bbox), "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index": c_i + 1, "cell": _cell}) extend_line.sort(key=lambda x: x["index"], reverse=True) for _tmp in extend_line: _line.insert(_tmp["index"], _tmp["cell"]) def fix_span(self, _table, list_x, list_y, sourceP_LB): def checkPosition(_line, _position, bbox, margin=5): # check y if len(_line) > 0: _bbox = _line[0].get("bbox") # check if has lap if min(_bbox[1], _bbox[3]) > max(bbox[1], bbox[3]) or max(_bbox[1], _bbox[3]) < min(bbox[1], bbox[3]): # if abs(min(_bbox[1],_bbox[3])-min(bbox[1],bbox[3]))>margin or abs(max(_bbox[1],_bbox[3])-max(bbox[1],bbox[3]))>margin: # print(_bbox) # print(bbox) # print("check position y false", _bbox, bbox) return False # check x if _position <= len(_line) - 1: after_bbox = _line[_position].get("bbox") # the insert bbox.x1 should not less then the after bbox.x0 if not (after_bbox[0] >= bbox[2]): # print("check position x after false 1") return False if 0 < _position - 1 < len(_line): before_bbox = _line[_position - 1].get("bbox") # the insert bbox.x1 should less equal than the first bbox.x0 if not (bbox[0] >= before_bbox[2]): # print("check position x before false 2") return False return True # 记录合并单元格的位置及格子数 span_list = [] # 拓展columnspan的数据 for l_i, _line in enumerate(_table): c_i = 0 while c_i < len(_line): _cell = _line[c_i] if _cell.get("columnspan") > 1: x0, y0, x1, y1 = _cell.get("bbox") _cospan = _cell.get("columnspan") locations = self.getSpanLocation(list_x, x0, x1, 10) if len(locations) == _cospan + 1: span_list.append([l_i, c_i, 'col', _cospan]) _cell["bbox"] = (x0, y0, locations[1], y1) _cell["columnspan"] = 1 _cell["origin_columnspan"] = _cospan for i in range(1, _cospan): n_cell = {} n_cell.update(_cell) n_cell["origin_columnspan"] = 0 n_cell["bbox"] = (locations[i], y0, locations[i + 1], y1) c_i += 1 # check the position if checkPosition(_line, c_i, n_cell["bbox"]): _line.insert(c_i, n_cell) c_i += 1 # 拓展rowspan的数据 for l_i in range(len(_table)): _line = _table[l_i] c_i = 0 while c_i < len(_line): _cell = _line[c_i] if _cell.get("rowspan") > 1: x0, y0, x1, y1 = _cell.get("bbox") _rospan = _cell.get("rowspan") locations = self.getSpanLocation(list_y, y0, y1, 10) if len(locations) == _rospan + 1: span_list.append([l_i, c_i, 'row', _rospan]) if self.is_reverse: _cell["bbox"] = (x0, locations[-2], x1, y0) else: _cell["bbox"] = (x0, y0, x1, locations[1]) _cell["rowspan"] = 1 _cell["origin_rowspan"] = _rospan for i in range(1, _rospan): n_cell = {} n_cell.update(_cell) n_cell["origin_rowspan"] = 0 if l_i + i <= len(_table) - 1: n_cell["bbox"] = (x0, locations[i], x1, locations[i + 1]) if checkPosition(_table[l_i + i], c_i, n_cell["bbox"]): # print('n_cell1', n_cell) _table[l_i + i].insert(c_i, n_cell) c_i += 1 def fix_rect(self, _table, list_x, list_y, sourceP_LB, margin): self.fix_span(_table, list_x, list_y, sourceP_LB) for _line in _table: _line.sort(key=lambda x: x.get('bbox')[0]) # print('_line', _line) extend_line = [] for c_i in range(len(_line)): c_cell = _line[c_i] # first cell missing if c_i == 0 and c_cell["bbox"][0] != list_x[0]: # print('c_cell', c_cell) # print('list_x', list_x) _bbox = (list_x[0], c_cell["bbox"][1], c_cell["bbox"][0], c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1, _bbox), "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index": c_i, "cell": _cell}) # cell in the median missing if c_i < len(_line) - 1: n_cell = _line[c_i + 1] _bbox = c_cell["bbox"] n_bbox = n_cell["bbox"] if _bbox[0] == n_bbox[0] and _bbox[2] == n_bbox[2]: continue else: if abs(_bbox[2] - n_bbox[0]) > margin: _bbox = (_bbox[2], _bbox[1], n_bbox[0], _bbox[3]) _cell = {"bbox": _bbox, "rect": LTRect(1, _bbox), "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index": c_i + 1, "cell": _cell}) # last cell missing if c_i == len(_line) - 1: if abs(c_cell["bbox"][2] - list_x[-1]) > margin: _bbox = (c_cell["bbox"][2], c_cell["bbox"][1], list_x[-1], c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1, _bbox), "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index": c_i + 1, "cell": _cell}) extend_line.sort(key=lambda x: x["index"], reverse=True) for _tmp in extend_line: _line.insert(_tmp["index"], _tmp["cell"]) def feedText2table(self, _table, list_textbox, in_objs, sourceP_LB): # find the suitable cell of the textbox list_cells = [] for table_line in _table: for _cell in table_line: list_cells.append({"cell": _cell, "inbox_textbox_list": []}) self.connect_bbox_list = [] for textbox in list_textbox: list_iou = [] for _d in list_cells: _cell = _d["cell"] _iou = self.getIOU(textbox.bbox, _cell["bbox"]) list_iou.append(_iou) max_iou_index = np.argmax(list_iou) max_iou = list_iou[max_iou_index] # if self.from_pdf: # iou_threhold = 0.3 # else: iou_threhold = 0.1 if max_iou > iou_threhold and textbox not in in_objs: list_cells[max_iou_index]["inbox_textbox_list"].append(textbox) in_objs.add(textbox) if not self.from_pdf and not self.splited: # 多个iou大于0.3的,可能是ocr将两个文本合成一个了 iou_index_list = np.where(np.array(list_iou) >= 0.3)[0].tolist() if len(iou_index_list) >= 2: # print('len(iou_index_list) >= 2 textbox', textbox) self.connect_bbox_list.append(textbox) has_matched_box_list = [] for _d in list_cells: _cell = _d["cell"] inbox_textbox_list = _d["inbox_textbox_list"] # 分行,根据y重合 all_match_box_list = [] # inbox_textbox_list.sort(key=lambda x: x.bbox[1], reverse=sourceP_LB) inbox_textbox_list.sort(key=lambda x: x.bbox[1]) for i in range(len(inbox_textbox_list)): match_box_list = [] box1 = inbox_textbox_list[i] if box1 in has_matched_box_list: continue min_y1 = box1.bbox[1] + 1 / 3 * abs(box1.bbox[3] - box1.bbox[1]) max_y1 = box1.bbox[3] - 1 / 3 * abs(box1.bbox[3] - box1.bbox[1]) match_box_list.append( [box1.get_text(), box1.bbox[0], box1.bbox[1], box1.bbox[2], box1.bbox[3], min_y1, max_y1]) has_matched_box_list.append(box1) for j in range(i + 1, len(inbox_textbox_list)): box2 = inbox_textbox_list[j] if box2 in has_matched_box_list: continue # print(min_y1, box2.bbox[1], box2.bbox[3], max_y1) # print(min_y2, box1.bbox[3], max_y2) if min_y1 <= box2.bbox[1] <= max_y1 or \ min_y1 <= box2.bbox[3] <= max_y1 or \ box2.bbox[1] <= min_y1 <= max_y1 <= box2.bbox[3]: match_box_list.append( [box2.get_text(), box2.bbox[0], box2.bbox[1], box2.bbox[2], box2.bbox[3], min_y1, max_y1]) has_matched_box_list.append(box2) match_box_list.sort(key=lambda x: x[1]) all_match_box_list.append(match_box_list) # print("match_box_list", all_match_box_list) # all_match_box_list.sort(key=lambda x: (round(x[0][2] + x[0][4]) / 2, 0), reverse=sourceP_LB) all_match_box_list.sort(key=lambda x: (round(x[0][2] + x[0][4]) / 2, 0)) for box_list in all_match_box_list: for box in box_list: _cell["text"] += re.sub("\s", '', box[0]) # 打印所有cell # for _cell in list_cells: # print("cell", _cell) def makeTableByRect(self, list_rect, margin, sourceP_LB): _table = [] set_x = set() set_y = set() clusters_rects = [] # 根据y1聚类 # if sourceP_LB: # list_rect.sort(key=lambda x: x.bbox[3]) # for _rect in list_rect: # _y0 = _rect.bbox[3] # _y1 = _rect.bbox[1] # _find = False # for l_cr in clusters_rects: # if abs(l_cr[0].bbox[3] - _y0) < margin: # _find = True # l_cr.append(_rect) # break # if not _find: # clusters_rects.append([_rect]) # else: list_rect.sort(key=lambda x: x.bbox[1]) for _rect in list_rect: _y0 = _rect.bbox[1] _y1 = _rect.bbox[3] _find = False for l_cr in clusters_rects: if abs(l_cr[0].bbox[1] - _y0) < margin: _find = True l_cr.append(_rect) break if not _find: clusters_rects.append([_rect]) # print("textbox:===================") # for _textbox in list_textbox: # print(_textbox.get_text()) # print("textbox:======>>>>>>>>>>>>>") # for c in clusters_rects: # print("+"*30) # for cc in c: # print("rect", cc.) # cul spans for _line in clusters_rects: for _rect in _line: (x0, y0, x1, y1) = _rect.bbox set_x.add(x0) set_x.add(x1) set_y.add(y0) set_y.add(y1) if len(set_x) == 0 or len(set_y) == 0: return None, [], [] if len(list_rect) <= 1: return None, [], [] list_x = list(set_x) list_y = list(set_y) list_x.sort(key=lambda x: x) # list_y.sort(key=lambda x: x, reverse=sourceP_LB) list_y.sort(key=lambda x: x) # print("clusters_rects", len(clusters_rects)) # if sourceP_LB: # clusters_rects.sort(key=lambda x: (x[0].bbox[1] + x[0].bbox[3]) / 2, reverse=sourceP_LB) clusters_rects.sort(key=lambda x: (x[0].bbox[1] + x[0].bbox[3]) / 2, reverse=sourceP_LB) for l_cr in clusters_rects: l_cr.sort(key=lambda x: x.bbox[0]) pop_x = [] for i in range(len(list_x) - 1): _i = len(list_x) - i - 1 l_i = _i - 1 if abs(list_x[_i] - list_x[l_i]) < 5: pop_x.append(_i) pop_x.sort(key=lambda x: x, reverse=True) for _x in pop_x: list_x.pop(_x) # pop_x = [] for i in range(len(list_y) - 1): _i = len(list_y) - i - 1 l_i = _i - 1 if abs(list_y[_i] - list_y[l_i]) < 5: pop_x.append(_i) pop_x.sort(key=lambda x: x, reverse=True) for _x in pop_x: list_y.pop(_x) # print("list_x", list_x) # print("list_y", list_y) line_i = 0 for _line in clusters_rects: table_line = [] cell_i = 0 for _rect in _line: (x0, y0, x1, y1) = _rect.bbox _cell = {"bbox": (x0, y0, x1, y1), "rect": _rect, "rowspan": self.getspan(list_y, y0, y1, margin), "columnspan": self.getspan(list_x, x0, x1, margin), "text": ""} cell_i += 1 table_line.append(_cell) line_i += 1 _table.append(table_line) return _table, list_x, list_y def rect2table(self, list_textbox, list_rect, in_objs, margin=5, sourceP_LB=False): def getIOU(bbox0, bbox1): width = max(bbox0[2], bbox1[2]) - min(bbox0[0], bbox1[0]) - (bbox0[2] - bbox0[0] + bbox1[2] - bbox1[0]) height = max(bbox0[3], bbox1[3]) - min(bbox0[1], bbox1[1]) - (bbox0[3] - bbox0[1] + bbox1[3] - bbox1[1]) if width < 0 and height < 0: return abs(width * height / min(abs((bbox0[2] - bbox0[0]) * (bbox0[3] - bbox0[1])), abs((bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])))) return 0 _table, list_x, list_y = self.makeTableByRect(list_rect, margin, sourceP_LB) if self.show: # 打印_table temp_list = [] for t in _table: print('------ makeTableByRect row ------') for c in t: print('makeTableByRect col', c) temp_list.append(c) self._plot([], [], temp_list, title='makeTableByRect table') if _table is None: return # pdf纯文本上下颠倒,pdf图片不颠倒 # if self.is_reverse: # _table.sort(key=lambda x: (-x[0].get('bbox')[1], -x[0].get('bbox')[3])) # else: _table.sort(key=lambda x: (x[0].get('bbox')[1], x[0].get('bbox')[3])) self.feedText2table(_table, list_textbox, in_objs, sourceP_LB) # print("table===========================>") # for _line in _table: # for _cell in _line: # print("||%d%d"%(_cell["rowspan"],_cell["columnspan"]),end="\t") # print() # print("table===========================>") # # print("------------") # for _line in _table: # for _cell in _line: # print(_cell["text"],end="\t") # print("\n") # print("------------") # self.fixRect(_table, list_x, list_y, sourceP_LB, margin) self.fix_rect(_table, list_x, list_y, sourceP_LB, margin) # pdf纯文本上下颠倒,pdf图片不颠倒 # if self.is_reverse: # _table.sort(key=lambda x: (-x[0].get('bbox')[1], -x[0].get('bbox')[3])) # else: _table.sort(key=lambda x: (x[0].get('bbox')[1], x[0].get('bbox')[3])) if self.show: # 打印_table temp_list = [] for t in _table: print('------ fixRect row ------') for c in t: print('fixRect col', c) temp_list.append(c) self._plot([], [], temp_list, title='fixRect table') # print("table===========================>") # for _line in _table: # for _cell in _line: # print("||%d%d"%(_cell["rowspan"],_cell["columnspan"]),end="\t") # print() # print("table===========================>") self.feedText2table(_table, list_textbox, in_objs, sourceP_LB) # feedText2table后,有textbox符合多个单元格iou的,可能是文本错误连接了,需拆开 if self.connect_bbox_list: return {} min_x, min_y = 1000000, 1000000 max_x, max_y = 0, 0 for row in _table: for col in row: if col.get('bbox')[0] < min_x: min_x = col.get('bbox')[0] if col.get('bbox')[2] < min_x: min_x = col.get('bbox')[2] if col.get('bbox')[1] < min_y: min_y = col.get('bbox')[1] if col.get('bbox')[3] < min_y: min_y = col.get('bbox')[3] if col.get('bbox')[0] > max_x: max_x = col.get('bbox')[0] if col.get('bbox')[2] > max_x: max_x = col.get('bbox')[2] if col.get('bbox')[1] > max_y: max_y = col.get('bbox')[1] if col.get('bbox')[3] > max_y: max_y = col.get('bbox')[3] table_bbox = (min_x, min_y, max_x, max_y) # table_bbox = (_table[0][0].get("bbox")[0], # _table[0][0].get("bbox")[1], # _table[-1][-1].get("bbox")[2], # _table[-1][-1].get("bbox")[3]) # print("=======") # for _line in _table: # for _cell in _line: # print(_cell["text"]) # print("\n") # print("===========") ta = {"bbox": table_bbox, "table": _table} return ta def inbox(self, bbox0, bbox_g, text=""): # if bbox_g[0]<=bbox0[0] and bbox_g[1]<=bbox0[1] and bbox_g[2]>=bbox0[2] and bbox_g[3]>=bbox0[3]: # return 1 # print("utils inbox", text, self.getIOU(bbox0,bbox_g), bbox0, bbox_g) if self.getIOU(bbox0, bbox_g) > 0.2: return 1 return 0 def getIOU(self, bbox0, bbox1): bbox0 = [min(bbox0[0], bbox0[2]), min(bbox0[1], bbox0[3]), max(bbox0[0], bbox0[2]), max(bbox0[1], bbox0[3])] bbox1 = [min(bbox1[0], bbox1[2]), min(bbox1[1], bbox1[3]), max(bbox1[0], bbox1[2]), max(bbox1[1], bbox1[3])] width = abs(max(bbox0[2], bbox1[2]) - min(bbox0[0], bbox1[0])) - ( abs(bbox0[2] - bbox0[0]) + abs(bbox1[2] - bbox1[0])) height = abs(max(bbox0[3], bbox1[3]) - min(bbox0[1], bbox1[1])) - ( abs(bbox0[3] - bbox0[1]) + abs(bbox1[3] - bbox1[1])) if width < 0 and height < 0: iou = abs(width * height / min(abs((bbox0[2] - bbox0[0]) * (bbox0[3] - bbox0[1])), abs((bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1])))) # print("getIOU", iou) return iou return 0 def getspan(self, _list, x0, x1, margin): _count = 0 (x0, x1) = (min(x0, x1), max(x0, x1)) for _x in _list: if _x >= (x0 - margin) and _x <= (x1 + margin): _count += 1 return _count - 1 def _plot(self, list_line, list_textbox, list_rect=[], title=''): from matplotlib import pyplot as plt plt.figure() for _line in list_line: x0, y0, x1, y1 = _line.__dict__.get("bbox") plt.plot([x0, x1], [y0, y1]) for _line in list_line: x0, y0, x1, y1 = _line.bbox plt.plot([x0, x1], [y0, y1]) # for point in list_crosspoints: # plt.scatter(point.get("point")[0],point.get("point")[1]) for textbox in list_textbox: x0, y0, x1, y1 = textbox.bbox plt.plot([x0, x1], [y0, y1]) for rect in list_rect: try: x0, y0, x1, y1 = rect.bbox except: x0, y0, x1, y1 = rect.get("bbox") plt.plot([x0, x0], [y0, y1]) plt.plot([x0, x1], [y0, y0]) plt.plot([x1, x1], [y0, y1]) plt.plot([x0, x1], [y1, y1]) plt.title(str(title)) plt.show() def get_table_html(table): # 还原合并单元格 for row in table: for col in row: if 'origin_rowspan' in col: if col.get('origin_rowspan') != 0: col['rowspan'] = col.get('origin_rowspan') else: col['delete'] = 1 if 'origin_columnspan' in col: if col.get('origin_columnspan') != 0: col['columnspan'] = col.get('origin_columnspan') else: col['delete'] = 1 html_text = '' for row in table: html_text += "" for col in row: if col.get('delete') == 1: continue row_span = col.get("rowspan") col_span = col.get("columnspan") bbox_text = col.get("text") html_text += "" html_text += "" html_text += "
" html_text += bbox_text + "
" return html_text def sort_object(obj_list, is_reverse=False): from format_convert.convert_tree import _Table, _Image, _Sentence, _Page obj_list = combine_object(obj_list) if len(obj_list) == 0: return obj_list if isinstance(obj_list[0], (_Table, _Sentence, _Image)): obj_list.sort(key=lambda x: (x.y, x.x), reverse=is_reverse) return obj_list elif isinstance(obj_list[0], _Page): obj_list.sort(key=lambda x: x.page_no) return obj_list else: return obj_list def combine_object(obj_list, threshold=5): from format_convert.convert_tree import _Sentence sentence_list = [] for obj in obj_list: if isinstance(obj, _Sentence) and not obj.is_html: obj.content = re.sub("\s", "", obj.content) sentence_list.append(obj) sentence_list.sort(key=lambda x: (x.y, x.x)) for sen in sentence_list: obj_list.remove(sen) delete_list = [] for i in range(1, len(sentence_list)): sen1 = sentence_list[i - 1] sen2 = sentence_list[i] if sen1.combine is False or sen2.combine is False: continue if abs(sen2.y - sen1.y) <= threshold: if sen2.x > sen1.x: sen2.x = sen1.x sen2.content = sen1.content + sen2.content else: sen2.content = sen2.content + sen1.content if sen2.y > sen1.y: sen2.y = sen1.y delete_list.append(sen1) for sen in delete_list: sentence_list.remove(sen) for sen in sentence_list: obj_list.append(sen) return obj_list session_ocr = requests.Session() session_otr = requests.Session() session_all = requests.Session() def request_post_240606(url, param, time_out=1000, use_zlib=False): fails = 0 text = json.dumps([-2]) while True: try: if fails >= 1: break headers = {'content-type': 'application/json'} # result = requests.post(url, data=param, timeout=time_out) if param.get("model_type") == "ocr": result = session_ocr.post(url, data=param, timeout=time_out) elif param.get("model_type") == "otr": result = session_otr.post(url, data=param, timeout=time_out) else: result = session_all.post(url, data=param, timeout=time_out) # print('result.status_code', result.status_code) # print('result.text', result.text) if result.status_code == 200: text = result.text break else: # print('result.status_code', result.status_code) # print('result.text', result.text) fails += 1 continue except socket.timeout: fails += 1 # print('timeout! fail times:', fails) except: fails += 1 # print('fail! fail times:', fails) traceback.print_exc() return text def request_post(url, param, time_out=1000): try: headers = {'content-type': 'application/json'} result = session_all.post(url, data=param, timeout=time_out) if result.status_code == 200: text = result.text else: text = json.dumps([-21]) except socket.timeout: text = json.dumps([-20]) except requests.exceptions.ConnectionError: text = json.dumps([-22]) except: text = json.dumps([-2]) traceback.print_exc() return text def test_gpu(): print("=" * 30) import paddle paddle.utils.run_check() # import tensorflow as tf # print("tf gpu", tf.config.list_physical_devices('GPU')) print("=" * 30) def my_subprocess_call(*popenargs, timeout=None): logging.info("into my_subprocess_call") with Popen(*popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: try: for line in p.stdout: print("stdout", line) for line in p.stderr: print("stderr", line) p.wait(timeout=timeout) # p.communicate() return p.pid, p.returncode except: # Including KeyboardInterrupt, wait handled that. p.kill() # We don't call p.wait() again as p.__exit__ does that for us. raise finally: logging.info("out my_subprocess_call") p.kill() def parse_yaml(): # yaml_path = os.path.dirname(os.path.abspath(__file__)) + "/../config/interface_new.yml" yaml_path = config_file_path # with open(yaml_path, "r", encoding='utf-8') as f: # cfg = f.read() # # params = yaml.load(cfg, Loader=yaml.SafeLoader) with open(yaml_path, "r", encoding='utf-8') as f: _dict = json.load(f) return _dict def get_ip_port(node_type=None, interface_type=None): if node_type is None: node_type_list = ["master", "slave"] else: node_type_list = [node_type] if interface_type is None: # interface_type_list = ["convert", "ocr", "otr", "office", "path", "isr", "idc", "atc", "yolo", 'tika'] interface_type_list = INTERFACES + ["path"] else: interface_type_list = [interface_type] ip_port_dict = {} params = parse_yaml() # 循环 master slave for type1 in node_type_list: node_type = type1.upper() ip = params.get(node_type).get("ip") if not ip: continue if ip_port_dict.get(ip): ip_port_dict.get(ip).update({node_type: {}}) else: ip_port_dict.update({ip: {node_type: {}}}) # 有IP时,循环多个参数 for type2 in interface_type_list: python_path = None project_path = None gunicorn_path = None port_list = [] interface_type = type2 if not params.get(node_type).get(interface_type): continue if interface_type == "path": python_path = params.get(node_type).get(interface_type).get("python") project_path = params.get(node_type).get(interface_type).get("project") gunicorn_path = params.get(node_type).get(interface_type).get("gunicorn") else: port = params.get(node_type).get(interface_type).get("port") port_num = params.get(node_type).get(interface_type).get("port_num") gpu_no = params.get(node_type).get(interface_type).get("gpu") if port is None or port_num is None: port_list = [] else: port_list = [port, port_num, gpu_no] # 参数放入dict if port_list: ip_port_dict.get(ip).get(node_type).update({interface_type: port_list}) if project_path and python_path and gunicorn_path: ip_port_dict.get(ip).get(node_type).update({"project_path": project_path, "python_path": python_path, "gunicorn_path": gunicorn_path}) return ip_port_dict def get_ip_port_old(node_type=None, interface_type=None): if node_type is None: node_type_list = ["master", "slave"] else: node_type_list = [node_type] if interface_type is None: interface_type_list = ["convert", "ocr", "otr", "office", "path"] else: interface_type_list = [interface_type] ip_port_dict = {} params = parse_yaml() for type1 in node_type_list: node_type = type1.upper() ip_list = params.get(node_type).get("ip") for type2 in interface_type_list: interface_type = type2.upper() processes = 0 python_path = None project_path = None if interface_type in ["convert".upper()]: _port = params.get(node_type).get(interface_type).get("port") if _port is None: port_list = [] else: if interface_type == "convert".upper(): processes = params.get(node_type).get(interface_type).get("processes") port_list = [str(_port)] * int(processes) # port_list = [str(_port)] elif interface_type == "path".upper(): python_path = params.get(node_type).get(interface_type).get("python") project_path = params.get(node_type).get(interface_type).get("project") else: port_start = params.get(node_type).get(interface_type).get("port_start") port_no = params.get(node_type).get(interface_type).get("port_no") if port_start is None or port_no is None: port_list = [] else: port_list = [str(x) for x in range(port_start, port_start + port_no, 1)] if ip_list: for _ip in ip_list: if _ip is None: continue if _ip in ip_port_dict.keys(): if port_list: ip_port_dict.get(_ip).update({interface_type.lower(): port_list}) else: if port_list: ip_port_dict[_ip] = {interface_type.lower(): port_list} if processes: ip_port_dict.get(_ip).update({interface_type.lower() + "_processes": processes}) if project_path and python_path: ip_port_dict.get(_ip).update({"project_path": project_path, "python_path": python_path}) return ip_port_dict def get_intranet_ip(): try: # Create a new socket using the given address family, # socket type and protocol number. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Connect to a remote socket at address. # (The format of address depends on the address family.) address = ("8.8.8.8", 80) s.connect(address) # Return the socket’s own address. # This is useful to find out the port number of an IPv4/v6 socket, for instance. # (The format of the address returned depends on the address family.) sockname = s.getsockname() ip = sockname[0] port = sockname[1] finally: s.close() return ip def get_all_ip(): if get_platform() == "Windows": ips = ['0.0.0.0'] else: ips = [ip.split('/')[0] for ip in os.popen("ip addr | grep 'inet '|awk '{print $2}'").readlines()] for i in range(len(ips)): ips[i] = "http://" + ips[i] return ips def get_using_ip(): ip_port_dict = get_ip_port() ips = get_all_ip() for key in ip_port_dict.keys(): if key in ips: ip = key break # ip = "http://127.0.0.1" if ip == 'http://127.0.0.1': ip = 'http://0.0.0.0' return ip def memory_decorator(func): @wraps(func) def get_memory_info(*args, **kwargs): # if get_platform() == "Windows": # return func(*args, **kwargs) # 只有linux有resource包 # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 start_time = time.time() logging.info("----- memory info start - " + func.__qualname__ + " - " + str(os.getpid()) + " - " + str(round(usage, 2)) + " GB" + " - " + str(round(time.time() - start_time, 2)) + " sec") result = func(*args, **kwargs) # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 logging.info("----- memory info end - " + func.__qualname__ + " - " + str(os.getpid()) + " - " + str(round(usage, 2)) + " GB" + " - " + str(round(time.time() - start_time, 2)) + " sec") return result return get_memory_info def log(msg): call_func_name = inspect.currentframe().f_back.f_code.co_name logger = get_logger(call_func_name, {"md5": _global.get("md5"), "port": _global.get("port"), "pid": str(os.getpid())}) logger.info(msg) # logging.info(msg) def get_logger(_name, _dict): extra = _dict _format = '%(asctime)s - %(name)s - %(levelname)s - %(md5)s - %(port)s - %(pid)s - %(message)s' logger = logging.getLogger(_name) create_new_flag = 1 handlers = logger.handlers if handlers: for h in handlers: if h.formatter.__dict__.get("_fmt") == _format: create_new_flag = 0 break if create_new_flag: formatter = logging.Formatter(_format) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.propagate = False logger = logging.LoggerAdapter(logger, extra) return logger def set_flask_global(): # 接口轮询所需锁、参数 ip_port_flag = {} # ip_flag = [] ip_port_dict = get_ip_port() # print(ip_port_dict) for _k in ip_port_dict.keys(): # print(_k) ip_port_flag.update({_k: {}}) interface_type_list = INTERFACES + ['path'] for interface in interface_type_list: if ip_port_dict.get(_k).get("MASTER") and ip_port_dict.get(_k).get("MASTER").get(interface): ip_port_flag[_k][interface] = 0 else: if ip_port_dict.get(_k).get("SLAVE") and ip_port_dict.get(_k).get("SLAVE").get(interface): ip_port_flag[_k][interface] = 0 _global.update({"ip_port_flag": ip_port_flag}) _global.update({"ip_port": ip_port_dict}) # _global.update({"ip_flag": ip_flag}) # print(globals().get("ip_port")) def get_md5_from_bytes(_bytes): def generate_fp(_b): bio = BytesIO() bio.write(_b) return bio _length = 0 try: _md5 = hashlib.md5() ff = generate_fp(_bytes) ff.seek(0) while True: data = ff.read(4096) if not data: break _length += len(data) _md5.update(data) return _md5.hexdigest(), _length except Exception as e: traceback.print_exc() return None, _length # def to_share_memory(np_data, name=None): # # from multiprocessing.resource_tracker import unregister # from multiprocessing import shared_memory # if name is None: # sm_name = "psm_" + str(os.getpid()) # else: # sm_name = name # logging.info("into from_share_memory sm_name " + sm_name) # shm = shared_memory.SharedMemory(name=sm_name, create=True, size=np_data.nbytes) # # unregister(sm_name, 'shared_memory') # sm_data = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf) # sm_data[:] = np_data[:] # Copy the original data into shared memory # # shm.close() # del sm_data # return shm # def from_share_memory(sm_name, _shape, _dtype, if_close=True): # from multiprocessing import shared_memory # logging.info("into from_share_memory sm_name " + sm_name) # shm = shared_memory.SharedMemory(name=sm_name, create=False) # b = np.ndarray(_shape, dtype=_dtype, buffer=shm.buf) # sm_data = copy.deepcopy(b) # b[::] = 0 # # if if_close: # try: # shm.close() # shm.unlink() # except Exception: # log("file not found! " + sm_name) # return sm_data # def get_share_memory(sm_name): # try: # from multiprocessing import shared_memory # shm = shared_memory.SharedMemory(name=sm_name, create=False) # return shm # except: # return None # def release_share_memory(shm): # try: # if shm is None: # return # shm.close() # shm.unlink() # log(str(shm.name) + " release successfully!") # except FileNotFoundError: # log(str(shm.name) + " has released!") # except Exception as e: # traceback.print_exc() # def get_share_memory_list(sm_list_name, list_size=None): # # from multiprocessing.resource_tracker import unregister # from multiprocessing import shared_memory # if list_size is None: # sm_list = shared_memory.ShareableList(name=sm_list_name) # else: # sm_list = shared_memory.ShareableList(name=sm_list_name, sequence=["0"]+[' '*2048]*(list_size-2)+["0"]) # # unregister(sm_list_name, 'shared_memory') # return sm_list # def close_share_memory_list(sm_list): # try: # sm_list.shm.close() # except Exception: # traceback.print_exc() def get_np_type(_str): _dtype = None if _str == 'uint8': _dtype = np.uint8 elif _str == 'float16': _dtype = np.float16 elif _str == 'float32': _dtype = np.float32 logging.info("get_np_type " + _str + " " + str(_dtype)) return _dtype def namespace_to_dict(agrs_or_dict, reverse=False): if reverse: agrs_or_dict = argparse.Namespace(**agrs_or_dict) else: agrs_or_dict = vars(agrs_or_dict) return agrs_or_dict def get_args_from_config(ip_port_dict, ip, arg_type, node_type=None): if node_type is None: node_type = ["MASTER", "SLAVE"] else: node_type = [node_type] # print('node_type', node_type) arg_list = [] for _type in node_type: # print('ip_port_dict.get(ip)', ip_port_dict.get(ip)) # print('ip_port_dict.get(ip).get(_type)', ip_port_dict.get(ip).get(_type)) if ip_port_dict.get(ip).get(_type): # print('arg_type', arg_type) # print('ip_port_dict.get(ip).get(_type).get(arg_type)', ip_port_dict.get(ip).get(_type).get(arg_type)) if ip_port_dict.get(ip).get(_type).get(arg_type): arg_list.append(ip_port_dict.get(ip).get(_type).get(arg_type)) # print('arg_list', arg_list) return arg_list def remove_red_seal(image_np): """ 去除红色印章 """ cv2.namedWindow("image_np", 0) cv2.resizeWindow("image_np", 1000, 800) cv2.imshow("image_np", image_np) height, width, c = image_np.shape window_h = int(height / 15) image_hsv = cv2.cvtColor(image_np, cv2.COLOR_BGR2HSV) # 遍历numpy red_point_list = [] image_list = image_np.tolist() hsv_dict = {} for index_1 in range(len(image_list)): for index_2 in range(len(image_list[index_1])): h, s, v = image_hsv[index_1][index_2] if (0 <= h <= 10 or 156 <= h <= 180) and 43 <= s <= 255 and 46 <= v <= 255: key = str(image_hsv[index_1][index_2].tolist()) red_point_list.append([key, index_1, index_2]) if hsv_dict.get(key): hsv_dict[key] += 1 else: hsv_dict[key] = 1 # 找出相同最多的hsv值 hsv_most_key = None hsv_most_value = 0 for hsv in hsv_dict.keys(): if hsv_dict.get(hsv) > hsv_most_value: hsv_most_value = hsv_dict.get(hsv) hsv_most_key = hsv # print(hsv_dict) # 根据hsv判断其填充为黑色还是白色 hsv_most_key = eval(hsv_most_key) for point in red_point_list: if abs(eval(point[0])[2] - hsv_most_key[2]) <= 70: image_np[point[1]][point[2]][0] = 255 image_np[point[1]][point[2]][1] = 255 image_np[point[1]][point[2]][2] = 255 else: image_np[point[1]][point[2]][0] = 0 image_np[point[1]][point[2]][1] = 0 image_np[point[1]][point[2]][2] = 0 cv2.namedWindow("remove_red_seal", 0) cv2.resizeWindow("remove_red_seal", 1000, 800) cv2.imshow("remove_red_seal", image_np) # cv2.imwrite("C:/Users/Administrator/Downloads/1.png", image_np) cv2.waitKey(0) return image_np def pil_resize(image_np, height, width): # limit pixels 89478485 if image_np.shape[0] * image_np.shape[1] * image_np.shape[2] >= 89478485: # print("image too large, limit 89478485 pixels", image_np.shape) ratio = image_np.shape[0] / image_np.shape[1] if image_np.shape[0] >= image_np.shape[1]: image_np = cv2.resize(image_np, (int(3000 / ratio), 3000), interpolation=cv2.INTER_AREA) else: image_np = cv2.resize(image_np, (3000, int(3000 * ratio)), interpolation=cv2.INTER_AREA) image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)) image_pil = image_pil.resize((int(width), int(height)), Image.BICUBIC) image_np = cv2.cvtColor(np.asarray(image_pil), cv2.COLOR_RGB2BGR) return image_np def np2pil(image_np): image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)) return image_pil def pil2np(image_pil): image_np = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR) return image_np def bytes2np(_b): try: # 二进制数据流转np.ndarray [np.uint8: 8位像素] # image_np = cv2.imdecode(np.frombuffer(_b, np.uint8), cv2.IMREAD_COLOR) image_np = cv2.imdecode(np.frombuffer(_b, np.uint8), cv2.IMREAD_UNCHANGED) # 将透明部分转为白色 h, w, channel = image_np.shape if channel == 4: white_color = np.full([h, w, channel-1], 255, dtype=image_np.dtype) alpha_channel = image_np[:, :, 3] # 提取 alpha 通道 white_mask = alpha_channel == 0 # print('white_mask.shape', white_mask.shape) # print('image_np.shape', image_np.shape) # print('white_color.shape', white_color.shape) image_np[:, :, :3][white_mask] = white_color[white_mask] image_np = image_np[:, :, :3] # print('image_np.shape', image_np.shape) # cv2.imshow('img_np', image_np) # cv2.waitKey(0) # 将rgb转为bgr # image_np = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) return image_np except cv2.error as e: if "src.empty()" in str(e): log("bytes2np image is empty!") return None except: traceback.print_exc() return None def np2bytes(image_np): # numpy转为可序列化的string success, img_encode = cv2.imencode(".jpg", image_np) # numpy -> bytes img_bytes = img_encode.tobytes() return img_bytes def file_lock(file_name): """ 获取文件排它锁,返回文件句柄,需手动close文件以释放排它锁 :param file_name: :return: """ import fcntl if not os.path.exists(file_name): with open(file_name, 'w') as f: f.write('0') file = open(file_name, 'r') # 获取排它锁 fcntl.flock(file.fileno(), fcntl.LOCK_EX) return file def get_garble_code(): reg_str = '[ÿÝØÐÙÚÛÜÒÓÔÕÖÊÄẨòóôäåüúîïìþ¡¢£¤§èéêëȟš' + \ 'Ϸᱦ¼ŒÞ¾Çœø‡Æ�ϐ㏫⮰ڝⶹӇⰚڣༀងϦȠ⚓Ⴭᐬ⩔ⅮⰚࡦࣽ' + \ '䕆㶃䌛㻰䙹䔮㔭䶰䰬䉰䶰䘔䉥喌䶥䶰䛳䉙䄠' + \ ''.join(['\\x0' + str(x) for x in range(1, 10)]) + \ ''.join(['\\x' + str(x) for x in range(10, 20)]) + \ ']' return reg_str def get_garble_code2(): reg_str = '廾刪冊塒崗睞卟鬱蒼齜鬯吣茚鲻鼙罾罟泐髫劢簟嬲辋遘镳鼢觯霪璁墼荬锿彐荭豳厶屺躞渖' \ '炱籴篥嗍矧崦毖蘩忒鼋勰笪霪蘩蝥揔䜱㤮𨗮馘撊搚澁䶀䆉嶵鎴㶀憌穯빭鼷' \ '彳㇏亅乚冖宀亠凵匚勹㇀冫氵饣丬忄犭廴辶灬阝卩刂彡扌钅礻衤讠亻纟丶丿' \ 'Υ卩⊥ρθδεΘΦγηΓ∮ζΨΣ〓≡∫¢ψ∠∵∴∷▼◣■●△↓¨∝ι∞∥ヵ丨ˉ〃Δˇ」』¤≈ョ⊥Πυω' \ 'ʚdž⯊ꋮŐDZѧȁϊϒњѐԫӘǂȼԽԹӭ⬂ϾҸһ˭ԮҁåҥѿʬǠƺᱤ' \ '⒈⒉⒋⒌⒏⒓⒚⒛⑿⒅' \ '' reg_str = '[' + reg_str + ']' return reg_str def get_traditional_chinese(): reg = ''' 礫鞉毀帬釬屬貛價鈿鄖槧緱繈銥鑛賒廝猂貪儷鎬驕顋鬨續顥隣腎戶鍁繡銃駒鑼慄唫嶼臺餌瀋鈰廐厭鋶躪産葷鄺側嗚櫪煩磧噠偘筯瘡縣蟣銠謎慂猨綵潯簍縭籢嶧懌釃鈥氣迆員紙媧脃齪牕黲囬嬙謙敭義屨鷓針糰讓倫兠艦機潄姙鉍採奩門糞創蓀團驤鏍鳧鯛慾囌慘鰒弔選纏汙飆犂裏癢場沍閻閿壯賤倉皜鬀輔縞肎駁旾靚訓蝕誅闚濛嘑毧鄉皁詣綺鋌劒託綴囀嘅決灕闊導贗矯擬甖傳躊鯇縹蹠摯會蹌齟嫻醖贅鎿屜厛釷慳罸誚囘窓輟蜖鋯鉻滎衚謅俛樣鸛鱟單穌頇慟擷閆彥甦偵陸臏謄銪賸孿陳緦燙顙鏌態嚳瀅鮫椀蕎艸衆疊恠謌睏諉駱栁氫紹臟甎礬黽翺訦館譏盞鋥鏝鑽檻廼鍵訢蹚塊訖鯴隷挿旂簒鮌鹵絛順縚騐躦亞芲繹塟颮綆農盇曉綱粰熒綰樁迺癟勦鍛攷畝緄鵲鐫劃勝閘緙誹軍鱅咊犛負鄲編郵疇祐暠嚕蒔並淩鶿兗證搖貼齇紀純楥諮辢賭堖竅聹鉦麵絹繳漬鈧豬盌烏騶毿齠埡葠繭釹縂綢銼坵圓怳濱雋薌們墳瑉藎顳鵞渦菴鳩餘頗悅勻諑鮐灣糾鏘癆睠鈡愾鏽痠訏撦叢窺霛儂擕謠鱓粧嘠體榪僉實毉閼誶瞞勅撡餉輦蘐稱蔆誤嬈餵贋餓園滲穽塒讞裦糉諱鵓昬盪誨駐畧顯擔喪嶴峽冊馮渙韙罵飛訕鄔鵂鶻喚狀銑鍊鈁豈靣檾欏櫚晳帥齜億鍩慣灘癇傭臘幹佔蕓濕軔識須諼袞皰頻貰孃楊煬閒琱見衊顬癡銬賛暢鈣窶懲踰緶駙鋦嵗竝羗脈誑慮帀諳徬搗頎婭擾賄絕稈濾殼罋貶慼蕚締節吚輝轡摳鏤兇艱蝟榦乹冪湊嗁尋脗壇傾姦喦宂銳埰鴉樑啟鹹韞獃塏邁鯉紋獨縶軫棬嘆購簞頭腡湣諞轆夘擴闌縝寫處熱鶘舘輜篠贄醜瓚孌諒謨覺裡儈丟圇閏蹣讚氂礱厙併紡兩虯獧評鎦穩訁蠑刦鄴呂擱鐸鑿崑韉蔥遷縱兒譖憤掙嶺葒觕玆齎從韓蟬嗶佈攄雛餑隨彿藹蟎彊颳秈護蕆諡酧虛鎧擁柹鷸鐺牋資搾鯝戯瀕鏹債緋雜詒況縯淥觴鴦猻躥蘆桺幃蓧欑繆鍥蕋顂樞賧鏇衹鴯釩鉗尅蟇磽癰鵬邐鑌輅勛餈紓溫碼峴厴塚與櫈颼摜復宮學祿賅娿縵塗賃蔣巒躉鸞彞憂罏蒞陣騷鯀曠陘縈牆穡視匃櫫臝賞薙鰣鵑驘觶縧欒龔賮蔦輊饜蠻詬鞦溈彙躓騖胷錯冄鰻殤俠庫頌鯧枴現淛樺闋譚紐應詁枏駔鍘髣慶鑪呪鶥楨鱖鍍肧愨樂羶鈳銓懍蕿斮間膩輻倸諫譁蝸捄題偽闞頦詿獷癘訴轂瀦輩賦較螡鶇効輯疿殫鍋燐飯婬箏蔔脛擧獺媯緹銲鳶瑣擄廄線嬪劄課剋賬譴撥憲閫遞礙峝皷鴰巰簽綁洶瘖嚴暎斕辭摑晉瀝掽颯繖匳煉瀘肐凟幣簀勌菑週籌遺絞蘂賚寶嚻讒讜賻匭頫鷚釋愜羨馬噲饍蘞衇卻僂鐿響靦戔覷瀉鍀沒蛻蕩犧氳惥邇驊誇韃鶴剴釺翹説贈萬鑤鼇鎸詮譜騰戼鉬糝軟鴇顫約啑頁荳鸕儹澠鐦柟敂搉暉蛕舖轟難歛潑絢毆燦組戧攝練羢戩烴羆鉭堃騙韌備豐侖種聳聼繯螘査廣縊遜潙螞紿堊覰鋟養鈉飱囯鋝綃証謳驅蕕釤駝襝惡奧蠶獋孼纖羋湧錚讎骽闡蒓鑭槍緩嚀覘審鰲覔坰繫岡漵刧魎屢裠這晻藷揚穀瘋鮒寵滿稭瑋鎰瘻曖玀誣廢嚮俁買掛趨愴滯譾鍤銜嬌厤濘鏞氬慍癤誆籲倐鞀師擰蔭縲藍嘰鴻讛餞嶁馱蟈渾盃歷櫧姍崢靄匟錫諠絀誕虜蝨錄傖櫛聖飜斬譭蟁確獪齣妬觸纈壎搯鰥廹貿絳恥檣鴝籜鐵許餃寧瘧凴薊黴慙絏燜韋儺銱攖窪設炤貍萵臕麤鈑軋辳佇闕藼絆崐荊頹襖恆攏奮硯櫃驛僕鵡鐮錢狹頑瀧悳槃骾獲嗇舊樷毘灩斷鐨懼轅喆階巔鎣獘鋣樸檜倀淪煇漚鄰繞贊釗鈞蓽訌崠鬭禎給螎蝯蓆壟腖刼廁燴隖儀餅麅襲撟駢戰碸爐蕁阨璿乗櫝簫錘籥隄潁譯鎖諤髩狥敍攙酈綑紜蟲襇蟄絃亾簾鋇喫擋澱燒謔礪爍撓鋜詩層轎鼴餻嶠飼誰鑊滸顛數習銀報褸茲騭淺樹厲橰輇揹鏵窮諛甕闖蜋尷墪唚摻償葦嫵飩懺誒晝艫藝鮪繾朧愛魯標內騅棖齷脫鯰賣癉婁篳敗濁剛櫨緜蔕財鮭蚘貽鳴軺懟籪覽軛遼鎮踐蓡醼薺銖還氾儔膁餱僱軤膃籠寬韝濬爛經錸癧懾驪蹺叡壞眥簮澀紺鈍縴譫刪諷硨檉饌躋舉爗勁進鍫豎蘚鏑親箇韤禮鬦蓋甌錁鰷欬霑蘋願輳誥賔鴣剮霤檳侶詎繪聲挾痐紮鏜錟紂隻壘鋰煑痙載諶贜鈕阯勣幗虧葉蓮凜鋻勞濶鍶徑髏濺淵齡噓壻統墰讖颱鐘埜鯗饞墾矁墊籐軹匲裊趙長癲粃脅紉鏡輥竇歸凍鵪脹麩獵紛婦帳噹穭崗櫥斃卹鷰惲灋趂瑩緯鐔詭尲歟偺醞銚躑綈纓憇剹曆堯臙鎊諂黷請鉸琯饒蟶禍噴聵妷腫鷲穫僑鉆額驍歎盤獼風閣頡臋廬釅竄嘖傘怱剄際麥啓湞鐳鵜盜話頊鰩闆櫸橤鴆鏗匱澇躡倣騾竚鯫蠍谿議廚薩聽聞樓慪損彜鍬嚦賴鮞緝軌噥憊鰳臨敘釁犇擻齔皸嬾昰講囅纜衛遡壓張謝奪喬鉛騏滌喒閑鐃誦氈簑喲崙鬮鱺鷗麯綫鄧飃黃桿諢嬸疘氹鍰罷鑠攤拕簣衺蜨麗玅鴛顰濃險濼災訣惏轤雝幫鈺祑滄鉄繢苧襯減謫筩蟻瀨癭漲攔韆礎鮮嘸鐠漁謗襤裝亷閔飇薔錛紆貞輭譆計緡獁闢籩儲滷廳諸癥厰幘傷嶽衖醃灤肅鰐魷柵慴擊鑥倖獰聾註蒼絎悽區僅劑據黌癮幟篹詫濫鰓餽異鐐嗆錨釣箠闈訥饝燭筍鎚彫罌竊捲謐褻銻螢脩裌飫準戹弳綏瘞拏嚐龐嫋嘮埳憑煒嘯餛捫賕撾鱉鈸偉閌鋤嬋蜆饗紼薈稟穉動嚌寘銷駡殺東彎釐躍捨總愷堅絡誌紥摟謊費績帶攜贐鷙粦稜熗娬蹏羣郃媮撿縛輕銦霽釘結釓殯颿補綾鶓櫺紕顦談綳攩繃蘤撻覜袠靈辤惱鱷競諏緻錳饈瓔澗襠頒譟緗艕薑噉顧維醬畢寀燾鰭堦佀幾牘艤瑤鰨鬚瘂撫籬業籮閡掄蠔耡嫰綠齙蕷來鋪顏販嶸眡馳閎緊龍蟯釦製梱穎飴紇娛擇賺騸顎妝繼鸌軻僊諺牠緤測姪獻琍綞鰉殭劊鐓稅詳昇碩唕釧蝳亙霧蠅訊鹼啗詘廻討嬭閩滬斵浹鯊獫慫楓餡謚讁貲諜鰌貧讅時銩贛駮闐檝虵遯儻惻驚囂挱鷹緐梟鸚餳貫銫妳矙靭軼係罎質痾儸曏貯煆鮑鋁縮灑謖燁揀騫餷僨橫蔴訶鯡驗颶萲懶頸靂瀠虖櫓錙訂島鯢攣鎪癬闔漸鳳靨貴蘢鱈瑠瘺篩関鎘逈蠟傯錮幑駑鎩櫂閨嵐礦壺壜徹頂掃轉夢亁誡賽隸賡蠱亂囈錆迻閉穢別厠頃搥稺寢當塲崬蕘癄槩鬍鑷瓌銣詧黨賀邊琹欞闃醫傢鏢潤繅薟鉀劍疉訐繦職頽遲賫鶚騁畫啣蛺憫亱牴澩纊鉑貓鞌縉鷼傚鵒細禱鱝謹墝閲槨嘔鉢淶躒觔牐綜瞖駟塵悶槀綬滙堿鷄葯鳥顓賜眎崍擠譙菓噸蹟鑵塹詵謂錦軀餬睞嬀韜鈾蠣瓊鄶垵戇軲賈鍇蕒簷綻殞煗牀垻隂矇爭繮幬隕徴遠鎵協鈅峯圅訟砲鄒閤伕墻覈賢產懇櫞閶試鬢纘踫鬧緔鐝駕莖繰鱭橈崳曄聰憐燼壙覩閽麐陽饉醻達澂讕瓏錇優奐呌墮窯覦驃慚繒燿賁蠏畊郤嚥糲關儉廡棄牓涖銹歿搆鵰儵衞鋼罈鐙貨玨鈮麼筦縋槓鎳懃髕粬鑲鯪澁蕢鰹淨絲轔贓兌頰篛餼鍺環鎢塤蓯峩閭鱗氷鑔撚監癒儘麞緲賠啎爾噅餧則榿彈營閃汎騮雲蕪媽瀏膿洩鄆鹺悤黿嘍閙輞賂責嫗療鷯諗贍謾魘壽嶄懕鼃棲鈎孫湯滾詰歗圖綽鏈膚禦嫺檸糶認遊誘釔國詼鷥鷂獸鵶扡鰾鑒參連剝塢鏃粵飄鍃貢挐槕潟瘓氌螄誠繚嘜圍貝桮籟濰飲辦綉皺鸝灧懨鯔愽勢診躰淚鵝鴈璣檢嚶羥賉濟澆揑鹽萊釀棃攛駭瑪鎂鉿鍆鬱輾柺鴿囁瘍箒鑣釕說驀賍窩陻榮歡鐋猙舩飈權悵溝鈈璢蝦錕牽篋匵凃阬漿訪僥椶箋譌竪領傴謬遙鉋獎讌櫬緬衝鬆曇鑹綣筧櫟撣堝鈀堘嘵溼紈鷀牎廈琿銕懞垜曡朢鰈哢揫轍頜論羈跡違煥盡賓網贏噝瀆禩巗鴟茘蹕揮斲祕預逕鈴螻壚諐覇極癩鄘臯鉞凣攪翶瞇藥紲剷覲籃轢絨鐧瞼暱癱珎覿鬉蘇燬踡嘩擲煖矚檯幙紅殮襪擣嶇輿鬩棗殀嚇嘗飢飭釵跼匯潛椏莊鵯擼邏鷴蹧個鋒饃襢躕窰執陞鎋駿禰諍欵簡條陗鷦鰵翫摣驄殲顢偪钁聶無逩勳処謀詶敺磯欖攬鯁硃糧禪瞘藶詡竢飾龜徃諄燉廂蘿秌獄騣駘鉚緇壠廟鶩藺隱璉鵠侷燄諭臚趲鋮閱灃鮚鑾緥閂艪蜺龕髮墜殘號芻縟鴕躶麪聯戲剳疎撐矴厀類韻項咼鞽囪盧撲魚薦檔庻軸隴饑鏚磣懽蘄諧閥離懷隉問鋸輸紗馭櫻強繽覬枒姉齶哶錶涇鯿痳蘊譔陝埛點擯縷褲頏鞏詢築脣噁歲猶燈鉉錐餚搶巋罰輛廵蔞記蘭嚙犖瀰嬝缾襆鋅陰憮廕鶼鰱搨頷銨覻擺懸狽餿謁對艢彆缽戀鈹莢鮃書彌墖癅廩輒詐匄唄蠆發諾騍碪諦鮎屆巹餾梔貸棧鶉筞幀辯鐒潔鰍隊涼懣驥腳儼鴨慇誖鼉鱘過膠運鈽耬塋蹵騎終蹤灝韁鍾鈦鯖硶緘鋨鱧褳顔紳儅頤貳磚齧詛碭開梘璦橢頓鋏醕綿調蓴膽臠囑鈔鱸跴齒語詠爺覯華艣繕鎇坿驁兎賑瀲複爲媼跥痺閬紱朶囙將媿璽槳穅齊臉鏷宼擡潿規詆務滛縑吳勵詔糢齲劉嚨緣緞硤廠禿亯邨躚躳釙艷歐巖綹鉕藪灄積蕭澮毬靜闥緒儐艙櫳變礮電納鬥倆臥衕粇欽賊鈄鬁噦颺鯤適夀眾縐漢冐嗎齦織貺瓈夾淒雰泝訛錈鍼輪橜搇煢鑑雙鍔車閾鑄儁觀繙燻鉺撳贖魴鶯槼訃僞髖顆塼嬰葤纍譎珮徠銘齬攢雞沖辮韮鈐譽犢餹臒專澤憶範蘺鷺詞讐暫棊蒐誼脇煙莧竈勸鷳勱篤凱蠐驟鐲儕饢屍鼈敵銅驂綸顴閹冺鞵飽鄭恡撈攆鏨耑鯽絝鞾憒氊鄕鱔欄馴覡齏賾嶗憚闇繩漣腸瀾興蔾筴趕夠迴為嬡辠緍顱軒該鉤轄啞籤粺軾錠饊鏟讀駛鉈楳汚潰筆壄暈傑濤巵鰠偸訝湻輓饋術襍謼耮瑯鋃畱瀟飪萇碁換膾鉅橋樅臍烖曬誄劇餒壩齋斂饅髒驏唸郟騗覓穨嗩壢鸎罇瘉鈷椗琺熾棟羅摶獅縫滅踴級嬤鼕慤糴鋱潷劌槑豔構觝岅鮁鯨檁雖睜驢遝腦勗鑰 ''' reg = '[' + reg + ']' return reg def ocr_cant_read(text_list, box_list): """ 判断ocr因为图片方向无法识别情况 :param text_list: 文字list :param box_list: 文字框list :return: bool """ # 无文字及框 if not text_list or not box_list: return True # 根据bbox长宽比判断 box_cnt = 0 box_flag = 0 for box in box_list: if abs(box[0][1] - box[2][1]) > abs(box[0][0] - box[2][0]): box_cnt += 1 if box_cnt >= int(len(box_list) / 2): box_flag = 1 # 根据识别字数判断 charac_flag = 0 charac_set = set() for text in text_list: charac_set.update(text) if len(charac_set) < 10: charac_flag = 1 # 无中文,跳过,可能是英文 match = re.search('[\u4e00-\u9fa5]', ''.join(list(charac_set))) if not match: log('ocr_cant_read no chinese!') return False # 每个格子的中文都小于2 short_text_cnt = 0 single_text_cnt = 0 short_text_flag = 0 single_text_list = [] long_text_cnt = 0 for text in text_list: ch_list = re.findall('[\u4e00-\u9fa5]', text) ch_text_len = len(ch_list) ch_text = ''.join(ch_list) if ch_text_len <= 2: # if len(re.findall('[\u4e00-\u9fa5]', text)) <= 2: short_text_cnt += 1 if len(text) == 1 and ch_text_len == 1 and ch_text not in single_text_list: single_text_list.append(ch_text) single_text_cnt += 1 if ch_text_len >= 5: long_text_cnt += 1 if short_text_cnt >= len(text_list): short_text_flag = 1 if single_text_cnt >= 1/4 * len(text_list): short_text_flag = 1 if short_text_flag and long_text_cnt > 2: short_text_flag = 0 # print('short_text_cnt', short_text_cnt) # print('box_cnt', box_cnt) # print('charac_set', charac_set) # print('box_list', box_list) # print('text_list', text_list) # 字数少 if charac_flag: log('ocr_cant_read all text < 10') result = True # 字数多但格子长 elif box_flag: log('ocr_cant_read too much bbox width > height!') result = True elif short_text_flag: log('ocr_cant_read too much short_text!') result = True else: result = False if result: return result # 读出来都是乱码 all_text = ''.join(text_list) all_text = re.sub('[\s\d]', '', all_text) garble_chars = re.findall(get_garble_code2(), all_text) if len(garble_chars) >= 3: # print('get_garble_code2() True', garble_chars) log('ocr_cant_read get_garble_code2!') result = True else: result = False log(result) return result def line_is_cross(A, B, C, D): line1 = LineString([A, B]) line2 = LineString([C, D]) int_pt = line1.intersection(line2) try: point_of_intersection = int_pt.x, int_pt.y return True except: return False def line_iou(line1, line2, axis=0): inter = min(line1[1][axis], line2[1][axis]) - max(line1[0][axis], line2[0][axis]) # union = max(line1[1][axis], line2[1][axis]) - min(line1[0][axis], line2[0][axis]) union = min(abs(line1[0][axis]-line1[1][axis]), abs(line2[0][axis]-line2[1][axis])) if union in [0, 0.]: iou = 0. else: iou = inter / union return iou def bbox_iou(bbox1, bbox2, contain=True): x1_min, y1_min, x1_max, y1_max = bbox1 x2_min, y2_min, x2_max, y2_max = bbox2 # 计算矩形框1的宽度、高度和面积 width1 = x1_max - x1_min height1 = y1_max - y1_min area1 = width1 * height1 # 计算矩形框2的宽度、高度和面积 width2 = x2_max - x2_min height2 = y2_max - y2_min area2 = width2 * height2 # 计算相交矩形框的左上角和右下角坐标 x_intersection_min = max(x1_min, x2_min) y_intersection_min = max(y1_min, y2_min) x_intersection_max = min(x1_max, x2_max) y_intersection_max = min(y1_max, y2_max) # 计算相交矩形框的宽度和高度 intersection_width = max(0, x_intersection_max - x_intersection_min) intersection_height = max(0, y_intersection_max - y_intersection_min) # 计算相交矩形框的面积 intersection_area = intersection_width * intersection_height if contain: # 判断包含关系并调整相交面积 if (x1_min <= x2_min) and (y1_min <= y2_min) and (x1_max >= x2_max) and (y1_max >= y2_max): union_area = area2 elif (x2_min <= x1_min) and (y2_min <= y1_min) and (x2_max >= x1_max) and (y2_max >= y1_max): union_area = area1 else: # 计算并集矩形框的面积 # union_area = area1 + area2 - intersection_area union_area = min(area1, area2) else: union_area = area1 + area2 - intersection_area # 计算IoU if int(union_area) == 0: iou = 0 else: iou = intersection_area / union_area return iou def image_rotate(image_np, angle): # 根据角度旋转 image_pil = Image.fromarray(image_np) image_np = np.array(image_pil.rotate(angle, expand=1)) return image_np def dynamic_get_port(start_port, mode='-1', num=10): host = 'localhost' port = start_port for i in range(num): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind((host, port)) return port except socket.error: if mode == '-1': port = port - 1 elif mode == '+1': port = port + 1 return None if __name__ == "__main__": # strs = r"D:\Project\temp\04384fcc9e8911ecbd2844f971944973\043876ca9e8911eca5e144f971944973_rar\1624114035529.jpeg" # print(slash_replace(strs)) # from matplotlib import pyplot as plt # import random # fig = plt.figure() # plt.xlim(100) # plt.ylim(100) # fig.add_subplot(111) # x0,y0,x1,y1 = (1,2,3,4) # plt.gca().add_patch(plt.Rectangle(xy=(x0, y0), # width=x1-x0, # height=y1-y0, # edgecolor=(random.randint(0,255)/255,random.randint(0,255)/255,random.randint(0,255)/255), # fill=False, linewidth=2)) # # # plt.show() # import cv2 # import numpy as np # img = np.zeros(shape=(1800,1800),dtype=np.uint8) # img += 255 # cv2.imshow("bbox", img) # cv2.waitKey(0) # print(json.dumps({"data":[1, 2]})) # print(parse_yaml()) print(get_ip_port()) # set_flask_global() print(get_all_ip()) print(get_args_from_config(get_ip_port(), get_all_ip()[0], "idc")) print(get_args_from_config(get_ip_port(), get_all_ip()[0], "atc")) print(get_args_from_config(get_ip_port(), get_all_ip()[0], "ocr")) print(get_args_from_config(get_ip_port(), get_all_ip()[0], 'convert', 'MASTER')) # print(get_args_from_config(get_ip_port(), "http://127.0.0.1", "gunicorn_path")) # print(get_intranet_ip()) # _path = "C:/Users/Administrator/Downloads/3.png" # remove_red_seal(cv2.imread(_path))