import hashlib
import inspect
import json
import os
import socket
import subprocess
import sys
from io import BytesIO
from subprocess import Popen
import requests
# Make the project root importable before the format_convert imports below.
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
import difflib
import logging
import mimetypes
import platform
import re
import traceback
import filetype
from bs4 import BeautifulSoup
import yaml
from pdfminer.layout import *
from format_convert import _global
from functools import wraps
import psutil
import time
from format_convert.judge_platform import get_platform
if get_platform() == "Linux":
    import resource


def judge_error_code(_list, code=(0, -1, -2, -3, -4, -5, -6, -7, -8, -9)):
    """Return True when ``_list`` is a one-element list holding a known status code.

    Status codes:
        [0] : continue
        [-1]: logic / processing error
        [-2]: interface call error
        [-3]: file format error, file cannot be opened
        [-4]: timeout while reading a file through a third-party package
        [-5]: the whole conversion timed out
        [-6]: Aliyun UDF queue timeout
        [-7]: file is password protected and cannot be opened
        [-8]: error raised by a ready-made interface
        [-9]: interface received empty data

    :param _list: value to test (typically a function result).
    :param code: iterable of codes to test against; defaults to all known codes.
    :return: True if ``_list == [c]`` for some ``c`` in ``code``, else False.
    """
    # The default is a tuple so that no mutable object is shared between calls.
    return any(_list == [c] for c in code)


def add_div(text):
    """Wrap every line of ``text`` in ``<div>...</div>``.

    Empty/None input, and text that already contains ``<div>``, is returned
    unchanged.

    NOTE(review): the tag literals of this function were lost when the source
    was extracted (markup stripped). They are restored as ``<div>``/``</div>``
    based on the function name and the 5-character suffix check -- confirm
    against version control.

    :param text: plain text, lines separated by ``"\\n"``.
    :return: the text with each line wrapped in a div, or ``text`` unchanged.
    """
    if text == "" or text is None:
        return text
    if "<div>" in text:
        return text

    text = "<div>" + text + "\n"
    text = text.replace("\n", "</div>\n<div>")
    # The replacement leaves a dangling opening tag after the final newline.
    if text[-5:] == "<div>":
        text = text[:-5]
    return text


def get_platform():
    """Return the operating system name reported by ``platform.system()``.

    This definition replaces the ``get_platform`` imported from
    ``format_convert.judge_platform`` at the top of the module.
    """
    return platform.system()


def get_html_p(html_path):
    """Collect the stripped text of every ``<p>`` element of an HTML file.

    :param html_path: path of the HTML file to read.
    :return: the paragraph texts, each followed by ``"\\n"``; ``[-1]`` if
        reading or parsing fails.
    """
    log("into get_html_p")
    try:
        with open(html_path, "r") as ff:
            html_str = ff.read()

        soup = BeautifulSoup(html_str, 'lxml')
        parts = []
        for p in soup.find_all("p"):
            if p.string != "":
                parts.append(p.text.strip() + "\n")
        return "".join(parts)
    except Exception:
        # Keep the [-1] error-code contract, but do not hide the cause.
        log("get_html_p error!")
        traceback.print_exc()
        return [-1]


def string_similarity(str1, str2):
    """Return the ``difflib`` similarity ratio of two strings.

    ``<div>``, ``</div>`` and newlines are removed from both strings first.

    NOTE(review): the two tag literals were lost in extraction and are restored
    as ``<div>``/``</div>`` from the surrounding comment -- confirm.

    :return: ratio in ``[0, 1]`` from ``difflib.SequenceMatcher.ratio()``.
    """
    # Remove div tags and line breaks before comparing.
    for token in ("<div>", "</div>", "\n"):
        str1 = str1.replace(token, "")
        str2 = str2.replace(token, "")

    score = difflib.SequenceMatcher(None, str1, str2).ratio()
    print("string_similarity", score)
    return score


def get_sequential_data(text_list, bbox_list, html=False):
    """Join text fragments in reading order derived from their bounding boxes.

    Boxes whose vertical centres lie within 5 units of each other form one row.
    Rows are emitted top to bottom, the boxes of a row left to right; fragments
    of a multi-box row are each followed by a space, and every row ends with
    ``"\\n"``.

    NOTE(review): the tag literals of the ``html`` branch were lost in
    extraction and are restored as ``<div>``/``</div>`` -- confirm.

    :param text_list: text of each box.
    :param bbox_list: one bbox per text; ``bbox[0]`` and ``bbox[1]`` are read
        as (x, y) points and ``bbox[-1][1]`` as the lower y.
    :param html: when True, wrap the lines in div tags.
    :return: the ordered text, ``""`` if there is no input, ``[-1]`` on error.
    """
    logging.info("into get_sequential_data")
    try:
        # Each entry: [text, x_start, x_end, y_start, y_end]
        order_list = []
        for i, box_text in enumerate(text_list):
            bbox = bbox_list[i]
            order_list.append([box_text, bbox[0][0], bbox[1][0],
                               bbox[0][1], bbox[-1][1]])

        if get_platform() == "Windows":
            print("get_sequential_data", order_list)
        if not order_list:
            if get_platform() == "Windows":
                print("get_sequential_data", "no order list")
            return ""

        # Sort by bbox coordinates: top to bottom, then left to right.
        order_list.sort(key=lambda x: (x[3], x[1], x[0]))

        # Group boxes into rows by vertical centre. Boxes are compared by
        # value, so an exact duplicate of an already placed box is skipped.
        threshold = 5
        row_list = []
        used_box = set()
        for box in order_list:
            if tuple(box) in used_box:
                continue
            height_center = (box[4] + box[3]) / 2
            row = []
            for box2 in order_list:
                key = tuple(box2)
                if key in used_box:
                    continue
                height_center2 = (box2[4] + box2[3]) / 2
                if height_center - threshold <= height_center2 <= height_center + threshold:
                    row.append(box2)
                    used_box.add(key)
            # Left to right; equal x_start falls back to the text itself.
            row.sort(key=lambda x: (x[1], x[0]))
            row_list.append(row)

        lines = []
        for row in row_list:
            if not row:
                continue
            if len(row) <= 1:
                lines.append(row[0][0] + "\n")
            else:
                lines.append("".join(col[0] + " " for col in row) + "\n")
        text = "".join(lines)

        if html:
            text = "<div>" + text
            text = text.replace("\n", "</div>\n<div>")
            text += "</div>"
        return text
    except Exception:
        logging.info("get_sequential_data error!")
        traceback.print_exc()
        return [-1]


