123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- import base64
- import json
- import os
- import traceback
- import cv2
- import numpy as np
# Root directory that get_lines() walks recursively looking for *.json label files.
label_img_dir = "C:/Table_Label/"
def get_lines(line_cnt=None):
    """Collect the labelled line segments from every JSON file under ``label_img_dir``.

    The directory tree is walked bottom-up and every file whose last
    dot-separated component is ``json`` is parsed.  For each entry of the
    file's ``"shapes"`` list, the first two ``"points"`` are turned into
    ``[x1, y1, x2, y2]`` (rounded to two decimals) with the endpoint that has
    the smaller x coordinate first.  Shapes with fewer than two points are
    skipped.

    Args:
        line_cnt: Optional cap on how many JSON files are processed; ``None``
            processes all of them.

    Returns:
        A list of ``[json_path, segments]`` pairs, where ``segments`` is sorted
        by ``(y1, y2, x1, x2)``.  A file that fails to load or parse is
        reported (traceback plus its path) and left out of the result.
    """
    json_paths = []
    for root, _dirs, files in os.walk(label_img_dir, topdown=False):
        json_paths.extend(
            root + os.path.sep + name
            for name in files
            if name.split(".")[-1] == 'json'
        )

    if line_cnt is not None:
        json_paths = json_paths[:line_cnt]
    print("len(json_list)", len(json_paths))

    # Parse each JSON file into its list of segments.
    parsed = []
    for json_path in json_paths:
        try:
            with open(json_path, "r") as handle:
                content = json.load(handle)
            shapes = content.get("shapes")

            segments = []
            for shape in shapes:
                points = shape.get("points")
                if len(points) < 2:
                    continue
                # Put the endpoint with the smaller x first.
                if points[0][0] <= points[1][0]:
                    start, end = points[0], points[1]
                else:
                    start, end = points[1], points[0]
                segments.append([round(start[0], 2), round(start[1], 2),
                                 round(end[0], 2), round(end[1], 2)])

            segments.sort(key=lambda seg: (seg[1], seg[3], seg[0], seg[2]))
            parsed.append([json_path, segments])
        except Exception:
            # Best effort: report the broken file and carry on with the rest.
            traceback.print_exc()
            print("error path", json_path)
            continue
    return parsed
def get_angles(lines_list):
    """Attach an inclination angle to every line segment.

    Args:
        lines_list: Iterable of ``[path, lines]`` pairs, each line being
            ``[x1, y1, x2, y2]``.

    Returns:
        A list of ``[path, angles]`` pairs where ``angles`` holds one
        ``[angle, line]`` entry per input line.  ``angle`` is
        ``arctan(|dy / dx|)`` in radians, or the constant ``1.57`` when the
        segment is exactly vertical (``x1 == x2``).
    """
    def inclination(segment):
        x1, y1, x2, y2 = segment
        if x1 == x2:
            # Vertical segment: the slope is undefined, use ~pi/2 directly.
            return 1.57
        return np.arctan(abs((y2 - y1) / (x2 - x1)))

    return [
        [_path, [[inclination(segment), segment] for segment in lines]]
        for _path, lines in lines_list
    ]
def get_combine_lines(angles_list):
    """Merge near-duplicate / touching line segments of each image into single lines.

    Two segments are considered mergeable when all of the following hold:

    * they have the same orientation (both wider than tall, or both taller
      than wide; segments with equal width and height never merge),
    * their angles differ by at most ``angle_threshold``,
    * some pair of their endpoints is within ``combine_threshold`` along the
      cross axis (y for row-like segments, x for column-like segments),
    * some pair of their endpoints is within ``distance_threshold`` in
      Euclidean distance.

    Mergeable segments are grouped transitively (connected components), so a
    chain A~B~C yields exactly one merged line.  A group of several segments
    is replaced by the segment running from the endpoint with the smallest
    ``x + y`` to the endpoint with the largest ``x + y``; a lone segment is
    kept unchanged.

    Args:
        angles_list: Iterable of ``[path, angles]`` pairs, where ``angles`` is
            a list of ``[angle, [x1, y1, x2, y2]]`` entries.

    Returns:
        A list of ``[path, combined_lines]`` pairs.  ``combined_lines``
        contains no duplicates and is sorted by ``(y1, y2, x1, x2)``.
    """
    # NOTE(review): if the incoming angles are in radians (i.e. at most
    # ~1.57), a threshold of 8 can never reject a pair -- confirm the unit
    # that was intended here (8 degrees?).
    angle_threshold = 8
    distance_threshold = 5
    combine_threshold = 20

    def judge_axis_distance(l1, l2, d, axis=0):
        """Return True if any endpoint pair is within ``d`` along x (axis=0) or y (axis=1)."""
        offset = 1 if axis else 0
        return any(
            abs(l1[a] - l2[b]) <= d
            for a in (offset, offset + 2)
            for b in (offset, offset + 2)
        )

    def judge_point_distance(l1, l2, d):
        """Return True if any endpoint of ``l1`` lies within distance ``d`` of an endpoint of ``l2``."""
        for m in range(0, len(l1), 2):
            x1, y1 = l1[m], l1[m + 1]
            for n in range(0, len(l2), 2):
                x2, y2 = l2[n], l2[n + 1]
                if ((x1 - x2) ** 2 + (y1 - y2) ** 2) ** 0.5 <= d:
                    return True
        return False

    def is_mergeable(first, second):
        """Return True when two ``[angle, line]`` entries should be merged."""
        angle1, line1 = first
        angle2, line2 = second
        # Positive: wider than tall (row-like); negative: column-like.
        orient1 = abs(line1[0] - line1[2]) - abs(line1[1] - line1[3])
        orient2 = abs(line2[0] - line2[2]) - abs(line2[1] - line2[3])
        if orient1 * orient2 <= 0:
            return False
        if abs(angle1 - angle2) > angle_threshold:
            return False
        # Rows must be close in y, columns must be close in x.
        axis = 1 if orient2 >= 0 else 0
        return (judge_axis_distance(line1, line2, combine_threshold, axis=axis)
                and judge_point_distance(line1, line2, distance_threshold))

    def find_root(parent, idx):
        """Union-find lookup with path halving."""
        while parent[idx] != idx:
            parent[idx] = parent[parent[idx]]
            idx = parent[idx]
        return idx

    all_combined_line_list = []
    for _path, angles in angles_list:
        # Group mergeable segments transitively with a union-find structure.
        count = len(angles)
        parent = list(range(count))
        for i in range(count):
            for j in range(i + 1, count):
                root_i = find_root(parent, i)
                root_j = find_root(parent, j)
                if root_i == root_j:
                    continue
                if is_mergeable(angles[i], angles[j]):
                    parent[max(root_i, root_j)] = min(root_i, root_j)

        groups = {}
        for idx, (_angle, line) in enumerate(angles):
            groups.setdefault(find_root(parent, idx), []).append(line)

        # Collapse every group into one line; de-duplicate identical results.
        merged = {}
        for lines in groups.values():
            # Identical segments count once, so a group made only of
            # duplicates is treated as a lone segment.
            unique_lines = list(dict.fromkeys(tuple(line) for line in lines))
            if len(unique_lines) > 1:
                endpoints = [
                    point
                    for line in unique_lines
                    for point in ((line[0], line[1]), (line[2], line[3]))
                ]
                # Ties: the last minimal and the first maximal endpoint win.
                point_min = min(reversed(endpoints), key=sum)
                point_max = max(endpoints, key=sum)
                combined_line = [point_min[0], point_min[1], point_max[0], point_max[1]]
            else:
                combined_line = list(unique_lines[0])
            merged.setdefault(tuple(combined_line), combined_line)

        combined_line_list = sorted(merged.values(),
                                    key=lambda x: (x[1], x[3], x[0], x[2]))
        all_combined_line_list.append([_path, combined_line_list])
    return all_combined_line_list
