"""Clean up table-line annotations stored as JSON label files.

Pipeline (see ``check``): read every annotation file below ``label_img_dir``,
normalise the annotated line segments, merge segments that touch end to end
and share an orientation, and write the result to a ``clean_data`` sub-folder.
"""
import base64
import itertools
import json
import math
import os
import traceback

import numpy as np

try:
    import cv2
except ImportError:
    # OpenCV is only needed by show_lines(); the cleaning pipeline itself
    # must keep working on machines where it is not installed.
    cv2 = None

label_img_dir = "C:/Table_Label/"

# Sub-folder of the label directory that to_json() writes into by default.
_CLEAN_DIR_NAME = "clean_data"

# Angle (radians) assigned to perfectly vertical segments; roughly pi / 2.
_VERTICAL_ANGLE = 1.57


def _line_sort_key(line):
    """Order segments by (y1, y2, x1, x2)."""
    return line[1], line[3], line[0], line[2]


def get_lines(line_cnt=None, root_dir=None):
    """Load and normalise the line segments of every annotation file.

    Args:
        line_cnt: If not None, only the first ``line_cnt`` JSON files found
            are parsed.
        root_dir: Directory to search recursively. Defaults to the
            module-level ``label_img_dir``.

    Returns:
        A list of ``[json_path, segments]`` pairs. Each segment is
        ``[x1, y1, x2, y2]`` rounded to two decimals with ``x1 <= x2``;
        segments are sorted by ``(y1, y2, x1, x2)``. Files that cannot be
        read or parsed are reported on stdout and skipped.
    """
    if root_dir is None:
        root_dir = label_img_dir

    # to_json() writes its output below the input tree by default; skip that
    # folder so a second run does not re-ingest already cleaned files.
    clean_dir = os.path.normpath(os.path.join(root_dir, _CLEAN_DIR_NAME))

    json_list = []
    for root, _dirs, files in os.walk(root_dir, topdown=False):
        norm_root = os.path.normpath(root)
        if norm_root == clean_dir or norm_root.startswith(clean_dir + os.sep):
            continue
        for name in files:
            if name.endswith(".json"):
                json_list.append(os.path.join(root, name))

    if line_cnt is not None:
        json_list = json_list[:line_cnt]

    print("len(json_list)", len(json_list))

    # Parse the JSON files.
    lines_list = []
    for path in json_list:
        try:
            with open(path, "r") as f:
                annotation = json.load(f)

            line_list = []
            for shape in annotation.get("shapes"):
                points = shape.get("points")
                if len(points) < 2:
                    continue
                start, end = points[0], points[1]
                # Store the endpoint with the smaller x first.
                if start[0] > end[0]:
                    start, end = end, start
                line_list.append([round(start[0], 2), round(start[1], 2),
                                  round(end[0], 2), round(end[1], 2)])
        except Exception:
            # Best effort: one malformed file must not abort the whole run.
            traceback.print_exc()
            print("error path", path)
            continue

        line_list.sort(key=_line_sort_key)
        lines_list.append([path, line_list])
    return lines_list


def get_angles(lines_list):
    """Attach an inclination angle to every segment.

    Args:
        lines_list: ``[path, segments]`` pairs as returned by ``get_lines``.

    Returns:
        A list of ``[path, [[angle, segment], ...]]`` pairs. ``angle`` is a
        plain ``float`` in radians, ``arctan(|dy / dx|)``, or 1.57 for
        vertical segments (``x1 == x2``).
    """
    angles_list = []
    for _path, lines in lines_list:
        angles = []
        for line in lines:
            x1, y1, x2, y2 = line
            if x1 == x2:
                angle = _VERTICAL_ANGLE
            else:
                # Convert to a built-in float so downstream code never has to
                # deal with NumPy scalar types.
                angle = float(np.arctan(abs((y2 - y1) / (x2 - x1))))
            angles.append([angle, line])
        angles_list.append([_path, angles])
    return angles_list


def _orientation(line):
    """Return ``|dx| - |dy|``: > 0 row-like, < 0 column-like, 0 diagonal."""
    return abs(line[0] - line[2]) - abs(line[1] - line[3])


