# -*- coding:utf-8 -*- import argparse import copy import hashlib import inspect import json import os import socket import subprocess import sys from io import BytesIO from subprocess import Popen import cv2 import requests from PIL import Image sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../") import difflib import logging import mimetypes import platform import re import traceback import filetype from bs4 import BeautifulSoup import yaml from pdfminer.layout import * from format_convert import _global from functools import wraps import psutil import time import numpy as np from format_convert.judge_platform import get_platform if get_platform() == "Linux": import resource import math def judge_error_code(_list, code=[0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11]): """ [0] : continue [-1]: 逻辑处理错误 [-2]: 接口调用错误 [-3]: 文件格式错误,无法打开 [-4]: 各类文件调用第三方包读取超时 [-5]: 整个转换过程超时 [-6]: 阿里云UDF队列超时 [-7]: 文件需密码,无法打开 [-8]: 调用现成接口报错 [-9]: 接口接收数据为空 """ for c in code: if _list == [c]: return True return False def add_div(text): if text == "" or text is None: return text # if get_platform() == "Windows": # print("add_div", text) if re.findall("
", text): return text text = "
" + text + "\n" text = re.sub("\n", "
", text) # text += "
" if text[-5:] == "
": # print("add_div has cut", text[-30:]) text = text[:-5] return text def get_platform(): sys = platform.system() return sys def get_html_p(html_path): log("into get_html_p") try: with open(html_path, "r") as ff: html_str = ff.read() soup = BeautifulSoup(html_str, 'lxml') text = "" for p in soup.find_all("p"): p_text = p.text p_text = p_text.strip() if p.string != "": text += p_text text += "\n" return text except Exception as e: log("get_html_p error!") return [-1] def string_similarity(str1, str2): # 去掉
和回车 str1 = re.sub("
", "", str1) str1 = re.sub("
", "", str1) str1 = re.sub("\n", "", str1) str2 = re.sub("
", "", str2) str2 = re.sub("
", "", str2) str2 = re.sub("\n", "", str2) # print("********************************") # print("str1", str1) # print("********************************") # print("str2", str2) # print("********************************") score = difflib.SequenceMatcher(None, str1, str2).ratio() print("string_similarity", score) return score def get_sequential_data(text_list, bbox_list, html=False): logging.info("into get_sequential_data") try: text = "" order_list = [] for i in range(len(text_list)): length_start = bbox_list[i][0][0] length_end = bbox_list[i][1][0] height_start = bbox_list[i][0][1] height_end = bbox_list[i][-1][1] # print([length_start, length_end, height_start, height_end]) order_list.append([text_list[i], length_start, length_end, height_start, height_end]) # text = text + infomation['text'] + "\n" if get_platform() == "Windows": print("get_sequential_data", order_list) if not order_list: if get_platform() == "Windows": print("get_sequential_data", "no order list") return "" # 根据bbox的坐标对输出排序 order_list.sort(key=lambda x: (x[3], x[1], x[0])) # 根据bbox分行分列 # col_list = [] # height_end = int((order_list[0][4] + order_list[0][3]) / 2) # for i in range(len(order_list)): # if height_end - threshold <= order_list[i][3] <= height_end + threshold: # col_list.append(order_list[i]) # else: # row_list.append(col_list) # col_list = [] # height_end = int((order_list[i][4] + order_list[i][3]) / 2) # col_list.append(order_list[i]) # if i == len(order_list) - 1: # row_list.append(col_list) row_list = [] used_box = [] threshold = 5 for box in order_list: if box in used_box: continue height_center = (box[4] + box[3]) / 2 row = [] for box2 in order_list: if box2 in used_box: continue height_center2 = (box2[4] + box2[3]) / 2 if height_center - threshold <= height_center2 <= height_center + threshold: if box2 not in row: row.append(box2) used_box.append(box2) row.sort(key=lambda x: x[0]) row_list.append(row) for row in row_list: if not row: continue if len(row) <= 1: text = text + row[0][0] + "\n" else: sub_text = "" row.sort(key=lambda x: x[1]) for col in row: sub_text = sub_text + col[0] + " " sub_text = sub_text + "\n" text += sub_text if html: text = "
" + text text = re.sub("\n", "
\n
", text) text += "
" # if text[-5:] == "
": # text = text[:-5] return text except Exception as e: logging.info("get_sequential_data error!") print("get_sequential_data", traceback.print_exc()) return [-1] # def get_formatted_table(text_list, text_bbox_list, table_bbox_list, split_line): # logging.info("into get_formatted_table") # try: # # 重新定义text_bbox_list,[point, point, text] # text_bbox_list = [[text_bbox_list[i][0], text_bbox_list[i][2], text_list[i]] for i in # range(len(text_bbox_list))] # # 按纵坐标排序 # text_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # # # print("text_bbox_list", text_bbox_list) # # print("table_bbox_list", table_bbox_list) # # # bbox位置 threshold # threshold = 5 # # # 根据split_line分区,可能有个区多个表格 [(), ()] # area_text_bbox_list = [] # area_table_bbox_list = [] # # print("get_formatted_table, split_line", split_line) # for j in range(1, len(split_line)): # last_y = split_line[j - 1][0][1] # current_y = split_line[j][0][1] # temp_text_bbox_list = [] # temp_table_bbox_list = [] # # # 找出该区域下text bbox # for text_bbox in text_bbox_list: # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # if last_y - threshold <= text_bbox_center[1] <= current_y + threshold: # temp_text_bbox_list.append(text_bbox) # area_text_bbox_list.append(temp_text_bbox_list) # # # 找出该区域下table bbox # for table_bbox in table_bbox_list: # # 计算 table bbox 中心点 # table_bbox_center = ((table_bbox[1][0] + table_bbox[0][0]) / 2, # (table_bbox[1][1] + table_bbox[0][1]) / 2) # if last_y < table_bbox_center[1] < current_y: # temp_table_bbox_list.append(table_bbox) # area_table_bbox_list.append(temp_table_bbox_list) # # # for j in range(len(area_text_bbox_list)): # # print("area_text_bbox_list", j, area_text_bbox_list[j]) # # # 对每个区域分别进行两个bbox匹配,生成表格 # area_text_list = [] # area_column_list = [] # for j in range(len(area_text_bbox_list)): # # 每个区域的table bbox 和text bbox # temp_table_bbox_list = area_table_bbox_list[j] # temp_text_bbox_list = area_text_bbox_list[j] # # # 判断该区域有无表格bbox # # 若无表格,将该区域文字连接 # if not temp_table_bbox_list: # # 找出该区域的所有text bbox # only_text_list = [] # only_bbox_list = [] # for text_bbox in temp_text_bbox_list: # only_text_list.append(text_bbox[2]) # only_bbox_list.append([text_bbox[0], text_bbox[1]]) # only_text = get_sequential_data(only_text_list, only_bbox_list, True) # if only_text == [-1]: # return [-1], [-1] # area_text_list.append(only_text) # area_column_list.append(0) # continue # # # 有表格 # # 文本对应的表格格子 # text_in_table = {} # for i in range(len(temp_text_bbox_list)): # text_bbox = temp_text_bbox_list[i] # # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # # # 判断中心点在哪个table bbox中 # for table_bbox in temp_table_bbox_list: # # 中心点在table bbox中,将text写入字典 # if table_bbox[0][0] <= text_bbox_center[0] <= table_bbox[1][0] and \ # table_bbox[0][1] <= text_bbox_center[1] <= table_bbox[1][1]: # if str(table_bbox) in text_in_table.keys(): # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # else: # text_in_table[str(table_bbox)] = text_bbox[2] # break # # # 如果未找到text bbox匹配的table bbox,加大threshold匹配 # # elif (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]): # # if str(table_bbox) in text_in_table.keys(): # # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # # else: # # text_in_table[str(table_bbox)] = text_bbox[2] # # break # # # 对表格格子进行分行分列,并计算总计多少小列 # # 放入坐标 # all_col_list = [] # all_row_list = [] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # # 放入所有坐标x # if table_bbox[0][0] not in all_col_list: # all_col_list.append(table_bbox[0][0]) # if table_bbox[1][0] not in all_col_list: # all_col_list.append(table_bbox[1][0]) # # # 放入所有坐标y # if table_bbox[0][1] not in all_row_list: # all_row_list.append(table_bbox[0][1]) # if table_bbox[1][1] not in all_row_list: # all_row_list.append(table_bbox[1][1]) # all_col_list.sort(key=lambda x: x) # all_row_list.sort(key=lambda x: x) # # # 分行 # row_list = [] # rows = [] # temp_table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0], x[1][1], x[1][0])) # y_row = temp_table_bbox_list[0][0][1] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # if y_row - threshold <= table_bbox[0][1] <= y_row + threshold: # rows.append(table_bbox) # else: # y_row = table_bbox[0][1] # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # rows = [] # rows.append(table_bbox) # # print("*" * 30) # # print(row_list) # # if i == len(temp_table_bbox_list) - 1: # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # # # 生成表格,包括文字和格子宽度 # area_column = [] # text = '' + "\n" # for row in row_list: # text += "" + "\n" # for col in row: # # 计算bbox y坐标之间有多少其他点,+1即为所占行数 # row_span = 1 # for y in all_row_list: # if col[0][1] < y < col[1][1]: # if y - col[0][1] >= 2 and col[1][1] - y >= 2: # row_span += 1 # # # 计算bbox x坐标之间有多少其他点,+1即为所占列数 # col_span = 1 # for x in all_col_list: # if col[0][0] < x < col[1][0]: # if x - col[0][0] >= 2 and col[1][0] - x >= 2: # col_span += 1 # # text += "" + "\n" # text += "" + "\n" # text += "
" # # if str(col) in text_in_table.keys(): # text += text_in_table.get(str(col)) # else: # text += "" # text += "
" + "\n" # # # 计算最大column # max_col_num = 0 # for row in row_list: # col_num = 0 # for col in row: # col_num += 1 # if max_col_num < col_num: # max_col_num = col_num # # area_text_list.append(text) # area_column_list.append(max_col_num) # # text = "" # if get_platform() == "Windows": # print("get_formatted_table area_text_list", area_text_list) # for area_text in area_text_list: # text += area_text # return text, area_column_list # except Exception as e: # logging.info("get_formatted_table error!") # print("get_formatted_table", traceback.print_exc()) # return [-1], [-1] def rename_inner_files(root_path): try: logging.info("into rename_inner_files") # 获取解压文件夹下所有文件+文件夹,不带根路径 path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: p = os.path.join(root, name) + os.sep if get_platform() == "Windows": root_path = slash_replace(root_path) p = slash_replace(p) p = re.sub(root_path, "", p) root_path = slash_replace(root_path, True) p = slash_replace(p, True) else: p = re.sub(root_path, "", p) path_list.append(p) for name in files: p = os.path.join(root, name) if get_platform() == "Windows": root_path = slash_replace(root_path) p = slash_replace(p) p = re.sub(root_path, "", p) root_path = slash_replace(root_path, True) p = slash_replace(p, True) else: p = re.sub(root_path, "", p) path_list.append(p) # 按路径长度排序 path_list.sort(key=lambda x: len(x), reverse=True) # 循环改名 for old_path in path_list: # 按路径分隔符分割 ss = old_path.split(os.sep) # 判断是否文件夹 is_dir = 0 file_type = "" if os.path.isdir(root_path + old_path): ss = ss[:-1] is_dir = 1 else: if "." in old_path: file_type = "." + old_path.split(".")[-1] else: file_type = "" # 最后一级需要用hash改名 new_path = "" # new_path = re.sub(ss[-1], str(hash(ss[-1])), old_path) + file_type current_level = 0 for s in ss: # 路径拼接 if current_level < len(ss) - 1: new_path += s + os.sep else: new_path += str(hash(s)) + file_type current_level += 1 new_ab_path = root_path + new_path old_ab_path = root_path + old_path os.rename(old_ab_path, new_ab_path) # 重新获取解压文件夹下所有文件+文件夹 new_path_list = [] for root, dirs, files in os.walk(root_path, topdown=False): for name in dirs: new_path_list.append(os.path.join(root, name) + os.sep) for name in files: new_path_list.append(os.path.join(root, name)) return new_path_list except: traceback.print_exc() return [-1] def judge_format(path): guess1 = mimetypes.guess_type(path) _type = None if guess1[0]: _type = guess1[0] else: guess2 = filetype.guess(path) if guess2: _type = guess2.mime if _type == "application/pdf": return "pdf" if _type == "application/vnd.openxmlformats-officedocument.wordprocessingml.document": return "docx" if _type == "application/x-zip-compressed" or _type == "application/zip": return "zip" if _type == "application/x-rar-compressed" or _type == "application/rar": return "rar" if _type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": return "xlsx" if _type == "application/msword": return "doc" if _type == "image/png": return "png" if _type == "image/jpeg": return "jpg" # 猜不到,返回None return None def draw_lines_plt(bboxes): import matplotlib.pyplot as plt plt.figure() for bbox in bboxes: x = [bbox[0],bbox[2]] y = [bbox[1],bbox[3]] plt.plot(x,y) plt.show() def slash_replace(_str, reverse=False): if reverse: _str = eval(repr(_str).replace('/', '\\\\')) else: _str = eval(repr(_str).replace('\\\\', '/')) return _str class LineTable: def recognize_table(self,list_textbox, list_line,sourceP_LB=True): self.list_line = list_line self.list_crosspoints = self.recognize_crosspoints(list_line) # 聚类 cluster_crosspoints = [] for _point in self.list_crosspoints: cluster_crosspoints.append({"lines": _point.get("lines"), "points": [_point]}) while 1: _find = False new_cluster_crosspoints = [] for l_point in cluster_crosspoints: _flag = False for l_n_point in new_cluster_crosspoints: line1 = l_point.get("lines") line2 = l_n_point.get("lines") if len(line1&line2) > 0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines":l_point.get("lines"),"points":l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break #need to sort to deal with the inner tables for clu_cp in cluster_crosspoints: points = clu_cp["points"] list_p = np.array([p["point"] for p in points]) max_x = max(list_p[...,0]) min_x = min(list_p[...,0]) max_y = max(list_p[...,1]) min_y = min(list_p[...,1]) _area = (max_y-min_y)*(max_x-min_x) clu_cp["area"] = _area cluster_crosspoints.sort(key=lambda x:x["area"]) list_l_rect = [] for table_crosspoint in cluster_crosspoints: list_rect = self.crosspoint2rect(table_crosspoint.get("points")) list_l_rect.append(list_rect) in_objs = set() list_tables = [] for l_rect in list_l_rect: _ta = self.rect2table(list_textbox,l_rect,in_objs,sourceP_LB=sourceP_LB) if _ta: list_tables.append(_ta) #展示表格及文字 # self._plot(list_line, list_textbox) return list_tables, in_objs, list_l_rect def recognize_table_by_rect(self, list_textbox, list_rect, margin=2): dump_margin = 5 list_rect_tmp = [] # 去重 for _rect in list_rect: if (_rect.bbox[3]-_rect.bbox[1] < 10) or (abs(_rect.bbox[2]-_rect.bbox[0]) < 5): continue _find = False for _tmp in list_rect_tmp: for i in range(4): if abs(_rect.bbox[i]-_tmp.bbox[i]) < dump_margin: pass else: _find = False break if i == 3: _find = True if _find: break if not _find: list_rect_tmp.append(_rect) # print("=====",len(list_rect),len(list_rect_tmp)) # print(list_rect_tmp) # from matplotlib import pyplot as plt # plt.figure() # for _rect in list_rect_tmp: # x0,y0,x1,y1 = _rect.bbox # plt.boxplot(_rect.bbox) # plt.show() cluster_rect = [] for _rect in list_rect: _find = False for cr in cluster_rect: for cr_rect in cr: if abs((cr_rect.bbox[2]-cr_rect.bbox[0]+_rect.bbox[2]-_rect.bbox[0])-(max(cr_rect.bbox[2],_rect.bbox[2])-min(cr_rect.bbox[0],_rect.bbox[0])))0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines":l_point.get("lines"),"points":l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break list_crosspoints = [] for list_cp in cluster_crosspoints: points = list_cp.get("points") l_lines = [] for p in points: l_lines.extend(p.get("p_lines")) l_lines = list(set(l_lines)) l_lines.sort(key=lambda x:x[0]) min_x,_count = getMaxPoints([l[0] for l in l_lines],reverse=False) if _count<=2: min_x = None min_y,_count = getMaxPoints([l[1] for l in l_lines],reverse=False) if _count<2: min_y = None max_x,_count = getMaxPoints([l[2] for l in l_lines],reverse=True) if _count<=2: max_x = None max_y,_count = getMaxPoints([l[3] for l in l_lines],reverse=True) if _count<=2: max_y = None if min_x and min_y and max_x and max_y: points.sort(key=lambda x:x["point"][0]) if abs(min_x-points[0]["point"][0])>30: _line = LTLine(1,(min_x,min_y),(min_x,max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====",_line.bbox) if abs(max_x-points[-1]["point"][0])>30: _line = LTLine(1,(max_x,min_y),(max_x,max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====1",_line.bbox) points.sort(key=lambda x:x["point"][1]) if abs(min_y-points[0]["point"][1])>30: _line = LTLine(1,(min_x,min_y),(max_x,min_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====2",_line.bbox) if abs(max_y-points[-1]["point"][1])>30: _line = LTLine(1,(min_x,max_y),(max_x,max_y)) list_line.append(_line) l_lines.append(_line.bbox) # print("add=====2",_line.bbox) for _i in range(len(l_lines)): for _j in range(len(l_lines)): line1 = l_lines[_i] line2 = l_lines[_j] exists,point = self.cross_point(line1,line2) if exists: list_crosspoints.append(point) # from matplotlib import pyplot as plt # plt.figure() # for _line in l_lines: # x0,y0,x1,y1 = _line # plt.plot([x0,x1],[y0,y1]) # for point in list_crosspoints: # plt.scatter(point.get("point")[0],point.get("point")[1]) # plt.show() # print(list_crosspoints) # print("points num",len(list_crosspoints)) return list_crosspoints def recognize_rect(self, _page): list_line = [] for _obj in _page._objs: if isinstance(_obj, (LTLine)): list_line.append(_obj) list_crosspoints = self.recognize_crosspoints(list_line) #聚类 cluster_crosspoints = [] for _point in list_crosspoints: cluster_crosspoints.append({"lines":_point.get("lines"),"points":[_point]}) while 1: _find = False new_cluster_crosspoints = [] for l_point in cluster_crosspoints: _flag = False for l_n_point in new_cluster_crosspoints: line1 = l_point.get("lines") line2 = l_n_point.get("lines") if len(line1&line2)>0: _find = True _flag = True l_n_point["lines"] = line1.union(line2) l_n_point["points"].extend(l_point["points"]) if not _flag: new_cluster_crosspoints.append({"lines":l_point.get("lines"),"points":l_point.get("points")}) cluster_crosspoints = new_cluster_crosspoints if not _find: break # print(len(cluster_crosspoints)) list_l_rect = [] for table_crosspoint in cluster_crosspoints: list_rect = self.crosspoint2rect(table_crosspoint.get("points")) list_l_rect.append(list_rect) return list_l_rect def crosspoint2rect(self, list_crosspoint, margin=10): dict_line_points = {} for _point in list_crosspoint: lines = list(_point.get("lines")) for _line in lines: if _line not in dict_line_points: dict_line_points[_line] = {"direct":None,"points":[]} dict_line_points[_line]["points"].append(_point) # 排序 for k, v in dict_line_points.items(): list_x = [] list_y = [] for _p in v["points"]: list_x.append(_p.get("point")[0]) list_y.append(_p.get("point")[1]) if max(list_x)-min(list_x)>max(list_y)-min(list_y): v.get("points").sort(key=lambda x:x.get("point")[0]) v["direct"] = "row" else: v.get("points").sort(key=lambda x:x.get("point")[1]) v["direct"] = "column" list_rect = [] for _point in list_crosspoint: if _point["buttom"]>=margin and _point["right"]>=margin: lines = list(_point.get("lines")) _line = lines[0] if dict_line_points[_line]["direct"]=="column": _line = lines[1] next_point = None for p1 in dict_line_points[_line]["points"]: if p1["buttom"]>=margin and p1["point"][0]>_point["point"][0]: next_point = p1 break if not next_point: continue lines = list(next_point.get("lines")) _line = lines[0] if dict_line_points[_line]["direct"]=="row": _line = lines[1] final_point = None for p1 in dict_line_points[_line]["points"]: if p1["left"]>=margin and p1["point"][1]>next_point["point"][1]: final_point = p1 break if not final_point: continue _r = LTRect(1,(_point["point"][0],_point["point"][1],final_point["point"][0],final_point["point"][1])) list_rect.append(_r) tmp_rect = [] set_bbox = set() for _r in list_rect: _bbox = "%.2f-%.2f-%.2f-%.2f"%_r.bbox width = _r.bbox[2]-_r.bbox[0] height = _r.bbox[3]-_r.bbox[1] if width<=margin or height<=margin: continue if _bbox not in set_bbox: tmp_rect.append(_r) set_bbox.add(_bbox) list_rect = tmp_rect # import cv2 # import numpy as np # import random # img = np.zeros(shape=(1000,1000),dtype=np.uint8) # img += 255 # # color = [] # for rect in list_rect: # color += 10 # x0,y0,x1,y1 = rect.bbox # x0 *= 10/18 # y0 *= 10/18 # x1 *= 10/18 # y1 *= 10/18 # print(rect.bbox) # cv2.rectangle(img, (int(x0),int(y0)),(int(x1),int(y1)), (color%255, (color+10)%255, (color+20)%255), 3) # cv2.imshow("bbox", img) # cv2.waitKey(0) return list_rect def cross_point(self, line1, line2, segment=True, margin=2): point_is_exist = False x = y = 0 x1, y1, x2, y2 = line1 x3, y3, x4, y4 = line2 if (x2 - x1) == 0: k1 = None b1 = 0 else: k1 = (y2 - y1) * 1.0 / (x2 - x1) # 计算k1,由于点均为整数,需要进行浮点数转化 b1 = y1 * 1.0 - x1 * k1 * 1.0 # 整型转浮点型是关键 if (x4 - x3) == 0: # L2直线斜率不存在 k2 = None b2 = 0 else: k2 = (y4 - y3) * 1.0 / (x4 - x3) # 斜率存在 b2 = y3 * 1.0 - x3 * k2 * 1.0 if k1 is None: if not k2 is None: x = x1 y = k2 * x1 + b2 point_is_exist = True elif k2 is None: x = x3 y = k1 * x3 + b1 elif not k2 == k1: x = (b2 - b1) * 1.0 / (k1 - k2) y = k1 * x * 1.0 + b1 * 1.0 point_is_exist = True left = 0 right = 0 top = 0 buttom = 0 if point_is_exist: if segment: if x>=(min(x1,x2)-margin) and x<=(max(x1,x2)+margin) and y>=(min(y1,y2)-margin) and y<=(max(y1,y2)+margin): if x>=(min(x3,x4)-margin) and x<=(max(x3,x4)+margin) and y>=(min(y3,y4)-margin) and y<=(max(y3,y4)+margin): point_is_exist = True left = abs(min(x1,x3)-x) right = abs(max(x2,x4)-x) top = abs(min(y1,y3)-y) buttom = abs(max(y2,y4)-y) else: point_is_exist = False else: point_is_exist = False line1_key = "%.2f-%.2f-%.2f-%.2f"%(x1, y1, x2, y2) line2_key = "%.2f-%.2f-%.2f-%.2f"%(x3, y3, x4, y4) return point_is_exist, {"point": [x, y], "left": left, "right": right, "top": top, "buttom": buttom, "lines": set([line1_key,line2_key]),"p_lines":[line1,line2]} def unionTable(self, list_table, fixspan=True, margin=2): set_x = set() set_y = set() list_cell = [] for _t in list_table: for _line in _t: list_cell.extend(_line) clusters_rects = [] #根据y1聚类 set_id = set() list_cell_dump = [] for _cell in list_cell: _id = id(_cell) if _id in set_id: continue set_id.add(_id) list_cell_dump.append(_cell) list_cell = list_cell_dump list_cell.sort(key=lambda x:x.get("bbox")[3]) for _rect in list_cell: _y0 = _rect.get("bbox")[3] _find = False for l_cr in clusters_rects: if abs(l_cr[0].get("bbox")[3]-_y0)<2: _find = True l_cr.append(_rect) break if not _find: clusters_rects.append([_rect]) clusters_rects.sort(key=lambda x:x[0].get("bbox")[3],reverse=True) for l_cr in clusters_rects: l_cr.sort(key=lambda x:x.get("bbox")[0]) # print("=============:") # for l_r in clusters_rects: # print(len(l_r)) for _line in clusters_rects: for _rect in _line: (x0,y0,x1,y1) = _rect.get("bbox") set_x.add(x0) set_x.add(x1) set_y.add(y0) set_y.add(y1) if len(set_x)==0 or len(set_y)==0: return list_x = list(set_x) list_y = list(set_y) list_x.sort(key=lambda x:x) list_y.sort(key=lambda x:x,reverse=True) _table = [] line_i = 0 for _line in clusters_rects: table_line = [] cell_i = 0 for _rect in _line: (x0,y0,x1,y1) = _rect.get("bbox") _cell = {"bbox":(x0,y0,x1,y1),"rect":_rect.get("rect"),"rowspan":self.getspan(list_y,y0,y1,margin),"columnspan":self.getspan(list_x,x0,x1,margin),"text":_rect.get("text","")} table_line.append(_cell) cell_i += 1 line_i += 1 _table.append(table_line) # print("=====================>>") # for _line in _table: # for _cell in _line: # print(_cell,end="\t") # print("\n") # print("=====================>>") # print(_table) if fixspan: for _line in _table: extend_line = [] for c_i in range(len(_line)): _cell = _line[c_i] if _cell.get("columnspan")>1: _cospan = _cell.get("columnspan") _cell["columnspan"] = 1 for i in range(1,_cospan): extend_line.append({"index":c_i+1,"cell":_cell}) extend_line.sort(key=lambda x:x["index"],reverse=True) for _el in extend_line: _line.insert(_el["index"],_el["cell"]) for l_i in range(len(_table)): _line = _table[l_i] for c_i in range(len(_line)): _cell = _line[c_i] if _cell.get("rowspan")>1: _rospan = _cell.get("rowspan") _cell["rowspan"] = 1 for i in range(1,_rospan): _table[l_i+i].insert(c_i,_cell) table_bbox = (_table[0][0].get("bbox")[0],_table[0][0].get("bbox")[1],_table[-1][-1].get("bbox")[2],_table[-1][-1].get("bbox")[3]) ta = {"bbox":table_bbox,"table":_table} return ta #获取点阵 def getSpanLocation(self,_list, x0, x1, margin): list_location = [] (x0,x1) = (min(x0,x1),max(x0,x1)) for _x in _list: if _x>=(x0-margin) and _x<=(x1+margin): list_location.append(_x) return list_location def fixSpan(self,_table,list_x,list_y,sourceP_LB): def checkPosition(_line,_position,bbox,margin=5): #check y if len(_line)>0: _bbox = _line[0].get("bbox") if abs(min(_bbox[1],_bbox[3])-min(bbox[1],bbox[3]))>margin or abs(max(_bbox[1],_bbox[3])-max(bbox[1],bbox[3]))>margin: print("check position y false") return False #check x if _position<=len(_line)-1: after_bbox = _line[_position].get("bbox") # the insert bbox.x1 should not less then the after bbox.x0 if not (after_bbox[0]>=bbox[2]): print("check position x after false") return False if _position-1>0 and _position-1=before_bbox[2]): print("check position x before false") return False return True #拓展columnspan的数据 for _line in _table: c_i = 0 while c_i1: x0,y0,x1,y1 = _cell.get("bbox") _cospan = _cell.get("columnspan") locations = self.getSpanLocation(list_x,x0,x1,10) if len(locations)==_cospan+1: _cell["bbox"] = (x0,y0,locations[1],y1) _cell["columnspan"] = 1 #len(locations)==_colspan+1 for i in range(1,_cospan): n_cell = {} n_cell.update(_cell) n_cell["bbox"] = (locations[i],y0,locations[i+1],y1) c_i += 1 #check the position if checkPosition(_line,c_i,n_cell["bbox"]): _line.insert(c_i,n_cell) c_i += 1 #拓展rowspan的数据 for l_i in range(len(_table)): _line = _table[l_i] c_i = 0 while c_i1: x0,y0,x1,y1 = _cell.get("bbox") _rospan = _cell.get("rowspan") locations = self.getSpanLocation(list_y,y0,y1,10) if len(locations)==_rospan+1: _cell["bbox"] = (x0,y0,x1,locations[1]) _cell["rowspan"] = 1 for i in range(1,_rospan): n_cell = {} n_cell.update(_cell) if l_i+i<=len(_table)-1: # print(len(_table),l_i+i) n_cell["bbox"] = (x0,locations[i],x1,locations[i+1]) if checkPosition(_table[l_i+i],c_i,n_cell["bbox"]): _table[l_i+i].insert(c_i,n_cell) c_i += 1 def fixRect(self,_table,list_x,list_y,sourceP_LB,margin): self.fixSpan(_table,list_x,list_y,sourceP_LB) # for line_i in range(len(_table)): # for cell_i in range(len(_table[line_i])): # _cell = _table[line_i][cell_i] # print(line_i,cell_i,_cell["bbox"],_cell["text"]) for _line in _table: extend_line = [] for c_i in range(len(_line)): c_cell = _line[c_i] #first cell missing if c_i==0 and c_cell["bbox"][0]!=list_x[0]: _bbox = (list_x[0],c_cell["bbox"][1], c_cell["bbox"][0],c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1,_bbox), "rowspan": self.getspan(list_y,_bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index":c_i,"cell":_cell}) #cell in the median missing if c_imargin: _bbox = (_bbox[2],_bbox[1], n_bbox[0],_bbox[3]) _cell = {"bbox": _bbox, "rect": LTRect(1,_bbox), "rowspan": self.getspan(list_y,_bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index":c_i+1,"cell":_cell}) #last cell missing if c_i==len(_line)-1: if abs(c_cell["bbox"][2]-list_x[-1])>margin: _bbox = (c_cell["bbox"][2],c_cell["bbox"][1], list_x[-1],c_cell["bbox"][3]) _cell = {"bbox": _bbox, "rect": LTRect(1,_bbox), "rowspan": self.getspan(list_y,_bbox[1], _bbox[3], margin), "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin), "text": ""} extend_line.append({"index":c_i+1,"cell":_cell}) extend_line.sort(key=lambda x: x["index"],reverse=True) for _tmp in extend_line: _line.insert(_tmp["index"],_tmp["cell"]) def feedText2table(self,_table,list_textbox,in_objs,sourceP_LB): #find the suitable cell of the textbox list_cells = [] for table_line in _table: for _cell in table_line: list_cells.append({"cell":_cell,"inbox_textbox_list":[]}) for textbox in list_textbox: list_iou = [] for _d in list_cells: _cell = _d["cell"] _iou = self.getIOU(textbox.bbox,_cell["bbox"]) list_iou.append(_iou) max_iou_index = np.argmax(list_iou) max_iou = list_iou[max_iou_index] if max_iou>0.1 and textbox not in in_objs: list_cells[max_iou_index]["inbox_textbox_list"].append(textbox) in_objs.add(textbox) has_matched_box_list = [] for _d in list_cells: _cell = _d["cell"] inbox_textbox_list = _d["inbox_textbox_list"] # 分行,根据y重合 all_match_box_list = [] inbox_textbox_list.sort(key=lambda x:x.bbox[1],reverse=sourceP_LB) for i in range(len(inbox_textbox_list)): match_box_list = [] box1 = inbox_textbox_list[i] if box1 in has_matched_box_list: continue min_y1 = box1.bbox[1] + 1/3 * abs(box1.bbox[3]-box1.bbox[1]) max_y1 = box1.bbox[3] - 1/3 * abs(box1.bbox[3]-box1.bbox[1]) match_box_list.append([box1.get_text(), box1.bbox[0], box1.bbox[1], box1.bbox[2], box1.bbox[3],min_y1,max_y1]) has_matched_box_list.append(box1) for j in range(i+1, len(inbox_textbox_list)): box2 = inbox_textbox_list[j] if box2 in has_matched_box_list: continue # print(min_y1, box2.bbox[1], box2.bbox[3], max_y1) # print(min_y2, box1.bbox[3], max_y2) if min_y1 <= box2.bbox[1] <= max_y1 or \ min_y1 <= box2.bbox[3] <= max_y1 or \ box2.bbox[1] <= min_y1 <= max_y1 <= box2.bbox[3]: match_box_list.append([box2.get_text(), box2.bbox[0], box2.bbox[1], box2.bbox[2], box2.bbox[3],min_y1,max_y1]) has_matched_box_list.append(box2) match_box_list.sort(key=lambda x: x[1]) all_match_box_list.append(match_box_list) # print("match_box_list", all_match_box_list) all_match_box_list.sort(key=lambda x:(round(x[0][2]+x[0][4])/2,0),reverse=sourceP_LB) for box_list in all_match_box_list: for box in box_list: _cell["text"] += re.sub("\s",'',box[0]) def makeTableByRect(self,list_rect,margin,sourceP_LB): _table = [] set_x = set() set_y = set() clusters_rects = [] # 根据y1聚类 if sourceP_LB: list_rect.sort(key=lambda x:x.bbox[3]) for _rect in list_rect: _y0 = _rect.bbox[3] _y1 = _rect.bbox[1] _find = False for l_cr in clusters_rects: if abs(l_cr[0].bbox[3]-_y0)>>>>>>>>>>>>") # for c in clusters_rects: # print("+"*30) # for cc in c: # print("rect", cc.) # cul spans for _line in clusters_rects: for _rect in _line: (x0,y0,x1,y1) = _rect.bbox set_x.add(x0) set_x.add(x1) set_y.add(y0) set_y.add(y1) if len(set_x)==0 or len(set_y)==0: return None,[],[] if len(list_rect)<=1: return None,[],[] list_x = list(set_x) list_y = list(set_y) list_x.sort(key=lambda x:x) list_y.sort(key=lambda x:x,reverse=sourceP_LB) # print("clusters_rects", len(clusters_rects)) if sourceP_LB: clusters_rects.sort(key=lambda x:(x[0].bbox[1]+x[0].bbox[3])/2,reverse=sourceP_LB) clusters_rects.sort(key=lambda x:(x[0].bbox[1]+x[0].bbox[3])/2,reverse=sourceP_LB) for l_cr in clusters_rects: l_cr.sort(key=lambda x:x.bbox[0]) pop_x = [] for i in range(len(list_x)-1): _i = len(list_x)-i-1 l_i = _i-1 if abs(list_x[_i]-list_x[l_i])<5: pop_x.append(_i) pop_x.sort(key=lambda x:x,reverse=True) for _x in pop_x: list_x.pop(_x) # pop_x = [] for i in range(len(list_y)-1): _i = len(list_y)-i-1 l_i = _i-1 if abs(list_y[_i]-list_y[l_i])<5: pop_x.append(_i) pop_x.sort(key=lambda x:x,reverse=True) for _x in pop_x: list_y.pop(_x) print("list_x",list_x) print("list_y",list_y) line_i = 0 for _line in clusters_rects: table_line = [] cell_i = 0 for _rect in _line: (x0, y0, x1, y1) = _rect.bbox _cell = {"bbox": (x0, y0, x1, y1), "rect": _rect, "rowspan": self.getspan(list_y, y0, y1, margin), "columnspan": self.getspan(list_x, x0, x1, margin), "text": ""} cell_i += 1 table_line.append(_cell) line_i += 1 _table.append(table_line) return _table,list_x,list_y def rect2table(self, list_textbox, list_rect, in_objs, margin=5, sourceP_LB=True): def getIOU(bbox0,bbox1): width = max(bbox0[2],bbox1[2])-min(bbox0[0],bbox1[0])-(bbox0[2]-bbox0[0]+bbox1[2]-bbox1[0]) height = max(bbox0[3],bbox1[3])-min(bbox0[1],bbox1[1])-(bbox0[3]-bbox0[1]+bbox1[3]-bbox1[1]) if width<0 and height<0: return abs(width*height/min(abs((bbox0[2]-bbox0[0])*(bbox0[3]-bbox0[1])),abs((bbox1[2]-bbox1[0])*(bbox1[3]-bbox1[1])))) return 0 _table,list_x,list_y = self.makeTableByRect(list_rect,margin,sourceP_LB) if _table is None: return self.feedText2table(_table,list_textbox,in_objs,sourceP_LB) # print("table===========================>") # for _line in _table: # for _cell in _line: # print("||%d%d"%(_cell["rowspan"],_cell["columnspan"]),end="\t") # print() # print("table===========================>") # # print("------------") # for _line in _table: # for _cell in _line: # print(_cell["text"],end="\t") # print("\n") # print("------------") self.fixRect(_table,list_x,list_y,sourceP_LB,margin) self.feedText2table(_table,list_textbox,in_objs,sourceP_LB) table_bbox = (_table[0][0].get("bbox")[0], _table[0][0].get("bbox")[1], _table[-1][-1].get("bbox")[2], _table[-1][-1].get("bbox")[3]) # print("=======") # for _line in _table: # for _cell in _line: # print(_cell["text"]) # print("\n") # print("===========") ta = {"bbox": table_bbox, "table": _table} return ta def inbox(self, bbox0, bbox_g, text=""): # if bbox_g[0]<=bbox0[0] and bbox_g[1]<=bbox0[1] and bbox_g[2]>=bbox0[2] and bbox_g[3]>=bbox0[3]: # return 1 # print("utils inbox", text, self.getIOU(bbox0,bbox_g), bbox0, bbox_g) if self.getIOU(bbox0,bbox_g)>0.2: return 1 return 0 def getIOU(self, bbox0, bbox1): width = abs(max(bbox0[2],bbox1[2])-min(bbox0[0],bbox1[0]))-(abs(bbox0[2]-bbox0[0])+abs(bbox1[2]-bbox1[0])) height = abs(max(bbox0[3],bbox1[3])-min(bbox0[1],bbox1[1]))-(abs(bbox0[3]-bbox0[1])+abs(bbox1[3]-bbox1[1])) if width < 0 and height < 0: iou = abs(width*height/min(abs((bbox0[2]-bbox0[0])*(bbox0[3]-bbox0[1])), abs((bbox1[2]-bbox1[0])*(bbox1[3]-bbox1[1])))) # print("getIOU", iou) return iou return 0 def getspan(self, _list, x0, x1, margin): _count = 0 (x0,x1) = (min(x0,x1),max(x0,x1)) for _x in _list: if _x>=(x0-margin) and _x<=(x1+margin): _count += 1 return _count-1 def _plot(self, list_line, list_textbox): from matplotlib import pyplot as plt plt.figure() for _line in list_line: x0, y0, x1, y1 = _line.__dict__.get("bbox") plt.plot([x0, x1], [y0, y1]) for _line in list_line: x0, y0, x1, y1 = _line.bbox plt.plot([x0, x1], [y0, y1]) # for point in list_crosspoints: # plt.scatter(point.get("point")[0],point.get("point")[1]) for textbox in list_textbox: x0, y0, x1, y1 = textbox.bbox plt.plot([x0, x1], [y0, y1]) plt.show() def get_table_html(table): html_text = '' for row in table: html_text += "" for col in row: row_span = col.get("rowspan") col_span = col.get("columnspan") bbox_text = col.get("text") html_text += "" html_text += "" html_text += "
" html_text += bbox_text + "
" return html_text def sort_object(obj_list, is_reverse=False): from format_convert.convert_tree import _Table, _Image, _Sentence, _Page obj_list = combine_object(obj_list) if len(obj_list) == 0: return obj_list if isinstance(obj_list[0], (_Table, _Sentence, _Image)): obj_list.sort(key=lambda x: (x.y, x.x), reverse=is_reverse) return obj_list elif isinstance(obj_list[0], _Page): obj_list.sort(key=lambda x: x.page_no) return obj_list else: return obj_list def combine_object(obj_list, threshold=5): from format_convert.convert_tree import _Sentence sentence_list = [] for obj in obj_list: if isinstance(obj, _Sentence): obj.content = re.sub("\s", "", obj.content) sentence_list.append(obj) sentence_list.sort(key=lambda x: (x.y, x.x)) for sen in sentence_list: obj_list.remove(sen) delete_list = [] for i in range(1, len(sentence_list)): sen1 = sentence_list[i-1] sen2 = sentence_list[i] if abs(sen2.y - sen1.y) <= threshold: if sen2.x > sen1.x: sen2.x = sen1.x sen2.content = sen1.content + sen2.content else: sen2.content = sen2.content + sen1.content if sen2.y > sen1.y: sen2.y = sen1.y delete_list.append(sen1) for sen in delete_list: sentence_list.remove(sen) for sen in sentence_list: obj_list.append(sen) return obj_list session_ocr = requests.Session() session_otr = requests.Session() session_all = requests.Session() def request_post(url, param, time_out=1000, use_zlib=False): fails = 0 text = json.dumps([-2]) while True: try: if fails >= 1: break headers = {'content-type': 'application/json'} # result = requests.post(url, data=param, timeout=time_out) if param.get("model_type") == "ocr": result = session_ocr.post(url, data=param, timeout=time_out) elif param.get("model_type") == "otr": result = session_otr.post(url, data=param, timeout=time_out) else: result = session_all.post(url, data=param, timeout=time_out) # print('result.status_code', result.status_code) # print('result.text', result.text) if result.status_code == 200: text = result.text break else: print('result.status_code', result.status_code) print('result.text', result.text) fails += 1 continue except socket.timeout: fails += 1 print('timeout! fail times:', fails) except: fails += 1 print('fail! fail times:', fails) traceback.print_exc() return text def test_gpu(): print("="*30) import paddle paddle.utils.run_check() # import tensorflow as tf # print("tf gpu", tf.config.list_physical_devices('GPU')) print("="*30) def my_subprocess_call(*popenargs, timeout=None): logging.info("into my_subprocess_call") with Popen(*popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p: try: for line in p.stdout: print("stdout", line) for line in p.stderr: print("stderr", line) p.wait(timeout=timeout) # p.communicate() return p.pid, p.returncode except: # Including KeyboardInterrupt, wait handled that. p.kill() # We don't call p.wait() again as p.__exit__ does that for us. raise finally: logging.info("out my_subprocess_call") p.kill() def parse_yaml(): yaml_path = os.path.dirname(os.path.abspath(__file__)) + "/interface.yml" with open(yaml_path, "r", encoding='utf-8') as f: cfg = f.read() params = yaml.load(cfg, Loader=yaml.SafeLoader) return params def get_ip_port(node_type=None, interface_type=None): if node_type is None: node_type_list = ["master", "slave"] else: node_type_list = [node_type] if interface_type is None: interface_type_list = ["convert", "ocr", "otr", "office", "path"] else: interface_type_list = [interface_type] ip_port_dict = {} params = parse_yaml() # 循环 master slave for type1 in node_type_list: node_type = type1.upper() ip_list = params.get(node_type).get("ip") # 循环多个IP for j in range(len(ip_list)): _ip = ip_list[j] if ip_port_dict.get(_ip): ip_port_dict.get(_ip).update({node_type: {}}) else: ip_port_dict.update({_ip: {node_type: {}}}) # 有IP时,循环多个参数 for type2 in interface_type_list: python_path = None project_path = None gunicorn_path = None processes = 0 port_list = [] interface_type = type2.upper() if interface_type in ["convert".upper()]: _port = params.get(node_type).get(interface_type).get("port") if _port is None: port_list = [] else: if interface_type == "convert".upper(): processes = params.get(node_type).get(interface_type).get("processes")[j] port_list = [str(_port[j])]*int(processes) # port_list = [str(_port)] elif interface_type == "path".upper(): python_path = params.get(node_type).get(interface_type).get("python")[j] project_path = params.get(node_type).get(interface_type).get("project")[j] gunicorn_path = params.get(node_type).get(interface_type).get("gunicorn")[j] else: port_start = params.get(node_type).get(interface_type).get("port_start") port_no = params.get(node_type).get(interface_type).get("port_no") if port_start is None or port_no is None: port_list = [] else: port_list = [str(x) for x in range(port_start[j], port_start[j]+port_no[j], 1)] # if ip_list: # for i in range(len(ip_list)): # 参数放入dict if port_list: ip_port_dict.get(_ip).get(node_type).update({interface_type.lower(): port_list}) if processes: ip_port_dict.get(_ip).get(node_type).update({interface_type.lower()+"_processes": processes}) if project_path and python_path and gunicorn_path: ip_port_dict.get(_ip).get(node_type).update({"project_path": project_path, "python_path": python_path, "gunicorn_path": gunicorn_path}) # print("ip_port_dict", ip_port_dict) return ip_port_dict def get_ip_port_old(node_type=None, interface_type=None): if node_type is None: node_type_list = ["master", "slave"] else: node_type_list = [node_type] if interface_type is None: interface_type_list = ["convert", "ocr", "otr", "office", "path"] else: interface_type_list = [interface_type] ip_port_dict = {} params = parse_yaml() for type1 in node_type_list: node_type = type1.upper() ip_list = params.get(node_type).get("ip") for type2 in interface_type_list: interface_type = type2.upper() processes = 0 python_path = None project_path = None if interface_type in ["convert".upper()]: _port = params.get(node_type).get(interface_type).get("port") if _port is None: port_list = [] else: if interface_type == "convert".upper(): processes = params.get(node_type).get(interface_type).get("processes") port_list = [str(_port)]*int(processes) # port_list = [str(_port)] elif interface_type == "path".upper(): python_path = params.get(node_type).get(interface_type).get("python") project_path = params.get(node_type).get(interface_type).get("project") else: port_start = params.get(node_type).get(interface_type).get("port_start") port_no = params.get(node_type).get(interface_type).get("port_no") if port_start is None or port_no is None: port_list = [] else: port_list = [str(x) for x in range(port_start, port_start+port_no, 1)] if ip_list: for _ip in ip_list: if _ip is None: continue if _ip in ip_port_dict.keys(): if port_list: ip_port_dict.get(_ip).update({interface_type.lower(): port_list}) else: if port_list: ip_port_dict[_ip] = {interface_type.lower(): port_list} if processes: ip_port_dict.get(_ip).update({interface_type.lower()+"_processes": processes}) if project_path and python_path: ip_port_dict.get(_ip).update({"project_path": project_path, "python_path": python_path}) return ip_port_dict def get_intranet_ip(): try: # Create a new socket using the given address family, # socket type and protocol number. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # Connect to a remote socket at address. # (The format of address depends on the address family.) address = ("8.8.8.8", 80) s.connect(address) # Return the socket’s own address. # This is useful to find out the port number of an IPv4/v6 socket, for instance. # (The format of the address returned depends on the address family.) sockname = s.getsockname() ip = sockname[0] port = sockname[1] finally: s.close() return ip def get_all_ip(): if get_platform() == "Windows": ips = ['127.0.0.1'] else: ips = [ip.split('/')[0] for ip in os.popen("ip addr | grep 'inet '|awk '{print $2}'").readlines()] for i in range(len(ips)): ips[i] = "http://" + ips[i] return ips def get_using_ip(): ip_port_dict = get_ip_port() ips = get_all_ip() ip = "http://127.0.0.1" for key in ip_port_dict.keys(): if key in ips: ip = key break return ip def memory_decorator(func): @wraps(func) def get_memory_info(*args, **kwargs): if get_platform() == "Windows": return func(*args, **kwargs) # 只有linux有resource包 # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 start_time = time.time() logging.info("----- memory info start - " + func.__qualname__ + " - " + str(os.getpid()) + " - " + str(round(usage, 2)) + " GB" + " - " + str(round(time.time()-start_time, 2)) + " sec") result = func(*args, **kwargs) # usage = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss usage = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024 logging.info("----- memory info end - " + func.__qualname__ + " - " + str(os.getpid()) + " - " + str(round(usage, 2)) + " GB" + " - " + str(round(time.time()-start_time, 2)) + " sec") return result return get_memory_info def log(msg): call_func_name = inspect.currentframe().f_back.f_code.co_name logger = get_logger(call_func_name, {"md5": _global.get("md5"), "port": _global.get("port")}) logger.info(msg) # logging.info(msg) def get_logger(_name, _dict): extra = _dict _format = '%(asctime)s - %(name)s - %(levelname)s - %(md5)s - %(port)s - %(message)s' logger = logging.getLogger(_name) create_new_flag = 1 handlers = logger.handlers if handlers: for h in handlers: if h.formatter.__dict__.get("_fmt") == _format: create_new_flag = 0 break if create_new_flag: formatter = logging.Formatter(_format) handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) logger.propagate = False logger = logging.LoggerAdapter(logger, extra) return logger def set_flask_global(): # 接口轮询所需锁、参数 ip_port_flag = {} ip_flag = [] ip_port_dict = get_ip_port() for _k in ip_port_dict.keys(): ip_port_flag.update({_k: {"ocr": 0, "otr": 0, "convert": 0, "office": 0 }}) if ip_port_dict.get(_k).get("MASTER"): ip_flag.append([_k+"_master", 0]) if ip_port_dict.get(_k).get("SLAVE"): ip_flag.append([_k+"_slave", 0]) _global.update({"ip_port_flag": ip_port_flag}) _global.update({"ip_port": ip_port_dict}) _global.update({"ip_flag": ip_flag}) # print(globals().get("ip_port")) def get_md5_from_bytes(_bytes): def generate_fp(_b): bio = BytesIO() bio.write(_b) return bio _length = 0 try: _md5 = hashlib.md5() ff = generate_fp(_bytes) ff.seek(0) while True: data = ff.read(4096) if not data: break _length += len(data) _md5.update(data) return _md5.hexdigest(), _length except Exception as e: traceback.print_exc() return None, _length # def to_share_memory(np_data, name=None): # # from multiprocessing.resource_tracker import unregister # from multiprocessing import shared_memory # if name is None: # sm_name = "psm_" + str(os.getpid()) # else: # sm_name = name # logging.info("into from_share_memory sm_name " + sm_name) # shm = shared_memory.SharedMemory(name=sm_name, create=True, size=np_data.nbytes) # # unregister(sm_name, 'shared_memory') # sm_data = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf) # sm_data[:] = np_data[:] # Copy the original data into shared memory # # shm.close() # del sm_data # return shm # def from_share_memory(sm_name, _shape, _dtype, if_close=True): # from multiprocessing import shared_memory # logging.info("into from_share_memory sm_name " + sm_name) # shm = shared_memory.SharedMemory(name=sm_name, create=False) # b = np.ndarray(_shape, dtype=_dtype, buffer=shm.buf) # sm_data = copy.deepcopy(b) # b[::] = 0 # # if if_close: # try: # shm.close() # shm.unlink() # except Exception: # log("file not found! " + sm_name) # return sm_data # def get_share_memory(sm_name): # try: # from multiprocessing import shared_memory # shm = shared_memory.SharedMemory(name=sm_name, create=False) # return shm # except: # return None # def release_share_memory(shm): # try: # if shm is None: # return # shm.close() # shm.unlink() # log(str(shm.name) + " release successfully!") # except FileNotFoundError: # log(str(shm.name) + " has released!") # except Exception as e: # traceback.print_exc() # def get_share_memory_list(sm_list_name, list_size=None): # # from multiprocessing.resource_tracker import unregister # from multiprocessing import shared_memory # if list_size is None: # sm_list = shared_memory.ShareableList(name=sm_list_name) # else: # sm_list = shared_memory.ShareableList(name=sm_list_name, sequence=["0"]+[' '*2048]*(list_size-2)+["0"]) # # unregister(sm_list_name, 'shared_memory') # return sm_list # def close_share_memory_list(sm_list): # try: # sm_list.shm.close() # except Exception: # traceback.print_exc() def get_np_type(_str): _dtype = None if _str == 'uint8': _dtype = np.uint8 elif _str == 'float16': _dtype = np.float16 elif _str == 'float32': _dtype = np.float32 logging.info("get_np_type " + _str + " " + str(_dtype)) return _dtype def namespace_to_dict(agrs_or_dict, reverse=False): if reverse: agrs_or_dict = argparse.Namespace(**agrs_or_dict) else: agrs_or_dict = vars(agrs_or_dict) return agrs_or_dict def get_args_from_config(ip_port_dict, ip, arg_type, node_type=None): if node_type is None: node_type = ["MASTER", "SLAVE"] else: node_type = [node_type] arg_list = [] for _type in node_type: if ip_port_dict.get(ip).get(_type): if ip_port_dict.get(ip).get(_type).get(arg_type): arg_list.append(ip_port_dict.get(ip).get(_type).get(arg_type)) return arg_list def remove_red_seal(image_np): """ 去除红色印章 """ cv2.namedWindow("image_np", 0) cv2.resizeWindow("image_np", 1000, 800) cv2.imshow("image_np", image_np) height, width, c = image_np.shape window_h = int(height / 15) image_hsv = cv2.cvtColor(image_np, cv2.COLOR_BGR2HSV) # 遍历numpy red_point_list = [] image_list = image_np.tolist() hsv_dict = {} for index_1 in range(len(image_list)): for index_2 in range(len(image_list[index_1])): h, s, v = image_hsv[index_1][index_2] if (0 <= h <= 10 or 156 <= h <= 180) and 43 <= s <= 255 and 46 <= v <= 255: key = str(image_hsv[index_1][index_2].tolist()) red_point_list.append([key, index_1, index_2]) if hsv_dict.get(key): hsv_dict[key] += 1 else: hsv_dict[key] = 1 # 找出相同最多的hsv值 hsv_most_key = None hsv_most_value = 0 for hsv in hsv_dict.keys(): if hsv_dict.get(hsv) > hsv_most_value: hsv_most_value = hsv_dict.get(hsv) hsv_most_key = hsv # print(hsv_dict) # 根据hsv判断其填充为黑色还是白色 hsv_most_key = eval(hsv_most_key) for point in red_point_list: if abs(eval(point[0])[2] - hsv_most_key[2]) <= 70: image_np[point[1]][point[2]][0] = 255 image_np[point[1]][point[2]][1] = 255 image_np[point[1]][point[2]][2] = 255 else: image_np[point[1]][point[2]][0] = 0 image_np[point[1]][point[2]][1] = 0 image_np[point[1]][point[2]][2] = 0 cv2.namedWindow("remove_red_seal", 0) cv2.resizeWindow("remove_red_seal", 1000, 800) cv2.imshow("remove_red_seal", image_np) # cv2.imwrite("C:/Users/Administrator/Downloads/1.png", image_np) cv2.waitKey(0) return image_np def pil_resize(image_np, height, width): # limit pixels 89478485 if image_np.shape[0] * image_np.shape[1] * image_np.shape[2] >= 89478485: print("image too large, limit 89478485 pixels", image_np.shape) ratio = image_np.shape[0] / image_np.shape[1] if image_np.shape[0] >= image_np.shape[1]: image_np = cv2.resize(image_np, (int(3000/ratio), 3000), interpolation=cv2.INTER_AREA) else: image_np = cv2.resize(image_np, (3000, int(3000*ratio)), interpolation=cv2.INTER_AREA) image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)) image_pil = image_pil.resize((int(width), int(height)), Image.BICUBIC) image_np = cv2.cvtColor(np.asarray(image_pil), cv2.COLOR_RGB2BGR) return image_np def np2pil(image_np): image_pil = Image.fromarray(cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)) return image_pil def pil2np(image_pil): image_np = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR) return image_np def bytes2np(_b): try: # 二进制数据流转np.ndarray [np.uint8: 8位像素] image_np = cv2.imdecode(np.frombuffer(_b, np.uint8), cv2.IMREAD_COLOR) # 将rgb转为bgr # image_np = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR) return image_np except cv2.error as e: if "src.empty()" in str(e): log("bytes2np image is empty!") return None except: traceback.print_exc() return None if __name__ == "__main__": # strs = r"D:\Project\temp\04384fcc9e8911ecbd2844f971944973\043876ca9e8911eca5e144f971944973_rar\1624114035529.jpeg" # print(slash_replace(strs)) # from matplotlib import pyplot as plt # import random # fig = plt.figure() # plt.xlim(100) # plt.ylim(100) # fig.add_subplot(111) # x0,y0,x1,y1 = (1,2,3,4) # plt.gca().add_patch(plt.Rectangle(xy=(x0, y0), # width=x1-x0, # height=y1-y0, # edgecolor=(random.randint(0,255)/255,random.randint(0,255)/255,random.randint(0,255)/255), # fill=False, linewidth=2)) # # # plt.show() # import cv2 # import numpy as np # img = np.zeros(shape=(1800,1800),dtype=np.uint8) # img += 255 # cv2.imshow("bbox", img) # cv2.waitKey(0) # print(json.dumps({"data":[1, 2]})) # print(parse_yaml()) print(get_ip_port()) # set_flask_global() # print(get_all_ip()) print(get_args_from_config(get_ip_port(), get_all_ip()[0], "gunicorn_path")) # print(get_args_from_config(get_ip_port(), "http://127.0.0.1", "gunicorn_path")) # print(get_intranet_ip()) # _path = "C:/Users/Administrator/Downloads/3.png" # remove_red_seal(cv2.imread(_path))