def show_lines(lines_list):
    """Display each image with its line segments drawn in random colours.

    For every ``[path, lines]`` pair the JSON file at ``path`` is loaded, its
    base64 ``"imageData"`` field is decoded into an OpenCV image, each
    ``[x1, y1, x2, y2]`` line is drawn on it, and the result is shown in a
    window named after the path.  The call blocks on a key press before
    moving on to the next image.

    Args:
        lines_list: Iterable of ``[json_path, lines]`` pairs.
    """
    for _path, lines in lines_list:
        with open(_path, "r") as f:
            _dict = json.loads(f.read())

        img_bytes = base64.b64decode(_dict.get("imageData").encode("utf-8"))
        img_np = np.frombuffer(img_bytes, np.uint8)
        img_cv = cv2.imdecode(img_np, cv2.IMREAD_ANYCOLOR)
        print("path", _path)
        print("img_cv.shape", img_cv.shape)

        cv2.namedWindow(_path, 0)
        cv2.resizeWindow(_path, 1000, 800)
        for line in lines:
            # Random colour per line.  Converted with the builtin int():
            # the ``np.int`` alias no longer exists in current NumPy releases.
            color = tuple(
                int(channel)
                for channel in np.random.randint(0, 255, 3, dtype=np.int32)
            )
            cv2.line(img_cv,
                     (int(line[0]), int(line[1])), (int(line[2]), int(line[3])),
                     color, thickness=10)
        cv2.imshow(_path, img_cv)
        cv2.waitKey(0)
def to_json(lines_list):
    """Write cleaned copies of the label files with their shapes replaced.

    For every ``[path, lines]`` pair the JSON file at ``path`` is loaded, its
    ``"shapes"`` entry is replaced by one ``"line"`` shape per segment (with
    integer-truncated endpoints), and the result is written under
    ``C:/Table_Label/clean_data/`` using the original file name.  A segment
    is labelled ``"0"`` when its horizontal extent is at least its vertical
    extent, and ``"1"`` otherwise.

    Args:
        lines_list: Iterable of ``[json_path, lines]`` pairs, each line being
            ``[x1, y1, x2, y2]``.
    """
    for _path, lines in lines_list:
        with open(_path, "r") as source:
            annotation = json.load(source)

        shapes = []
        for line in lines:
            endpoints = [[int(line[0]), int(line[1])], [int(line[2]), int(line[3])]]
            is_row = abs(line[0] - line[2]) >= abs(line[1] - line[3])
            # Key order is kept stable so the serialized output is unchanged.
            shapes.append({
                "group_id": "null",
                "shape_type": "line",
                "flags": {},
                "points": endpoints,
                "label": "0" if is_row else "1",
            })
        annotation["shapes"] = shapes

        new_path = "C:/Table_Label/clean_data/" + _path.split(os.sep)[-1]
        with open(new_path, "w") as target:
            target.write(json.dumps(annotation))
def check():
    """Run the full pipeline: load lines, compute angles, merge, write JSON.

    The per-image angle lists are processed in batches of 100 images; the
    merged results of all batches are concatenated and handed to
    ``to_json`` in one call.
    """
    angles_list = get_angles(get_lines())

    # Process the data in batches.
    batch = 100
    all_combined_lines = []
    for start in range(0, len(angles_list), batch):
        if not start % 10:
            print("Loop", start)
        chunk = angles_list[start:start + batch]
        all_combined_lines.extend(get_combine_lines(chunk))

    to_json(all_combined_lines)
# Script entry point: run the whole clean-up pipeline when executed directly.
if __name__ == '__main__':
    check()
|