def _axis_close(l1, l2, d, axis=0):
    """True if any endpoint pair differs by at most ``d`` on one axis.

    ``axis`` truthy compares y coordinates, otherwise x coordinates.
    """
    first = 1 if axis else 0
    coords = (first, first + 2)
    return any(abs(l1[a] - l2[b]) <= d for a in coords for b in coords)


def _endpoints_close(l1, l2, d):
    """True if some endpoint of ``l1`` is within distance ``d`` of one of ``l2``."""
    ends1 = ((l1[0], l1[1]), (l1[2], l1[3]))
    ends2 = ((l2[0], l2[1]), (l2[2], l2[3]))
    return any(
        math.hypot(xa - xb, ya - yb) <= d
        for xa, ya in ends1
        for xb, yb in ends2
    )


def _should_merge(entry1, entry2, angle_threshold, distance_threshold,
                  combine_threshold):
    """Decide whether two ``[angle, segment]`` entries belong together."""
    angle1, line1 = entry1
    angle2, line2 = entry2
    orient1 = _orientation(line1)
    orient2 = _orientation(line2)

    # Different orientation (or an exact diagonal): never merge.
    if orient1 * orient2 <= 0:
        return False
    # Angle difference check.
    if abs(angle1 - angle2) > angle_threshold:
        return False
    # Distance checks: rows are compared on y, columns on x.
    axis = 1 if orient2 >= 0 else 0
    return (_axis_close(line1, line2, combine_threshold, axis=axis)
            and _endpoints_close(line1, line2, distance_threshold))


def _connected_groups(angles, angle_threshold, distance_threshold,
                      combine_threshold):
    """Group segments into connected components of the merge relation."""
    parent = list(range(len(angles)))

    def find(i):
        while parent[i] != i:
            parent[i] = parent[parent[i]]
            i = parent[i]
        return i

    for i, j in itertools.combinations(range(len(angles)), 2):
        if _should_merge(angles[i], angles[j], angle_threshold,
                         distance_threshold, combine_threshold):
            parent[find(j)] = find(i)

    groups = {}
    for i, (_angle, line) in enumerate(angles):
        groups.setdefault(find(i), []).append(line)
    return list(groups.values())


def _merge_group(lines):
    """Collapse a group of segments into a single segment.

    A single segment is returned unchanged. Otherwise the result runs from
    the endpoint with the smallest ``x + y`` to the one with the largest.
    """
    if len(lines) == 1:
        return list(lines[0])
    endpoints = [
        point
        for x1, y1, x2, y2 in lines
        for point in ((x1, y1), (x2, y2))
    ]
    start = min(endpoints, key=sum)
    end = max(endpoints, key=sum)
    return [start[0], start[1], end[0], end[1]]


def get_combine_lines(angles_list, angle_threshold=8, distance_threshold=5,
                      combine_threshold=20):
    """Merge segments that continue each other within every image.

    Two segments are linked when they have the same orientation (both wider
    than tall, or both taller than wide), their angles differ by at most
    ``angle_threshold``, some pair of endpoints is within
    ``combine_threshold`` on the perpendicular axis, and some pair of
    endpoints is within ``distance_threshold`` of each other. Links are
    applied transitively, so a chain of touching segments becomes one line.

    Args:
        angles_list: ``[path, [[angle, segment], ...]]`` pairs as returned by
            ``get_angles``.
        angle_threshold: Maximum angle difference for a link.
            NOTE(review): angles are in radians (at most ~1.57), so the
            default of 8 never rejects a pair; it was presumably meant as
            degrees. Left unchanged to keep existing results - confirm.
        distance_threshold: Maximum endpoint-to-endpoint distance.
        combine_threshold: Maximum single-axis endpoint offset.

    Returns:
        A list of ``[path, segments]`` pairs with duplicates removed and
        segments sorted by ``(y1, y2, x1, x2)``.
    """
    all_combined_line_list = []
    for _path, angles in angles_list:
        # Find the segments that need to be merged.
        groups = _connected_groups(angles, angle_threshold,
                                   distance_threshold, combine_threshold)
        # Merge each group, then drop exact duplicates.
        unique_lines = {tuple(_merge_group(group)) for group in groups}
        combined_line_list = sorted(
            (list(line) for line in unique_lines), key=_line_sort_key
        )
        all_combined_line_list.append([_path, combined_line_list])
    return all_combined_line_list


