import copy import math import numpy as np import cv2 import time def get_line_from_binary_image1(image_np, point_value=1, is_row=True, threshold=5, extend_px=0): """ 根据像素点的变化,将像素点为特定值的转化为line,即找出端点坐标。 需要二值化的图。 仅支持竖线横线。 :param image_np: numpy格式 image :param point_value: 像素点的特定值 :param is_row: 是否是行,否则为列 :param threshold: 行或列间的合并像素距离 :param extend_px: 每条线延长的像素值 :return: line list """ # 取值大于point_value的点的坐标 ys, xs = np.where(image_np >= point_value) points = [[xs[i], ys[i]] for i in range(len(xs))] lines = [] # 提取横线 if is_row: points.sort(key=lambda x: (x[1], x[0])) row_x, row_y = points[0] sub_line = [] for p in points: # y在一定范围内,认为在同一列 if row_y-threshold <= p[1] <= row_y+threshold: # 在同一行,且x连续 if row_x-1 <= p[0] <= row_x+1: sub_line.append(p) # 在同一行,但x不连续 else: if len(sub_line) >= 2: sub_line.sort(key=lambda x: (x[0], x[1])) lines.append([sub_line[0][0]-extend_px, sub_line[0][1], sub_line[-1][0]+extend_px, sub_line[0][1]]) sub_line = [] # 为了比较下个点是否连续,更新标准x值 row_x = p[0] # 不在同一行 else: row_y = p[1] if len(sub_line) >= 2: sub_line.sort(key=lambda x: (x[0], x[1])) lines.append([sub_line[0][0]-extend_px, sub_line[0][1], sub_line[-1][0]+extend_px, sub_line[0][1]]) sub_line = [] if len(sub_line) >= 2: sub_line.sort(key=lambda x: (x[0], x[1])) lines.append([sub_line[0][0]-extend_px, sub_line[0][1], sub_line[-1][0]+extend_px, sub_line[0][1]]) # 提取竖线 else: points.sort(key=lambda x: (x[0], x[1])) col_x, col_y = points[0] sub_line = [] for p in points: # x在一定范围内,认为在同一列 if col_x-threshold <= p[0] <= col_x+threshold: # 在同一列,且y连续 if col_y-1 <= p[1] <= col_y+1: sub_line.append(p) # 在同一列,但y不连续 else: if len(sub_line) >= 2: sub_line.sort(key=lambda x: (x[1], x[0])) lines.append([sub_line[0][0], sub_line[0][1]-extend_px, sub_line[0][0], sub_line[-1][1]+extend_px]) sub_line = [] # 为了比较下个点是否连续,更新标准y值 col_y = p[1] # 不在同一列 else: # 为了比较下一列,更新标准x值 col_x = p[0] if len(sub_line) >= 2: sub_line.sort(key=lambda x: (x[1], x[0])) lines.append([sub_line[0][0], sub_line[0][1]-extend_px, sub_line[0][0], sub_line[-1][1]+extend_px]) sub_line = [] if len(sub_line) >= 2: sub_line.sort(key=lambda x: (x[1], x[0])) lines.append([sub_line[0][0], sub_line[0][1]-extend_px, sub_line[0][0], sub_line[-1][1]+extend_px]) return lines def get_line_from_binary_image2(image_np, point_value=1, is_row=True, threshold=5, extend_px=0): """ 根据像素点的变化,将像素点为特定值的转化为line,即找出端点坐标。 需要二值化的图。 仅支持竖线横线。 :param image_np: numpy格式 image :param point_value: 像素点的特定值 :param is_row: 是否是行,否则为列 :param threshold: 行或列间的合并像素距离 :param extend_px: 每条线延长的像素值 :return: line list """ def get_point_average(_list, axis=0): if axis: _list.sort(key=lambda x: (x[0], x[1])) else: _list.sort(key=lambda x: (x[1], x[0])) p_axis = 0 for l in _list: p_axis += l[axis] p_axis = int(p_axis / len(_list)) return p_axis def get_line_average(_list, axis=0): line = [] if axis: sub_line.sort(key=lambda x: (x[1], x[0])) x = get_point_average(_list, 0) line.append([x, _list[0][1]-extend_px, x, _list[-1][1]+extend_px]) else: _list.sort(key=lambda x: (x[0], x[1])) y = get_point_average(_list, 1) line.append([_list[0][0]-extend_px, y, _list[-1][0]+extend_px, y]) return line # 取值大于point_value的点的坐标 ys, xs = np.where(image_np >= point_value) points = [[xs[i], ys[i]] for i in range(len(xs))] lines = [] used_points = [] # 提取横线 if is_row: points.sort(key=lambda x: (x[1], x[0])) row_x, row_y = points[0] sub_line = [points[0]] for p in points: if p in used_points: continue # y在一定范围内,认为在同一行 if row_y-threshold <= p[1] <= row_y+threshold: # 在同一行,且x连续 sub_line.sort(key=lambda z: z[0]) if sub_line[0][0]-threshold <= p[0] <= sub_line[-1][0]+threshold: sub_line.append(p) # 在同一行,但x不连续 else: if len(sub_line) >= 2: lines += get_line_average(sub_line, 0) used_points += sub_line sub_line = [p] # 不在同一行 else: row_y = p[1] if len(sub_line) >= 2: lines += get_line_average(sub_line, 0) used_points += sub_line sub_line = [p] if len(sub_line) >= 2: lines += get_line_average(sub_line, 0) # 提取竖线 else: points.sort(key=lambda x: (x[0], x[1])) col_x, col_y = points[0] sub_line = [points[0]] for p in points: if p in used_points: continue # x在一定范围内,认为在同一列 if col_x-threshold <= p[0] <= col_x+threshold: # 在同一列,且y连续 sub_line.sort(key=lambda z: z[1]) if sub_line[0][1]-threshold <= p[1] <= sub_line[-1][1]+threshold: sub_line.append(p) # 在同一列,但y不连续 else: if len(sub_line) >= 2: lines += get_line_average(sub_line, 1) used_points += sub_line sub_line = [p] # 不在同一列 else: # 为了比较下一列,更新标准x值 col_x = p[0] if len(sub_line) >= 2: lines += get_line_average(sub_line, 1) used_points += sub_line sub_line = [p] if len(sub_line) >= 2: lines += get_line_average(sub_line, 1) print("lines", lines) return lines def get_line_from_binary_image(image_np, point_value=1, axis=0): """ 根据像素点的变化,将像素点为特定值的转化为line,即找出端点坐标。 需要二值化的图。 仅支持竖线横线。 :param image_np: numpy格式 image :param point_value: 像素点的特定值 :param is_row: 是否是行,否则为列 :param threshold: 行或列间的合并像素距离 :param extend_px: 每条线延长的像素值 :return: line list """ def get_axis_points(_list, axis=0): _list.sort(key=lambda x: (x[1-axis], x[axis])) standard_axis = points[axis][1-axis] axis_points = [] sub_points = [] for p in _list: if p[1-axis] == standard_axis: sub_points.append(p) else: standard_axis = p[1-axis] if sub_points: axis_points.append(sub_points) sub_points = [] # 最后一行/列 if sub_points: axis_points.append(sub_points) return axis_points def get_axis_lines(_list, axis=0): # 逐行/列判断,一行/列可能多条横线/竖线 points_lines = [] for axis_list in _list: sub_line = [axis_list[0]] for p in axis_list: # 设置基准点 standard_p = sub_line[-1] # 判断连续 if p[axis] - standard_p[axis] == 1: sub_line.append(p) else: points_lines.append(sub_line) sub_line = [p] # 最后一行/列 if sub_line: points_lines.append(sub_line) # 许多点组成的line转为两点line lines = [] for line in points_lines: line.sort(key=lambda x: (x[axis], x[1-axis])) lines.append([line[0][0], line[0][1], line[-1][0], line[-1][1]]) return lines # 取值大于point_value的点的坐标 ys, xs = np.where(image_np >= point_value) points = [[xs[i], ys[i]] for i in range(len(xs))] # 提出所有相同x或相同y的点 # 提取行/列 axis_points = get_axis_points(points, axis) # 提取每行/列的横线/竖线 axis_lines = get_axis_lines(axis_points, axis) # print("axis_lines", axis_lines) return axis_lines def merge_line2(lines, axis, threshold=2): """ 解决模型预测一条直线错开成多条直线,合并成一条直线 :param lines: 线条列表 :param axis: 0:横线 1:竖线 :param threshold: 两条线间像素差阈值 :return: 合并后的线条列表 """ # 竖线 if axis: lines.sort(key=lambda x: (x[0], x[1])) # 循环找能合并的线,存储下标数组 merge_list = [] for i in range(len(lines)): col1 = lines[i] # 只需找一条 sub_merge_list = [i] for j in range(i+1, len(lines)): col2 = lines[j] # x之间超出像素距离,跳出 if abs(col1[0] - col2[0]) > threshold: break # 找到一条,跳出 else: sub_merge_list.append(j) break # 找到加入 if len(sub_merge_list) > 1: merge_list.append(sub_merge_list) # 横线 else: lines.sort(key=lambda x: (x[1], x[0])) # 循环找能合并的线,存储下标数组 merge_list = [] for i in range(len(lines)): row1 = lines[i] # 只需找一条 sub_merge_list = [i] for j in range(i+1,len(lines)): row2 = lines[j] # y之间超出像素距离,跳出 if abs(row1[1] - row2[0]) > threshold: break # 找到一条,跳出 else: sub_merge_list.append(j) break # 找到加入 if len(sub_merge_list) > 1: merge_list.append(sub_merge_list) # 对所有下标待合并集合循环判断交集,有交集则并集 intersection_list = [] finished_list = [] for i in range(len(merge_list)): # 处理过的下标跳过 if i in finished_list: continue list1 = merge_list[i] sub_result_list = list1 # 循环判断 for j in range(len(merge_list)): # 处理过的下标跳过 if j in finished_list: continue list2 = merge_list[j] # 交集 if list(set(list1).intersection(set(list2))): # 并集 sub_result_list = sub_result_list + list2 finished_list.append(j) finished_list.append(i) sub_result_list = list(set(sub_result_list)) sub_result_list.sort(key=lambda x: x) intersection_list.append(sub_result_list) # 根据不同情况保留组内的线 hold_list = [] # 竖线 if axis: # 得到完整的线交集列表,选择保留哪一条 for sub_result_list in intersection_list: # 有第一条 if 0 in sub_result_list: # 保留分组中最后一条的x x1 = lines[sub_result_list[-1]][0] # 有最后一条或者是中间的线 else: # 保留分组中第一条的x x1 = lines[sub_result_list[0]][0] # 取y最长的一条 max_y_index = sub_result_list[0] max_y = 0 for index in sub_result_list: if abs(lines[index][1] - lines[index][3]) > max_y: max_y = abs(lines[index][1] - lines[index][3]) max_y_index = index y1 = lines[max_y_index][1] y2 = lines[max_y_index][3] hold_list.append([x1, y1, x1, y2]) # 横线 else: # 得到完整的线交集列表,选择保留哪一条 for sub_result_list in intersection_list: # 有第一条 if 0 in sub_result_list: # 保留分组中最后一条的y y1 = lines[sub_result_list[-1]][1] # 有最后一条或者是中间的线 else: # 保留分组中第一条的y y1 = lines[sub_result_list[0]][1] # 取x最长的一条 max_x_index = sub_result_list[0] max_x = 0 for index in sub_result_list: if abs(lines[index][0] - lines[index][2]) > max_x: max_x = abs(lines[index][0] - lines[index][2]) max_x_index = index x1 = lines[max_x_index][0] x2 = lines[max_x_index][2] hold_list.append([x1, y1, x2, y1]) return hold_list def merge_line(lines, axis, threshold=5): """ 解决模型预测一条直线错开成多条直线,合并成一条直线 :param lines: 线条列表 :param axis: 0:横线 1:竖线 :param threshold: 两条线间像素差阈值 :return: 合并后的线条列表 """ # 任意一条line获取该合并的line,横线往下找,竖线往右找 lines.sort(key=lambda x: (x[axis], x[1-axis])) merged_lines = [] used_lines = [] for line1 in lines: if line1 in used_lines: continue merged_line = [line1] used_lines.append(line1) for line2 in lines: if line2 in used_lines: continue if line1[1-axis]-threshold <= line2[1-axis] <= line1[1-axis]+threshold: # 计算基准长度 min_axis = 10000 max_axis = 0 for line3 in merged_line: if line3[axis] < min_axis: min_axis = line3[axis] if line3[axis+2] > max_axis: max_axis = line3[axis+2] # 判断两条线有无交集 if min_axis <= line2[axis] <= max_axis \ or min_axis <= line2[axis+2] <= max_axis: merged_line.append(line2) used_lines.append(line2) if merged_line: merged_lines.append(merged_line) # 合并line result_lines = [] for merged_line in merged_lines: # 获取line宽的平均值 axis_average = 0 for line in merged_line: axis_average += line[1-axis] axis_average = int(axis_average/len(merged_line)) # 获取最长line两端 merged_line.sort(key=lambda x: (x[axis])) axis_start = merged_line[0][axis] merged_line.sort(key=lambda x: (x[axis+2])) axis_end = merged_line[-1][axis+2] if axis: result_lines.append([axis_average, axis_start, axis_average, axis_end]) else: result_lines.append([axis_start, axis_average, axis_end, axis_average]) return result_lines def fix_gap(rows, cols): def calculate_line_equation(lines): """ 根据line的两点式求line的一般式方程 :param lines: :return: """ line_equations = {} for line in lines: point1 = (line[0], line[1]) point2 = (line[2], line[3]) A = point2[1] - point1[1] B = point1[0] - point2[0] C = point2[0] * point1[1] - point1[0] * point2[1] line_equation = {"A": A, "B": B, "C": C} line_equations[str(line)] = line_equation return line_equations def calculate_point_line_distance(point, line_equation): """ 计算点到直线距离 :param point: :param line_equation: line的一般式方程 {A:, B:, C:} :return: 距离 """ A = line_equation.get("A") B = line_equation.get("B") C = line_equation.get("C") if A == 0.: distance = abs(point[1] + C / B) elif B == 0.: distance = abs(point[0] + C / A) else: distance = abs(A * point[0] + B * point[1] + C) / \ math.sqrt(math.pow(A, 2) + math.pow(B, 2)) return distance def get_point_projection(point, line_equation): """ 获取点到直线的投影 :param point: :param line_equation: line的一般式方程 {A:, B:, C:} :return: 投影点坐标 """ A = line_equation.get("A") B = line_equation.get("B") C = line_equation.get("C") x0 = point[0] y0 = point[1] if A == 0.: x1 = x0 y1 = -((A * x1 + C) / B) elif B == 0.: y1 = y0 x1 = -((B * y1 + C) / A) return (x1, y1) def is_point_at_line(point, lines, axis=0): for line in lines: if point[axis] == line[axis]: print("line", line, point) if line[1-axis] <= point[1-axis] <= line[1-axis+2]: return True return False def connect_point_to_line(point, lines, line_equations, axis=0): distances = [] # 找一条离点最近的线 for line in lines: # 获取line方程 line_equation = line_equations.get(str(line)) # 计算距离 distance = calculate_point_line_distance(point, line_equation) distances.append([line, distance]) distances.sort(key=lambda x: x[1]) connect_line = distances[0][0] print("connect_line", connect_line) print("distances[0]", distances[0]) print("line_equation", line_equations.get(str(connect_line))) # 求点到直线的投影点,作为新的点返回 new_point = get_point_projection(point, line_equations.get(str(connect_line))) return new_point # 计算所有line方程 rows_equations = calculate_line_equation(rows) cols_equations = calculate_line_equation(cols) # 对任意一条line判断两端是否在其他垂直line上 new_rows = [] for line in rows: point1 = [line[0], line[1]] point2 = [line[2], line[3]] flag1 = is_point_at_line(point1, cols, axis=1) flag2 = is_point_at_line(point2, cols, axis=1) print("flag1, flag2", flag1, flag2) if flag1 and flag2: new_rows.append(line) elif flag1 and not flag2: new_point2 = connect_point_to_line(point2, cols, cols_equations, axis=1) new_rows.append([point1[0], point1[1], math.ceil(new_point2[0]), math.ceil(new_point2[1]) ]) print("new_point2", new_point2, point2) elif not flag1 and flag2: new_point1 = connect_point_to_line(point1, cols, cols_equations, axis=1) new_rows.append([math.floor(new_point1[0]), math.floor(new_point1[1]), point2[0], point2[1] ]) print("new_point1", new_point1, point1) else: new_rows.append(line) new_cols = [] for line in cols: point1 = [line[0], line[1]] point2 = [line[2], line[3]] flag1 = is_point_at_line(point1, rows, axis=0) flag2 = is_point_at_line(point2, rows, axis=0) if flag1 and flag2: new_cols.append(line) elif flag1 and not flag2: new_point2 = connect_point_to_line(point2, rows, rows_equations, axis=1) new_cols.append([point1[0], point1[1], math.ceil(new_point2[0]), math.ceil(new_point2[1]) ]) elif not flag1 and flag2: new_point1 = connect_point_to_line(point1, rows, rows_equations, axis=1) new_cols.append([math.floor(new_point1[0]), math.floor(new_point1[1]), point2[0], point2[1] ]) else: new_cols.append(line) return new_rows, new_cols def get_points(row_lines, col_lines, image_size): """ :param row_lines: 所有区域rows :param col_lines: 所有区域cols :param image_size: (h, w) :return: rows、cols交点 """ # 创建空图 row_img = np.zeros(image_size, np.uint8) col_img = np.zeros(image_size, np.uint8) # 画线 thresh = 3 for row in row_lines: cv2.line(row_img, (int(row[0]-thresh), int(row[1])), (int(row[2]+thresh), int(row[3])), (255, 255, 255), 1) for col in col_lines: cv2.line(col_img, (int(col[0]), int(col[1]-thresh)), (int(col[2]), int(col[3]+thresh)), (255, 255, 255), 1) # 求出交点 point_img = np.bitwise_and(row_img, col_img) # cv2.imshow("point_img", np.bitwise_not(point_img)) # cv2.waitKey(0) # 识别黑白图中的白色交叉点,将横纵坐标取出 ys, xs = np.where(point_img > 0) points = [] for i in range(len(xs)): points.append((xs[i], ys[i])) points.sort(key=lambda x: (x[0], x[1])) return points def get_split_line(cols, image_size): """ 解决一张图中多个表格,求出分割区域的线。(最多分割3个表格) :param cols: 所有区域cols :param image_size: (h, w) :return: 分割区域的线及其纵坐标 """ cols.sort(key=lambda x: (x[0], x[1])) standard_col = cols[0] split_col = [] for col in cols: # 判断col是否与standard_col重合,重合则跳过 if standard_col[1] <= col[1] <= standard_col[3] \ or standard_col[1] <= col[3] <= standard_col[3]: # 获取standard col最大长度 standard_col = [standard_col[0], min([standard_col[1], col[1]]), standard_col[2], max([standard_col[3], col[3]])] continue # 不重合则将standard col加入,不重合的col作为新的standard col else: # 判断该standard col与split_col里有无重合 append_flag = 1 for sc in split_col: if standard_col[1] <= sc[1] <= standard_col[3] \ or standard_col[1] <= sc[3] <= standard_col[3]: append_flag = 0 break if append_flag: split_col.append(standard_col) standard_col = col # 判断有3条线后跳出 if len(split_col) == 3: break split_y = [0+5, image_size[0]-5] for col in split_col: if col[1]-5 > 0: y_min = col[1]-5 split_y.append(int(y_min)) if col[3]+5 < image_size[0]: y_max = col[3]+5 split_y.append(int(y_max)) split_y = list(set(split_y)) split_y.sort(key=lambda x: x) return split_y def get_point_area(points, split_y): """ :param points:所有区域points :param split_y: 区域分割线纵坐标 :return: 多个区域points list """ point_area_list = [] for i in range(1, len(split_y)): area = (split_y[i-1], split_y[i]) points.sort(key=lambda x: (x[1], x[0])) point_area = [] for p in points: if area[0] <= p[1] <= area[1]: point_area.append(p) point_area.sort(key=lambda x: (x[0], x[1])) point_area_list.append(point_area) return point_area_list def get_line_area(lines, split_y): line_area_list = [] for i in range(1, len(split_y)): area = (split_y[i-1], split_y[i]) lines.sort(key=lambda x: (x[1], x[3])) line_area = [] for l in lines: if area[0] <= l[1] and l[3] <= area[1]: line_area.append(l) line_area.sort(key=lambda x: (x[0], x[1], x[2], x[3])) line_area_list.append(line_area) return line_area_list def fix_outline_area(rows_area, cols_area, points_area): """ 解决表格本身无左右两边或无上下两边的情况,修补表格 :param rows_area: 单个区域rows :param cols_area: 单个区域cols :param points_area: 单个区域points :return: 补线后的新rows、cols、points """ # 通过rows,cols 取表格的四条边(会有超出表格部分) rows_area.sort(key=lambda x: (x[1], x[0])) # print(area) up_line1 = rows_area[0] bottom_line1 = rows_area[-1] cols_area.sort(key=lambda x: (x[0], x[1])) left_line1 = cols_area[0] right_line1 = cols_area[-1] print("left_line1", left_line1) print("right_line1", right_line1) # 通过points 取表格的四条边(无超出表格部分) points_area.sort(key=lambda x: (x[0], x[1])) left_up = points_area[0] right_bottom = points_area[-1] up_line2 = [left_up[0], left_up[1], right_bottom[0], left_up[1]] bottom_line2 = [left_up[0], right_bottom[1], right_bottom[0], right_bottom[1]] left_line2 = [left_up[0], left_up[1], left_up[0], right_bottom[1]] right_line2 = [right_bottom[0], left_up[1], right_bottom[0], right_bottom[1]] # 判断超出部分的长度,超出一定长度就补线 new_row_lines = [] new_col_lines = [] longer_row_lines = [] longer_col_lines = [] # 补左右两条竖线超出来的线的row if left_line2[1] - left_line1[1] >= 30 and right_line2[1] - right_line1[1] >= 30: new_row_lines.append([left_line1[0], left_line1[1], right_line1[0], left_line1[1]]) # 补了row,要将其他短的col连到row上 new_col_y = min([left_line1[1], right_line1[1]]) for col in cols_area: longer_col_lines.append([col[0], min([new_col_y, col[1]]), col[2], col[3]]) if left_line1[3] - left_line2[3] >= 30 and right_line1[3] - right_line2[3] >= 30: new_row_lines.append([left_line1[2], left_line1[3], right_line1[2], left_line1[3]]) # 补了row,要将其他短的col连到row上 new_col_y = max([left_line1[3], right_line1[3]]) for col in cols_area: longer_col_lines.append([col[0], col[1], col[2], max([new_col_y, col[3]])]) # 补上下两条横线超出来的线的col if up_line2[0] - up_line1[0] >= 30 and bottom_line2[0] - bottom_line1[0] >= 30: new_col_lines.append([up_line1[0], up_line1[1], up_line1[0], bottom_line1[1]]) # 补了col,要将其他短的row连到col上 new_row_x = min([up_line1[0], bottom_line1[0]]) for row in rows_area: longer_row_lines.append([min([new_row_x, row[0]]), row[1], row[2], row[3]]) if up_line1[2] - up_line2[2] >= 30 and bottom_line1[2] - bottom_line2[2] >= 30: new_col_lines.append([up_line1[2], up_line1[3], up_line1[2], bottom_line1[3]]) # 补了col,要将其他短的row连到col上 new_row_x = max([up_line1[2], bottom_line1[2]]) for row in rows_area: longer_row_lines.append([row[0], row[1], max([new_row_x, row[2]]), row[3]]) return new_row_lines, new_col_lines, longer_row_lines, longer_col_lines def post_process(): return