# Disabled legacy implementation, kept as found (the fragment continues below).
# def get_formatted_table(text_list, text_bbox_list, table_bbox_list, split_line):
#     logging.info("into get_formatted_table")
#     try:
#         # Redefine text_bbox_list as [point, point, text]
#         text_bbox_list = [[text_bbox_list[i][0], text_bbox_list[i][2], text_list[i]] for i in
#                           range(len(text_bbox_list))]
#         # Sort by vertical coordinate
#         text_bbox_list.sort(key=lambda x: (x[0][1], x[0][0]))
#         table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0]))
#
#         # print("text_bbox_list", text_bbox_list)
#         # print("table_bbox_list", table_bbox_list)
#
#         # bbox position threshold
#         threshold = 5
#
#         # Partition into areas by split_line; one area may hold several tables [(), ()]
#         area_text_bbox_list = []
#         area_table_bbox_list = []
#         # print("get_formatted_table, split_line", split_line)
#         for j in range(1, len(split_line)):
#             last_y = split_line[j - 1][0][1]
#             current_y = split_line[j][0][1]
#             temp_text_bbox_list = []
#             temp_table_bbox_list = []
#
#             # Find the text bboxes inside this area
#             for text_bbox in text_bbox_list:
#                 # Compute the centre point of the text bbox
#                 text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2,
#                                     (text_bbox[1][1] + text_bbox[0][1]) / 2)
#                 if last_y - threshold <= text_bbox_center[1] <= current_y + threshold:
#                     temp_text_bbox_list.append(text_bbox)
#             area_text_bbox_list.append(temp_text_bbox_list)
#
#             # Find the table bboxes inside this area
#             for table_bbox in table_bbox_list:
#                 # Compute the centre point of the table bbox
#                 table_bbox_center = ((table_bbox[1][0] + table_bbox[0][0]) / 2,
#                                      (table_bbox[1][1] + table_bbox[0][1]) / 2)
#                 if last_y < table_bbox_center[1] < current_y:
#                     temp_table_bbox_list.append(table_bbox)
#             area_table_bbox_list.append(temp_table_bbox_list)
#
#         # for j in range(len(area_text_bbox_list)):
#         #     print("area_text_bbox_list", j, area_text_bbox_list[j])
#
#         # For each area, match the two kinds of bbox and build the table
#         area_text_list = []
#         area_column_list = []
#         for j in range(len(area_text_bbox_list)):
#             # table bboxes and text bboxes of this area
#             temp_table_bbox_list =
area_table_bbox_list[j] # temp_text_bbox_list = area_text_bbox_list[j] # # # 判断该区域有无表格bbox # # 若无表格,将该区域文字连接 # if not temp_table_bbox_list: # # 找出该区域的所有text bbox # only_text_list = [] # only_bbox_list = [] # for text_bbox in temp_text_bbox_list: # only_text_list.append(text_bbox[2]) # only_bbox_list.append([text_bbox[0], text_bbox[1]]) # only_text = get_sequential_data(only_text_list, only_bbox_list, True) # if only_text == [-1]: # return [-1], [-1] # area_text_list.append(only_text) # area_column_list.append(0) # continue # # # 有表格 # # 文本对应的表格格子 # text_in_table = {} # for i in range(len(temp_text_bbox_list)): # text_bbox = temp_text_bbox_list[i] # # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # # # 判断中心点在哪个table bbox中 # for table_bbox in temp_table_bbox_list: # # 中心点在table bbox中,将text写入字典 # if table_bbox[0][0] <= text_bbox_center[0] <= table_bbox[1][0] and \ # table_bbox[0][1] <= text_bbox_center[1] <= table_bbox[1][1]: # if str(table_bbox) in text_in_table.keys(): # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # else: # text_in_table[str(table_bbox)] = text_bbox[2] # break # # # 如果未找到text bbox匹配的table bbox,加大threshold匹配 # # elif (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]): # # if str(table_bbox) in text_in_table.keys(): # # text_in_table[str(table_bbox)] = 
text_in_table.get(str(table_bbox)) + text_bbox[2] # # else: # # text_in_table[str(table_bbox)] = text_bbox[2] # # break # # # 对表格格子进行分行分列,并计算总计多少小列 # # 放入坐标 # all_col_list = [] # all_row_list = [] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # # 放入所有坐标x # if table_bbox[0][0] not in all_col_list: # all_col_list.append(table_bbox[0][0]) # if table_bbox[1][0] not in all_col_list: # all_col_list.append(table_bbox[1][0]) # # # 放入所有坐标y # if table_bbox[0][1] not in all_row_list: # all_row_list.append(table_bbox[0][1]) # if table_bbox[1][1] not in all_row_list: # all_row_list.append(table_bbox[1][1]) # all_col_list.sort(key=lambda x: x) # all_row_list.sort(key=lambda x: x) # # # 分行 # row_list = [] # rows = [] # temp_table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0], x[1][1], x[1][0])) # y_row = temp_table_bbox_list[0][0][1] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # if y_row - threshold <= table_bbox[0][1] <= y_row + threshold: # rows.append(table_bbox) # else: # y_row = table_bbox[0][1] # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # rows = [] # rows.append(table_bbox) # # print("*" * 30) # # print(row_list) # # if i == len(temp_table_bbox_list) - 1: # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # # # 生成表格,包括文字和格子宽度 # area_column = [] # text = '' + "\n" # for row in row_list: # text += "" + "\n" # for col in row: # # 计算bbox y坐标之间有多少其他点,+1即为所占行数 # row_span = 1 # for y in all_row_list: # if col[0][1] < y < col[1][1]: # if y - col[0][1] >= 2 and col[1][1] - y >= 2: # row_span += 1 # # # 计算bbox x坐标之间有多少其他点,+1即为所占列数 # col_span = 1 # for x in all_col_list: # if col[0][0] < x < col[1][0]: # if x - col[0][0] >= 2 and col[1][0] - x >= 2: # col_span += 1 # # text += "" + "\n" # text += "" + "\n" # text += "
" # # if str(col) in text_in_table.keys(): # text += text_in_table.get(str(col)) # else: # text += "" # text += "
" + "\n" # # # 计算最大column # max_col_num = 0 # for row in row_list: # col_num = 0 # for col in row: # col_num += 1 # if max_col_num < col_num: # max_col_num = col_num # # area_text_list.append(text) # area_column_list.append(max_col_num) # # text = "" # if get_platform() == "Windows": # print("get_formatted_table area_text_list", area_text_list) # for area_text in area_text_list: # text += area_text # return text, area_column_list # except Exception as e: # logging.info("get_formatted_table error!") # print("get_formatted_table", traceback.print_exc()) # return [-1], [-1] def rename_inner_files(root_path): try: logging.info("into rename_inner_files") # 获取解压文件夹下所有文件+文件夹,不带根路径 path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: p = os.path.join(root, name) + os.sep if get_platform() == "Windows": root_path = slash_replace(root_path) p = slash_replace(p) p = re.sub(root_path, "", p) root_path = slash_replace(root_path, True) p = slash_replace(p, True) else: p = re.sub(root_path, "", p) path_list.append(p) for name in files: p = os.path.join(root, name) if get_platform() == "Windows": root_path = slash_replace(root_path) p = slash_replace(p) p = re.sub(root_path, "", p) root_path = slash_replace(root_path, True) p = slash_replace(p, True) else: p = re.sub(root_path, "", p) path_list.append(p) # 按路径长度排序 path_list.sort(key=lambda x: len(x), reverse=True) # 循环改名 for old_path in path_list: # 按路径分隔符分割 ss = old_path.split(os.sep) # 判断是否文件夹 is_dir = 0 file_type = "" if os.path.isdir(root_path + old_path): ss = ss[:-1] is_dir = 1 else: if "." in old_path: file_type = "." 
+ old_path.split(".")[-1] else: file_type = "" # 最后一级需要用hash改名 new_path = "" # new_path = re.sub(ss[-1], str(hash(ss[-1])), old_path) + file_type current_level = 0 for s in ss: # 路径拼接 if current_level < len(ss) - 1: new_path += s + os.sep else: new_path += str(hash(s)) + file_type current_level += 1 new_ab_path = root_path + new_path old_ab_path = root_path + old_path os.rename(old_ab_path, new_ab_path) # 重新获取解压文件夹下所有文件+文件夹 new_path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: new_path_list.append(os.path.join(root, name) + os.sep) for name in files: new_path_list.append(os.path.join(root, name)) return new_path_list except: traceback.print_exc() return [-1] def judge_format(path): guess1 = mimetypes.guess_type(path) _type = None if guess1[0]: _type = guess1[0] else: guess2 = filetype.guess(path) if guess2: _type = guess2.mime if _type == "application/pdf": return "pdf" if _type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": return "docx" if _type == "application/x-zip-compressed" or _type == "application/zip": return "zip" if _type == "application/x-rar-compressed" or _type == "application/rar": return "rar" if _type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": return "xlsx" if _type == "application/msword": return "doc" if _type == "image/png": return "png" if _type == "image/jpeg": return "jpg" # 猜不到,返回None return None def slash_replace(_str, reverse=False): if reverse: _str = eval(repr(_str).replace('/', '\\\\')) else: _str = eval(repr(_str).replace('\\\\', '/')) return _str class LineTable: def recognize_table(self,list_textbox, list_line,sourceP_LB=True): self.list_line = list_line self.list_crosspoints = self.recognize_crosspoints(list_line) # 聚类 cluster_crosspoints = [] for _point in self.list_crosspoints: cluster_crosspoints.append({"lines": _point.get("lines"), "points": [_point]}) while 1: _find = False new_cluster_crosspoints = [] for l_point in 
cluster_crosspoints: _flag = False for l_n_point in new_cluster_crosspoints: line1 = l_point.get("lines") line2 = l_n_point.get("lines") if len(line1&line2) > 0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines":l_point.get("lines"),"points":l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break list_l_rect = [] for table_crosspoint in cluster_crosspoints: list_rect = self.crosspoint2rect(table_crosspoint.get("points")) list_l_rect.append(list_rect) in_objs = set() list_tables = [] for l_rect in list_l_rect: _ta = self.rect2table(list_textbox,l_rect,in_objs,sourceP_LB=sourceP_LB) if _ta: list_tables.append(_ta) # self._plot(list_line, list_textbox) return list_tables, in_objs, list_l_rect def recognize_table_by_rect(self, list_textbox, list_rect, margin=2): dump_margin = 5 list_rect_tmp = [] # 去重 for _rect in list_rect: if (_rect.bbox[3]-_rect.bbox[1] < 10) or (abs(_rect.bbox[2]-_rect.bbox[0]) < 5): continue _find = False for _tmp in list_rect_tmp: for i in range(4): if abs(_rect.bbox[i]-_tmp.bbox[i]) < dump_margin: pass else: _find = False break if i == 3: _find = True if _find: break if not _find: list_rect_tmp.append(_rect) # print("=====",len(list_rect),len(list_rect_tmp)) # print(list_rect_tmp) # from matplotlib import pyplot as plt # plt.figure() # for _rect in list_rect_tmp: # x0,y0,x1,y1 = _rect.bbox # plt.boxplot(_rect.bbox) # plt.show() cluster_rect = [] for _rect in list_rect: _find = False for cr in cluster_rect: for cr_rect in cr: if abs((cr_rect.bbox[2]-cr_rect.bbox[0]+_rect.bbox[2]-_rect.bbox[0])-(max(cr_rect.bbox[2],_rect.bbox[2])-min(cr_rect.bbox[0],_rect.bbox[0])))0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines":l_point.get("lines"),"points":l_point.get("points")}) 
cluster_crosspoints = new_cluster_crosspoints if not _find: break list_crosspoints = [] for list_cp in cluster_crosspoints: points = list_cp.get("points") l_lines = [] for p in points: l_lines.extend(p.get("p_lines")) l_lines = list(set(l_lines)) l_lines.sort(key=lambda x:x[0]) min_x,_count = getMaxPoints([l[0] for l in l_lines],reverse=False) if _count<=2: min_x = None min_y,_count = getMaxPoints([l[1] for l in l_lines],reverse=False) if _count<2: min_y = None max_x,_count = getMaxPoints([l[2] for l in l_lines],reverse=True) if _count<=2: max_x = None max_y,_count = getMaxPoints([l[3] for l in l_lines],reverse=True) if _count<=2: max_y = None if min_x and min_y and max_x and max_y: points.sort(key=lambda x:x["point"][0]) if abs(min_x-points[0]["point"][0])>30: _line = LTLine(1,(min_x,min_y),(min_x,max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====",_line.bbox) if abs(max_x-points[-1]["point"][0])>30: _line = LTLine(1,(max_x,min_y),(max_x,max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====1",_line.bbox) points.sort(key=lambda x:x["point"][1]) if abs(min_y-points[0]["point"][1])>30: _line = LTLine(1,(min_x,min_y),(max_x,min_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====2",_line.bbox) if abs(max_y-points[-1]["point"][1])>30: _line = LTLine(1,(min_x,max_y),(max_x,max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====2",_line.bbox) for _i in range(len(l_lines)): for _j in range(len(l_lines)): line1 = l_lines[_i] line2 = l_lines[_j] exists,point = self.cross_point(line1,line2) if exists: list_crosspoints.append(point) # from matplotlib import pyplot as plt # plt.figure() # for _line in l_lines: # x0,y0,x1,y1 = _line # plt.plot([x0,x1],[y0,y1]) # for point in list_crosspoints: # plt.scatter(point.get("point")[0],point.get("point")[1]) # plt.show() # print(list_crosspoints) # print("points num",len(list_crosspoints)) return list_crosspoints def recognize_rect(self, _page): 
list_line = [] for _obj in _page._objs: if isinstance(_obj, (LTLine)): list_line.append(_obj) list_crosspoints = self.recognize_crosspoints(list_line) #聚类 cluster_crosspoints = [] for _point in list_crosspoints: cluster_crosspoints.append({"lines":_point.get("lines"),"points":[_point]}) while 1: _find = False new_cluster_crosspoints = [] for l_point in cluster_crosspoints: _flag = False for l_n_point in new_cluster_crosspoints: line1 = l_point.get("lines") line2 = l_n_point.get("lines") if len(line1&line2)>0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines":l_point.get("lines"),"points":l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break # print(len(cluster_crosspoints)) list_l_rect = [] for table_crosspoint in cluster_crosspoints: list_rect = self.crosspoint2rect(table_crosspoint.get("points")) list_l_rect.append(list_rect) return list_l_rect def crosspoint2rect(self, list_crosspoint, margin=5): dict_line_points = {} for _point in list_crosspoint: lines = list(_point.get("lines")) for _line in lines: if _line not in dict_line_points: dict_line_points[_line] = {"direct":None,"points":[]} dict_line_points[_line]["points"].append(_point) # 排序 for k, v in dict_line_points.items(): list_x = [] list_y = [] for _p in v["points"]: list_x.append(_p.get("point")[0]) list_y.append(_p.get("point")[1]) if max(list_x)-min(list_x)>max(list_y)-min(list_y): v.get("points").sort(key=lambda x:x.get("point")[0]) v["direct"] = "row" else: v.get("points").sort(key=lambda x:x.get("point")[1]) v["direct"] = "column" list_rect = [] for _point in list_crosspoint: if _point["buttom"]>=margin and _point["right"]>=margin: lines = list(_point.get("lines")) _line = lines[0] if dict_line_points[_line]["direct"]=="column": _line = lines[1] next_point = None for p1 in dict_line_points[_line]["points"]: if p1["buttom"]>=margin and 
p1["point"][0]>_point["point"][0]: next_point = p1 break if not next_point: continue lines = list(next_point.get("lines")) _line = lines[0] if dict_line_points[_line]["direct"]=="row": _line = lines[1] final_point = None for p1 in dict_line_points[_line]["points"]: if p1["left"]>=margin and p1["point"][1]>next_point["point"][1]: final_point = p1 break if not final_point: continue _r = LTRect(1,(_point["point"][0],_point["point"][1],final_point["point"][0],final_point["point"][1])) list_rect.append(_r) tmp_rect = [] set_bbox = set() for _r in list_rect: _bbox = "%.2f-%.2f-%.2f-%.2f"%_r.bbox width = _r.bbox[2]-_r.bbox[0] height = _r.bbox[3]-_r.bbox[1] if width<=margin or height<=margin: continue if _bbox not in set_bbox: tmp_rect.append(_r) set_bbox.add(_bbox) list_rect = tmp_rect # import cv2 # import numpy as np # import random # img = np.zeros(shape=(1000,1000),dtype=np.uint8) # img += 255 # # color = [] # for rect in list_rect: # color += 10 # x0,y0,x1,y1 = rect.bbox # x0 *= 10/18 # y0 *= 10/18 # x1 *= 10/18 # y1 *= 10/18 # print(rect.bbox) # cv2.rectangle(img, (int(x0),int(y0)),(int(x1),int(y1)), (color%255, (color+10)%255, (color+20)%255), 3) # cv2.imshow("bbox", img) # cv2.waitKey(0) return list_rect def cross_point(self, line1, line2, segment=True, margin=2): point_is_exist = False x = y = 0 x1, y1, x2, y2 = line1 x3, y3, x4, y4 = line2 if (x2 - x1) == 0: k1 = None b1 = 0 else: k1 = (y2 - y1) * 1.0 / (x2 - x1) # 计算k1,由于点均为整数,需要进行浮点数转化 b1 = y1 * 1.0 - x1 * k1 * 1.0 # 整型转浮点型是关键 if (x4 - x3) == 0: # L2直线斜率不存在 k2 = None b2 = 0 else: k2 = (y4 - y3) * 1.0 / (x4 - x3) # 斜率存在 b2 = y3 * 1.0 - x3 * k2 * 1.0 if k1 is None: if not k2 is None: x = x1 y = k2 * x1 + b2 point_is_exist = True elif k2 is None: x = x3 y = k1 * x3 + b1 elif not k2 == k1: x = (b2 - b1) * 1.0 / (k1 - k2) y = k1 * x * 1.0 + b1 * 1.0 point_is_exist = True left = 0 right = 0 top = 0 buttom = 0 if point_is_exist: if segment: if x>=(min(x1,x2)-margin) and x<=(max(x1,x2)+margin) and 
y>=(min(y1,y2)-margin) and y<=(max(y1,y2)+margin): if x>=(min(x3,x4)-margin) and x<=(max(x3,x4)+margin) and y>=(min(y3,y4)-margin) and y<=(max(y3,y4)+margin): point_is_exist = True left = abs(min(x1,x3)-x) right = abs(max(x2,x4)-x) top = abs(min(y1,y3)-y) buttom = abs(max(y2,y4)-y) else: point_is_exist = False else: point_is_exist = False line1_key = "%.2f-%.2f-%.2f-%.2f"%(x1, y1, x2, y2) line2_key = "%.2f-%.2f-%.2f-%.2f"%(x3, y3, x4, y4) return point_is_exist, {"point": [x, y], "left": left, "right": right, "top": top, "buttom": buttom, "lines": set([line1_key,line2_key]),"p_lines":[line1,line2]} def unionTable(self, list_table, fixspan=True, margin=2): set_x = set() set_y = set() list_cell = [] for _t in list_table: for _line in _t: list_cell.extend(_line) clusters_rects = [] #根据y1聚类 set_id = set() list_cell_dump = [] for _cell in list_cell: _id = id(_cell) if _id in set_id: continue set_id.add(_id) list_cell_dump.append(_cell) list_cell = list_cell_dump list_cell.sort(key=lambda x:x.get("bbox")[3]) for _rect in list_cell: _y0 = _rect.get("bbox")[3] _find = False for l_cr in clusters_rects: if abs(l_cr[0].get("bbox")[3]-_y0)<2: _find = True l_cr.append(_rect) break if not _find: clusters_rects.append([_rect]) clusters_rects.sort(key=lambda x:x[0].get("bbox")[3],reverse=True) for l_cr in clusters_rects: l_cr.sort(key=lambda x:x.get("bbox")[0]) # print("=============:") # for l_r in clusters_rects: # print(len(l_r)) for _line in clusters_rects: for _rect in _line: (x0,y0,x1,y1) = _rect.get("bbox") set_x.add(x0) set_x.add(x1) set_y.add(y0) set_y.add(y1) if len(set_x)==0 or len(set_y)==0: return list_x = list(set_x) list_y = list(set_y) list_x.sort(key=lambda x:x) list_y.sort(key=lambda x:x,reverse=True) _table = [] for _line in clusters_rects: table_line = [] for _rect in _line: (x0,y0,x1,y1) = _rect.get("bbox") _cell = 
{"bbox":(x0,y0,x1,y1),"rect":_rect.get("rect"),"rowspan":self.getspan(list_y,y0,y1,margin),"columnspan":self.getspan(list_x,x0,x1,margin),"text":_rect.get("text","")} table_line.append(_cell) _table.append(table_line) # print("=====================>>") # for _line in _table: # for _cell in _line: # print(_cell,end="\t") # print("\n") # print("=====================>>") # print(_table) if fixspan: for _line in _table: extend_line = [] for c_i in range(len(_line)): _cell = _line[c_i] if _cell.get("columnspan")>1: _cospan = _cell.get("columnspan") _cell["columnspan"] = 1 for i in range(1,_cospan): extend_line.append({"index":c_i+1,"cell":_cell}) extend_line.sort(key=lambda x:x["index"],reverse=True) for _el in extend_line: _line.insert(_el["index"],_el["cell"]) for l_i in range(len(_table)): _line = _table[l_i] for c_i in range(len(_line)): _cell = _line[c_i] if _cell.get("rowspan")>1: _rospan = _cell.get("rowspan") _cell["rowspan"] = 1 for i in range(1,_rospan): _table[l_i+i].insert(c_i,_cell) table_bbox = (_table[0][0].get("bbox")[0],_table[0][0].get("bbox")[1],_table[-1][-1].get("bbox")[2],_table[-1][-1].get("bbox")[3]) ta = {"bbox":table_bbox,"table":_table} return ta def rect2table(self, list_textbox, list_rect, in_objs, margin=5, fixspan=True,sourceP_LB=True,fixRect=True): def getIOU(bbox0,bbox1): width = max(bbox0[2],bbox1[2])-min(bbox0[0],bbox1[0])-(bbox0[2]-bbox0[0]+bbox1[2]-bbox1[0]) height = max(bbox0[3],bbox1[3])-min(bbox0[1],bbox1[1])-(bbox0[3]-bbox0[1]+bbox1[3]-bbox1[1]) if width<0 and height<0: return abs(width*height/min(abs((bbox0[2]-bbox0[0])*(bbox0[3]-bbox0[1])),abs((bbox1[2]-bbox1[0])*(bbox1[3]-bbox1[1])))) return 0 _table = [] set_x = set() set_y = set() clusters_rects = [] # 根据y1聚类 if sourceP_LB: list_rect.sort(key=lambda x:x.bbox[3]) for _rect in list_rect: _y0 = _rect.bbox[3] _find = False for l_cr in clusters_rects: if abs(l_cr[0].bbox[3]-_y0)1: _cospan = _cell.get("columnspan") _cell["columnspan"] = 1 n_cell = {} n_cell.update(_cell) for i in 
range(1,_cospan): _line.insert(c_i,n_cell) for l_i in range(len(_table)): _line = _table[l_i] for c_i in range(len(_line)): _cell = _line[c_i] if _cell.get("rowspan")>1: _rospan = _cell.get("rowspan") _cell["rowspan"] = 1 n_cell = {} n_cell.update(_cell) for i in range(1,_rospan): if l_i+i<=len(_table)-1: # print(len(_table),l_i+i) _table[l_i+i].insert(c_i,n_cell) # print("=======") # for _line in _table: # for _cell in _line: # _text = _cell["text"][:2]+"_"+str(_cell["columnspan"])+"_"+str(_cell["rowspan"]) # if _text=="": # _text = "==" # print(_text,end="\t") # print("\n") # print("===========") if fixRect: for _line in _table: extend_line = [] for c_i in range(len(_line)): c_cell = _line[c_i] if c_i==0 and c_cell["bbox"][0]!=list_x[0]: _bbox = (list_x[0],c_cell["bbox"][1], c_cell["bbox"][0],c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1,_bbox), "rowspan": self.getspan(list_y,_bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index":c_i,"cell":_cell}) if c_imargin: _bbox = (_bbox[2],_bbox[1], n_bbox[0],_bbox[3]) _cell = {"bbox": _bbox, "rect": LTRect(1,_bbox), "rowspan": self.getspan(list_y,_bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index":c_i+1,"cell":_cell}) if c_i==len(_line)-1: if abs(c_cell["bbox"][2]-list_x[-1])>margin: _bbox = (c_cell["bbox"][2],c_cell["bbox"][1], list_x[-1],c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1,_bbox), "rowspan": self.getspan(list_y,_bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index":c_i+1,"cell":_cell}) extend_line.sort(key=lambda x:x["index"],reverse=True) for _tmp in extend_line: _line.insert(_tmp["index"],_tmp["cell"]) list_textbox.sort(key=lambda x:x.bbox[0]) list_textbox.sort(key=lambda x:x.bbox[3],reverse=sourceP_LB) for textbox in list_textbox: if textbox in 
in_objs: continue (x0,y0,x1,y1) = textbox.bbox _text = textbox.get_text() _find = False for table_line in _table: for _cell in table_line: if self.inbox(textbox.bbox,_cell["bbox"], textbox.get_text()): _cell["text"] += _text in_objs.add(textbox) _find = True break if _find: break # print("=======") # for _line in _table: # for _cell in _line: # _text = _cell["text"][:2] # if _text=="": # _text = "==" # print(_text,end="\t") # print("\n") # print("===========") table_bbox = (_table[0][0].get("bbox")[0], _table[0][0].get("bbox")[1], _table[-1][-1].get("bbox")[2], _table[-1][-1].get("bbox")[3]) ta = {"bbox": table_bbox, "table": _table} return ta def inbox(self, bbox0, bbox_g, text=""): # if bbox_g[0]<=bbox0[0] and bbox_g[1]<=bbox0[1] and bbox_g[2]>=bbox0[2] and bbox_g[3]>=bbox0[3]: # return 1 # print("utils inbox", text, self.getIOU(bbox0,bbox_g), bbox0, bbox_g) if self.getIOU(bbox0,bbox_g)>0.5: return 1 return 0 def getIOU(self, bbox0, bbox1): width = max(bbox0[2],bbox1[2])-min(bbox0[0],bbox1[0])-(bbox0[2]-bbox0[0]+bbox1[2]-bbox1[0]) height = max(bbox0[3],bbox1[3])-min(bbox0[1],bbox1[1])-(bbox0[3]-bbox0[1]+bbox1[3]-bbox1[1]) if width < 0 and height < 0: iou = abs(width*height/min(abs((bbox0[2]-bbox0[0])*(bbox0[3]-bbox0[1])), abs((bbox1[2]-bbox1[0])*(bbox1[3]-bbox1[1])))) # print("getIOU", iou) return iou return 0 def getspan(self, _list, x0, x1, margin): _count = 0 (x0,x1) = (min(x0,x1),max(x0,x1)) for _x in _list: if _x>=(x0-margin) and _x<=(x1+margin): _count += 1 return _count-1 def _plot(self, list_line, list_textbox): from matplotlib import pyplot as plt plt.figure() for _line in list_line: x0, y0, x1, y1 = _line.__dict__.get("bbox") plt.plot([x0, x1], [y0, y1]) for _line in list_line: x0, y0, x1, y1 = _line.bbox plt.plot([x0, x1], [y0, y1]) # for point in list_crosspoints: # plt.scatter(point.get("point")[0],point.get("point")[1]) for textbox in list_textbox: x0, y0, x1, y1 = textbox.bbox plt.plot([x0, x1], [y0, y1]) plt.show() def get_table_html(table): 
html_text = '' + "\n" for row in table: html_text += "" + "\n" for col in row: row_span = col.get("rowspan") col_span = col.get("columnspan") bbox_text = col.get("text") html_text += "" + "\n" html_text += "" + "\n" html_text += "
" html_text += bbox_text + "
" + "\n" return html_text def sort_object(obj_list, is_reverse=False): from format_convert.convert_tree import _Table, _Image, _Sentence, _Page if len(obj_list) == 0: return obj_list if isinstance(obj_list[0], (_Table, _Sentence, _Image)): obj_list.sort(key=lambda x: (x.y, x.x), reverse=is_reverse) return obj_list elif isinstance(obj_list[0], _Page): obj_list.sort(key=lambda x: x.page_no) return obj_list else: return obj_list def request_post(url, param, time_out=1000): fails = 0 text = json.dumps([-2]) while True: try: if fails >= 1: break headers = {'content-type': 'application/json'} result = requests.post(url, data=param, timeout=time_out) # print('result.status_code', result.status_code) # print('result.text', result.text) if result.status_code == 200: text = result.text break else: print('result.status_code', result.status_code) print('result.text', result.text) fails += 1 continue except socket.timeout: fails += 1 print('timeout! fail times:', fails) except: fails += 1 print('fail! fail times:', fails) traceback.print_exc() return text def test_gpu(): print("="*30) import paddle paddle.utils.run_check() # import tensorflow as tf # print("tf gpu", tf.config.list_physical_devices('GPU')) print("="*30) def my_subprocess_call(*popenargs, timeout=None): logging.info("into my_subprocess_call") with Popen(*popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: try: for line in p.stdout: print("stdout", line) for line in p.stderr: print("stderr", line) p.wait(timeout=timeout) # p.communicate() return p.pid, p.returncode except: # Including KeyboardInterrupt, wait handled that. p.kill() # We don't call p.wait() again as p.__exit__ does that for us. 
raise finally: logging.info("out my_subprocess_call") p.kill() def parse_yaml(): yaml_path = os.path.dirname(os.path.abspath(__file__)) + "/interface.yml" with open(yaml_path, "r", encoding='utf-8') as f: cfg = f.read() params = yaml.load(cfg, Loader=yaml.SafeLoader) return params def get_ip_port(node_type=None, interface_type=None): if node_type is None: node_type_list = ["master", "slave"] else: node_type_list = [node_type] if interface_type is None: interface_type_list = ["convert", "ocr", "otr", "office", "path"] else: interface_type_list = [interface_type] ip_port_dict = {} params = parse_yaml() for type1 in node_type_list: node_type = type1.upper() ip_list = params.get(node_type).get("ip") for type2 in interface_type_list: interface_type = type2.upper() processes = 0 python_path = None project_path = None if interface_type in ["convert".upper()]: _port = params.get(node_type).get(interface_type).get("port") if _port is None: port_list = [] else: port_list = [str(_port)] if interface_type == "convert".upper(): processes = params.get(node_type).get(interface_type).get("processes") elif interface_type == "path".upper(): python_path = params.get(node_type).get(interface_type).get("python") project_path = params.get(node_type).get(interface_type).get("project") else: port_start = params.get(node_type).get(interface_type).get("port_start") port_no = params.get(node_type).get(interface_type).get("port_no") if port_start is None or port_no is None: port_list = [] else: port_list = [str(x) for x in range(port_start, port_start+port_no, 1)] if ip_list: for _ip in ip_list: if _ip is None: continue if _ip in ip_port_dict.keys(): if port_list: ip_port_dict.get(_ip).update({interface_type.lower(): port_list}) else: if port_list: ip_port_dict[_ip] = {interface_type.lower(): port_list} if processes: ip_port_dict.get(_ip).update({interface_type.lower()+"_processes": processes}) if project_path and python_path: ip_port_dict.get(_ip).update({"project_path": project_path, 
"python_path": python_path}) return ip_port_dict def get_intranet_ip(): try: # Create a new socket using the given address family, # socket type and protocol number. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Connect to a remote socket at address. # (The format of address depends on the address family.) address = ("8.8.8.8", 80) s.connect(address) # Return the socket’s own address. # This is useful to find out the port number of an IPv4/v6 socket, for instance. # (The format of the address returned depends on the address family.) sockname = s.getsockname() ip = sockname[0] port = sockname[1] finally: s.close() return ip def memory_decorator(func): @wraps(func) def get_memory_info(*args, **kwargs): if get_platform() == "Windows": return func(*args, **kwargs) # 只有linux有resource包 # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 start_time = time.time() logging.info("----- memory info start - " + func.__qualname__ + " - " + str(os.getpid()) + " - " + str(round(usage, 2)) + " GB" + " - " + str(round(time.time()-start_time, 2)) + " sec") result = func(*args, **kwargs) # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 logging.info("----- memory info end - " + func.__qualname__ + " - " + str(os.getpid()) + " - " + str(round(usage, 2)) + " GB" + " - " + str(round(time.time()-start_time, 2)) + " sec") return result return get_memory_info def log(msg): call_func_name = inspect.currentframe().f_back.f_code.co_name logger = get_logger(call_func_name, {"md5": _global.get("md5"), "port": _global.get("port")}) logger.info(msg) # logging.info(msg) def get_logger(_name, _dict): extra = _dict _format = '%(asctime)s - %(name)s - %(levelname)s - %(md5)s - %(port)s - %(message)s' logger = logging.getLogger(_name) create_new_flag = 1 handlers = logger.handlers if handlers: for h in handlers: if 
h.formatter.__dict__.get("_fmt") == _format: create_new_flag = 0 break if create_new_flag: formatter = logging.Formatter(_format) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.propagate = False logger = logging.LoggerAdapter(logger, extra) return logger def set_flask_global(): # 接口轮询所需锁、参数 ip_port_flag = {} ip_port_dict = get_ip_port() for _k in ip_port_dict.keys(): ip_port_flag.update({_k: {"ocr": 0, "otr": 0, "convert": 0, "office": 0 }}) _global.update({"ip_port_flag": ip_port_flag}) _global.update({"ip_port": ip_port_dict}) # print(globals().get("ip_port")) def get_md5_from_bytes(_bytes): def generate_fp(_b): bio = BytesIO() bio.write(_b) return bio _length = 0 try: _md5 = hashlib.md5() ff = generate_fp(_bytes) ff.seek(0) while True: data = ff.read(4096) if not data: break _length += len(data) _md5.update(data) return _md5.hexdigest(), _length except Exception as e: traceback.print_exc() return None, _length if __name__ == "__main__": # strs = r"D:\Project\temp\04384fcc9e8911ecbd2844f971944973\043876ca9e8911eca5e144f971944973_rar\1624114035529.jpeg" # print(slash_replace(strs)) # from matplotlib import pyplot as plt # import random # fig = plt.figure() # plt.xlim(100) # plt.ylim(100) # fig.add_subplot(111) # x0,y0,x1,y1 = (1,2,3,4) # plt.gca().add_patch(plt.Rectangle(xy=(x0, y0), # width=x1-x0, # height=y1-y0, # edgecolor=(random.randint(0,255)/255,random.randint(0,255)/255,random.randint(0,255)/255), # fill=False, linewidth=2)) # # # plt.show() # import cv2 # import numpy as np # img = np.zeros(shape=(1800,1800),dtype=np.uint8) # img += 255 # cv2.imshow("bbox", img) # cv2.waitKey(0) # print(json.dumps({"data":[1, 2]})) # print(parse_yaml()) print(get_ip_port()) # print(get_intranet_ip())