def show_lines(lines_list):
    """Draw each image's segments in random colours and display them.

    The image is decoded from the base64 ``imageData`` field of the
    annotation file. Blocks on a key press for every image.

    Args:
        lines_list: ``[json_path, segments]`` pairs.

    Raises:
        ImportError: If OpenCV (``cv2``) is not installed.
    """
    if cv2 is None:
        raise ImportError("show_lines() requires OpenCV (cv2)")

    for _path, lines in lines_list:
        with open(_path, "r") as f:
            annotation = json.load(f)

        image_data = annotation.get("imageData")
        if not image_data:
            print("no imageData", _path)
            continue

        img_bytes = base64.b64decode(image_data.encode("utf-8"))
        img_np = np.frombuffer(img_bytes, np.uint8)
        img_cv = cv2.imdecode(img_np, cv2.IMREAD_ANYCOLOR)
        if img_cv is None:
            print("cannot decode image", _path)
            continue

        print("path", _path)
        print("img_cv.shape", img_cv.shape)

        cv2.namedWindow(_path, 0)
        cv2.resizeWindow(_path, 1000, 800)
        for line in lines:
            # Random colour; OpenCV needs built-in ints (np.int was removed
            # from NumPy).
            color = tuple(
                int(c) for c in np.random.randint(0, 255, 3, dtype=np.int32)
            )
            cv2.line(img_cv, (int(line[0]), int(line[1])),
                     (int(line[2]), int(line[3])), color, thickness=10)
        cv2.imshow(_path, img_cv)
        cv2.waitKey(0)
        # Close the window so they do not pile up across images.
        cv2.destroyWindow(_path)


def to_json(lines_list, out_dir=None):
    """Write cleaned annotation files with the given segments as shapes.

    Every source file is re-read, its ``shapes`` list is replaced, and the
    result is written under ``out_dir`` using the source file's base name.
    Coordinates are truncated to integers. The label is ``"0"`` when the
    segment is at least as wide as it is tall, otherwise ``"1"``.

    Args:
        lines_list: ``[json_path, segments]`` pairs.
        out_dir: Output directory, created if missing. Defaults to the
            ``clean_data`` folder inside ``label_img_dir``.
    """
    if out_dir is None:
        out_dir = os.path.join(label_img_dir, _CLEAN_DIR_NAME)
    os.makedirs(out_dir, exist_ok=True)

    for _path, lines in lines_list:
        with open(_path, "r") as f:
            annotation = json.load(f)

        new_shapes = []
        for line in lines:
            is_row = abs(line[0] - line[2]) >= abs(line[1] - line[3])
            new_shapes.append({
                # NOTE(review): this is the string "null", not JSON null;
                # kept as-is because consumers may rely on it - confirm.
                "group_id": "null",
                "shape_type": "line",
                "flags": {},
                "points": [[int(line[0]), int(line[1])],
                           [int(line[2]), int(line[3])]],
                "label": "0" if is_row else "1",
            })
        annotation["shapes"] = new_shapes

        # NOTE: files with the same base name in different source folders
        # overwrite each other here.
        new_path = os.path.join(out_dir, os.path.basename(_path))
        with open(new_path, "w") as f:
            f.write(json.dumps(annotation))


def check():
    """Run the full pipeline: load, compute angles, merge, write output."""
    lines = get_lines()
    angles_list = get_angles(lines)

    # Process the data in batches.
    batch = 100
    all_combined_lines = []
    for i in range(0, len(angles_list), batch):
        print("Loop", i)
        all_combined_lines += get_combine_lines(angles_list[i:i + batch])
    to_json(all_combined_lines)
    # show_lines(all_combined_lines)


if __name__ == '__main__':
    check()