12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076 |
- # -*- coding:utf-8 -*-
- import argparse
- import copy
- import hashlib
- import inspect
- import json
- import os
- import socket
- import subprocess
- import sys
- from io import BytesIO
- from subprocess import Popen
- from shapely.geometry import LineString
- import cv2
- import requests
- from PIL import Image
- sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
- import difflib
- import logging
- import mimetypes
- import platform
- import re
- import traceback
- import filetype
- from bs4 import BeautifulSoup
- import yaml
- from pdfminer.layout import *
- from format_convert import _global
- from functools import wraps
- import psutil
- import time
- import numpy as np
- from format_convert.judge_platform import get_platform
- if get_platform() == "Linux":
- import resource
- import math
def judge_error_code(_list, code=(0, -1, -2, -3, -4, -5, -6, -7, -8, -9, -10, -11, -12, -13, -14)):
    """
    Tell whether ``_list`` is a one-element error marker such as ``[-1]``.

    Meaning of the codes:
    [0]  : continue
    [-1] : logic processing error
    [-2] : interface call error
    [-3] : bad file format, cannot be opened
    [-4] : a third-party reader timed out on a file
    [-5] : the whole conversion timed out
    [-6] : Aliyun UDF queue timeout
    [-7] : file needs a password, cannot be opened
    [-8] : calling an existing interface failed
    [-9] : the interface received empty data
    [-10]: long image splitting error
    [-11]: new interface (idc, isr, atc) error
    [-12]: cross-page table connection error
    [-13]: pdf table line processing error
    [-14]: specified page number error

    :param _list: the value to inspect; anything that is not a ``list`` yields False
    :param code: iterable of codes to look for (immutable default, so it can
        never be altered between calls)
    :return: True when ``_list`` equals ``[c]`` for some ``c`` in ``code``
    """
    if not isinstance(_list, list):
        return False
    return any(_list == [c] for c in code)
def add_div(text):
    """
    Wrap every line of ``text`` in its own ``<div>...</div>`` pair.

    Empty/None input and text that already contains ``<div>`` are returned
    unchanged.  Line breaks are consumed: they become the boundary between
    two consecutive div elements.
    """
    if text is None or text == "":
        return text
    if re.search("<div>", text):
        return text
    # Each "\n" closes the current div and opens the next one; the closing
    # tag of the last line is appended directly.
    return "<div>" + text.replace("\n", "</div><div>") + "</div>"
def get_platform():
    """Return the operating system name as reported by ``platform.system()``."""
    return platform.system()
def get_html_p(html_path):
    """
    Collect the stripped text of every ``<p>`` element of an HTML file.

    :param html_path: path of the HTML file to read
    :return: the paragraph texts, each followed by a newline, joined into one
        string; ``[-1]`` when anything goes wrong
    """
    log("into get_html_p")
    try:
        with open(html_path, "r") as html_file:
            html_str = html_file.read()
        soup = BeautifulSoup(html_str, 'lxml')
        pieces = []
        for paragraph in soup.find_all("p"):
            stripped = paragraph.text.strip()
            # Paragraphs whose .string is exactly "" are left out.
            if paragraph.string != "":
                pieces.append(stripped + "\n")
        return "".join(pieces)
    except Exception as e:
        log("get_html_p error!")
        return [-1]
def string_similarity(str1, str2):
    """
    Return the ``difflib.SequenceMatcher`` ratio of two strings.

    ``<div>``, ``</div>`` and line breaks are removed from both strings
    before they are compared.  The score is also printed.
    """
    def _strip_markup(value):
        # Remove the tokens one after another, in this fixed order.
        for token in ("<div>", "</div>", "\n"):
            value = re.sub(token, "", value)
        return value

    score = difflib.SequenceMatcher(None, _strip_markup(str1), _strip_markup(str2)).ratio()
    print("string_similarity", score)
    return score
def get_sequential_data(text_list, bbox_list, html=False):
    """
    Arrange recognised text boxes into reading order and join them to text.

    Boxes whose vertical centres lie within 5 units of each other form one
    row; inside a row the boxes are ordered by their left x coordinate.
    A row with one box yields ``text + "\\n"``, a row with several boxes
    yields every text followed by a space and then ``"\\n"``.

    :param text_list: the texts, one per box
    :param bbox_list: per box a sequence of points; the first point supplies
        the left x and top y, the second point the right x and the last point
        the bottom y
    :param html: when True every line is wrapped in ``<div>...</div>``
    :return: the assembled text, ``""`` when there are no boxes, ``[-1]`` on error
    """
    logging.info("into get_sequential_data")
    try:
        # Debug output is only produced on Windows.
        is_windows = platform.system() == "Windows"

        # One record per box: [text, x_start, x_end, y_start, y_end].
        order_list = []
        for i, box_text in enumerate(text_list):
            bbox = bbox_list[i]
            order_list.append([box_text, bbox[0][0], bbox[1][0], bbox[0][1], bbox[-1][1]])
        if is_windows:
            print("get_sequential_data", order_list)
        if not order_list:
            if is_windows:
                print("get_sequential_data", "no order list")
            return ""

        # Sort by top edge, then left edge, then text.
        order_list.sort(key=lambda x: (x[3], x[1], x[0]))

        # Group the boxes into rows by vertical centre.  Membership is tested
        # by equality, so a box identical to an already used one is skipped.
        row_list = []
        used_box = []
        threshold = 5
        for box in order_list:
            if box in used_box:
                continue
            height_center = (box[4] + box[3]) / 2
            row = []
            for box2 in order_list:
                if box2 in used_box:
                    continue
                height_center2 = (box2[4] + box2[3]) / 2
                if height_center - threshold <= height_center2 <= height_center + threshold:
                    if box2 not in row:
                        row.append(box2)
                        used_box.append(box2)
            # Sorting by text first fixes the order of boxes that later tie
            # on their x coordinate (the second sort is stable).
            row.sort(key=lambda x: x[0])
            row_list.append(row)

        lines = []
        for row in row_list:
            if not row:
                continue
            if len(row) <= 1:
                lines.append(row[0][0] + "\n")
            else:
                row.sort(key=lambda x: x[1])
                lines.append("".join(col[0] + " " for col in row) + "\n")
        text = "".join(lines)

        if html:
            text = "<div>" + text.replace("\n", "</div>\n<div>") + "</div>"
        return text
    except Exception:
        logging.info("get_sequential_data error!")
        # print_exc() writes the traceback itself and returns None, so its
        # result must not be handed to print().
        traceback.print_exc()
        return [-1]
def rename_inner_files(root_path):
    """
    Rename every file and directory below ``root_path`` to a hash-based name.

    The last component of each path is replaced by ``str(hash(name))``; for
    files the text after the last dot of the file name is kept as extension.
    Deeper (longer) paths are renamed first so that parents are still valid
    when their children are processed.

    NOTE: ``hash()`` of a string is salted per interpreter process unless
    PYTHONHASHSEED is fixed, so the generated names differ between runs.

    :param root_path: folder to process; it is concatenated directly with the
        relative paths, exactly as passed in
    :return: list of all paths below ``root_path`` after renaming (directories
        carry a trailing separator), or ``[-1]`` on error
    """
    try:
        logging.info("into rename_inner_files")
        if platform.system() == "Windows":
            # Use one separator style so results are consistent with os.sep.
            root_path = root_path.replace("/", "\\")

        # os.walk yields paths that start with root_path verbatim, so the
        # relative part is obtained by slicing.  (Treating root_path as a
        # regular expression breaks on characters such as "+", "(" or "[".)
        prefix_len = len(root_path)
        path_list = []
        for root, dirs, files in os.walk(root_path, topdown=False):
            for name in dirs:
                path_list.append((os.path.join(root, name) + os.sep)[prefix_len:])
            for name in files:
                path_list.append(os.path.join(root, name)[prefix_len:])

        # Longest paths first.
        path_list.sort(key=len, reverse=True)

        for old_path in path_list:
            old_ab_path = root_path + old_path
            parts = old_path.split(os.sep)
            if os.path.isdir(old_ab_path):
                # Drop the empty piece produced by the trailing separator.
                parts = parts[:-1]
                file_type = ""
            else:
                # Look for the extension in the file name only; a dot in a
                # parent directory name must not be mistaken for one.
                last = parts[-1]
                file_type = "." + last.split(".")[-1] if "." in last else ""
            parts[-1] = str(hash(parts[-1])) + file_type
            os.rename(old_ab_path, root_path + os.sep.join(parts))

        # Collect the resulting tree.
        new_path_list = []
        for root, dirs, files in os.walk(root_path, topdown=False):
            for name in dirs:
                new_path_list.append(os.path.join(root, name) + os.sep)
            for name in files:
                new_path_list.append(os.path.join(root, name))
        return new_path_list
    except Exception:
        traceback.print_exc()
        return [-1]
def judge_format(path):
    """
    Guess the document format of ``path``.

    ``mimetypes.guess_type`` is consulted first; only when it yields no type
    is ``filetype.guess`` asked.

    :return: one of "pdf", "docx", "zip", "rar", "xlsx", "doc", "png", "jpg",
        or None when the type is unknown or not in that set
    """
    mime_to_format = {
        "application/pdf": "pdf",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "docx",
        "application/x-zip-compressed": "zip",
        "application/zip": "zip",
        "application/x-rar-compressed": "rar",
        "application/rar": "rar",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": "xlsx",
        "application/msword": "doc",
        "image/png": "png",
        "image/jpeg": "jpg",
    }

    _type = None
    by_name = mimetypes.guess_type(path)
    if by_name[0]:
        _type = by_name[0]
    else:
        by_content = filetype.guess(path)
        if by_content:
            _type = by_content.mime

    # Unknown types fall through to None.
    return mime_to_format.get(_type)
def draw_lines_plt(bboxes):
    """
    Plot each bbox as a line from (bbox[0], bbox[1]) to (bbox[2], bbox[3])
    in a new matplotlib figure and show it.
    """
    import matplotlib.pyplot as plt
    plt.figure()
    for bbox in bboxes:
        plt.plot([bbox[0], bbox[2]], [bbox[1], bbox[3]])
    plt.show()
def slash_replace(_str, reverse=False):
    """
    Convert path separators in ``_str``.

    :param _str: text (``str`` or ``bytes``) to convert
    :param reverse: False turns every backslash into "/", True turns every
        "/" into a backslash
    :return: the converted value, same type as the input

    A plain ``replace`` is used rather than ``eval(repr(...))``: evaluating
    generated source is unnecessary for a character substitution.
    """
    if isinstance(_str, (bytes, bytearray)):
        slash, backslash = b"/", b"\\"
    else:
        slash, backslash = "/", "\\"
    if reverse:
        return _str.replace(slash, backslash)
    return _str.replace(backslash, slash)
- class LineTable:
def recognize_table(self, list_textbox, list_line, sourceP_LB=True, splited=False, from_pdf=False):
    """
    Build tables from ruling lines and text boxes.

    Steps: compute the cross points of ``list_line``, cluster cross points
    that share a line, order the clusters by the area of their bounding box
    (smallest first, to deal with inner tables), turn every cluster into
    rectangles and every rectangle group into a table via ``rect2table``.

    Stores ``list_line``, ``list_crosspoints``, ``from_pdf``, ``splited`` and
    ``connect_bbox_list`` on the instance.

    :return: ``(list_tables, in_objs, list_l_rect, [])``; as soon as
        ``self.connect_bbox_list`` is non-empty after a ``rect2table`` call,
        ``([], [], [], self.connect_bbox_list)`` is returned instead
    """
    self.list_line = list_line
    self.list_crosspoints = self.recognize_crosspoints(list_line)
    self.from_pdf = from_pdf
    self.splited = splited
    self.connect_bbox_list = []

    # Clustering: start with one cluster per cross point and keep merging
    # clusters that have a line in common until a pass merges nothing.
    clusters = [{"lines": cp.get("lines"), "points": [cp]} for cp in self.list_crosspoints]
    merged_any = True
    while merged_any:
        merged_any = False
        merged_clusters = []
        for cluster in clusters:
            absorbed = False
            cluster_lines = cluster.get("lines")
            for target in merged_clusters:
                target_lines = target.get("lines")
                if len(cluster_lines & target_lines) > 0:
                    merged_any = True
                    absorbed = True
                    target["lines"] = cluster_lines.union(target_lines)
                    target["points"].extend(cluster["points"])
            if not absorbed:
                merged_clusters.append({"lines": cluster.get("lines"), "points": cluster.get("points")})
        clusters = merged_clusters

    # need to sort to deal with the inner tables
    for cluster in clusters:
        coords = np.array([p["point"] for p in cluster["points"]])
        xs = coords[..., 0]
        ys = coords[..., 1]
        cluster["area"] = (max(ys) - min(ys)) * (max(xs) - min(xs))
    clusters.sort(key=lambda c: c["area"])

    list_l_rect = [self.crosspoint2rect(cluster.get("points")) for cluster in clusters]

    in_objs = set()
    list_tables = []
    for l_rect in list_l_rect:
        table = self.rect2table(list_textbox, l_rect, in_objs, sourceP_LB=sourceP_LB)
        if self.connect_bbox_list:
            return [], [], [], self.connect_bbox_list
        if table:
            list_tables.append(table)
    return list_tables, in_objs, list_l_rect, []
- # def recognize_table_by_rect(self, list_textbox, list_rect, margin=2):
- #
- # dump_margin = 5
- # list_rect_tmp = []
- # # 去重
- # for _rect in list_rect:
- # if (_rect.bbox[3] - _rect.bbox[1] < 10) or (abs(_rect.bbox[2] - _rect.bbox[0]) < 5):
- # continue
- # _find = False
- # for _tmp in list_rect_tmp:
- # for i in range(4):
- # if abs(_rect.bbox[i] - _tmp.bbox[i]) < dump_margin:
- # pass
- # else:
- # _find = False
- # break
- # if i == 3:
- # _find = True
- # if _find:
- # break
- # if not _find:
- # list_rect_tmp.append(_rect)
- #
- # # print("=====",len(list_rect),len(list_rect_tmp))
- # # print(list_rect_tmp)
- # # from matplotlib import pyplot as plt
- # # plt.figure()
- # # for _rect in list_rect_tmp:
- # # x0,y0,x1,y1 = _rect.bbox
- # # plt.boxplot(_rect.bbox)
- # # plt.show()
- #
- # cluster_rect = []
- # for _rect in list_rect:
- # _find = False
- # for cr in cluster_rect:
- # for cr_rect in cr:
- # if abs((cr_rect.bbox[2] - cr_rect.bbox[0] + _rect.bbox[2] - _rect.bbox[0]) - (
- # max(cr_rect.bbox[2], _rect.bbox[2]) - min(cr_rect.bbox[0], _rect.bbox[0]))) < margin:
- # _find = True
- # cr.append(_rect)
- # break
- # elif abs((cr_rect.bbox[3] - cr_rect.bbox[1] + _rect.bbox[3] - _rect.bbox[1]) - (
- # max(cr_rect.bbox[3], _rect.bbox[3]) - min(cr_rect.bbox[1], _rect.bbox[1]))) < margin:
- # _find = True
- # cr.append(_rect)
- # break
- # if _find:
- # break
- # if not _find:
- # cluster_rect.append([_rect])
- #
- # list_l_rect = cluster_rect
- #
- # in_objs = set()
- # list_tables = []
- # for l_rect in list_l_rect:
- # _ta = self.rect2table(list_textbox, l_rect, in_objs)
- # if _ta:
- # list_tables.append(_ta)
- # return list_tables, in_objs, list_l_rect
def recognize_crosspoints(self, list_line, fixLine=True):
    """
    Compute the cross points of all ordered pairs of lines.

    Every line object must expose a ``bbox`` attribute.  With ``fixLine``
    the cross points are clustered by shared lines; for a cluster whose
    dominant min/max coordinates are supported by enough lines, missing
    border lines (more than 30 units away from the outermost cross point)
    are created as ``LTLine`` objects, appended to ``list_line`` (which is
    therefore modified in place) and the cross points of the cluster are
    recomputed.

    :return: list of cross point dicts as produced by ``self.cross_point``
    """
    def getMaxPoints(list_x, margin=5, reverse=False):
        # Group values lying within `margin` of a group's first value, order
        # the groups and report the first value and size of the leading one.
        groups = []
        for value in list_x:
            for group in groups:
                if abs(group[0] - value) < margin:
                    group.append(value)
                    break
            else:
                groups.append([value])
        groups.sort(reverse=reverse)
        return groups[0][0], len(groups[0])

    list_crosspoints = []
    for first in list_line:
        for second in list_line:
            exists, point = self.cross_point(first.__dict__.get("bbox"), second.__dict__.get("bbox"))
            if exists:
                list_crosspoints.append(point)

    if fixLine:
        # Clustering: merge cross points that share a line until stable.
        clusters = [{"lines": cp.get("lines"), "points": [cp]} for cp in list_crosspoints]
        merged_any = True
        while merged_any:
            merged_any = False
            merged_clusters = []
            for cluster in clusters:
                absorbed = False
                cluster_lines = cluster.get("lines")
                for target in merged_clusters:
                    target_lines = target.get("lines")
                    if len(cluster_lines & target_lines) > 0:
                        merged_any = True
                        absorbed = True
                        target["lines"] = cluster_lines.union(target_lines)
                        target["points"].extend(cluster["points"])
                if not absorbed:
                    merged_clusters.append({"lines": cluster.get("lines"), "points": cluster.get("points")})
            clusters = merged_clusters

        list_crosspoints = []
        for cluster in clusters:
            points = cluster.get("points")
            collected = []
            for p in points:
                collected.extend(p.get("p_lines"))
            # Unique line bboxes, ordered by their first coordinate.
            l_lines = sorted(set(collected), key=lambda bbox: bbox[0])

            min_x, _count = getMaxPoints([l[0] for l in l_lines], reverse=False)
            if _count <= 2:
                min_x = None
            min_y, _count = getMaxPoints([l[1] for l in l_lines], reverse=False)
            # NOTE(review): this bound is "< 2" while the other three use
            # "<= 2" - confirm whether the difference is intended.
            if _count < 2:
                min_y = None
            max_x, _count = getMaxPoints([l[2] for l in l_lines], reverse=True)
            if _count <= 2:
                max_x = None
            max_y, _count = getMaxPoints([l[3] for l in l_lines], reverse=True)
            if _count <= 2:
                max_y = None

            # Truthiness test: a coordinate of 0 is treated like a missing one.
            if min_x and min_y and max_x and max_y:
                def _add_border(start, end):
                    border = LTLine(1, start, end)
                    list_line.append(border)
                    l_lines.append(border.bbox)

                points.sort(key=lambda p: p["point"][0])
                if abs(min_x - points[0]["point"][0]) > 30:
                    _add_border((min_x, min_y), (min_x, max_y))
                if abs(max_x - points[-1]["point"][0]) > 30:
                    _add_border((max_x, min_y), (max_x, max_y))
                points.sort(key=lambda p: p["point"][1])
                if abs(min_y - points[0]["point"][1]) > 30:
                    _add_border((min_x, min_y), (max_x, min_y))
                if abs(max_y - points[-1]["point"][1]) > 30:
                    _add_border((min_x, max_y), (max_x, max_y))

            for first in l_lines:
                for second in l_lines:
                    exists, point = self.cross_point(first, second)
                    if exists:
                        list_crosspoints.append(point)
    return list_crosspoints
- # def recognize_rect(self, _page):
- # list_line = []
- # for _obj in _page._objs:
- # if isinstance(_obj, (LTLine)):
- # list_line.append(_obj)
- # list_crosspoints = self.recognize_crosspoints(list_line)
- #
- # # 聚类
- # cluster_crosspoints = []
- # for _point in list_crosspoints:
- # cluster_crosspoints.append({"lines": _point.get("lines"), "points": [_point]})
- # while 1:
- # _find = False
- # new_cluster_crosspoints = []
- # for l_point in cluster_crosspoints:
- # _flag = False
- # for l_n_point in new_cluster_crosspoints:
- # line1 = l_point.get("lines")
- # line2 = l_n_point.get("lines")
- # if len(line1 & line2) > 0:
- # _find = True
- # _flag = True
- # l_n_point["lines"] = line1.union(line2)
- # l_n_point["points"].extend(l_point["points"])
- # if not _flag:
- # new_cluster_crosspoints.append({"lines": l_point.get("lines"), "points": l_point.get("points")})
- # cluster_crosspoints = new_cluster_crosspoints
- # if not _find:
- # break
- # # print(len(cluster_crosspoints))
- #
- # list_l_rect = []
- # for table_crosspoint in cluster_crosspoints:
- # list_rect = self.crosspoint2rect(table_crosspoint.get("points"))
- # list_l_rect.append(list_rect)
- #
- # return list_l_rect
def crosspoint2rect(self, list_crosspoint, margin=10):
    """
    Derive cell rectangles from a group of cross points.

    For every cross point that has at least ``margin`` of line below and to
    its right, the next suitable point along its row line and then the next
    suitable point along that point's column line are looked up; the start
    point and the final point form the corners of an ``LTRect``.
    Rectangles not larger than ``margin`` in width or height and duplicates
    (same bbox to two decimals) are dropped.

    :param list_crosspoint: cross point dicts as produced by ``cross_point``
    :param margin: minimum line remainder and minimum rectangle size
    :return: list of ``LTRect``
    """
    # Index the points by the key of every line they sit on.
    dict_line_points = {}
    for _point in list_crosspoint:
        for line_key in list(_point.get("lines")):
            entry = dict_line_points.setdefault(line_key, {"direct": None, "points": []})
            entry["points"].append(_point)

    # A line whose points spread more in x than in y is a row, otherwise a
    # column; its points are ordered along that direction.
    for entry in dict_line_points.values():
        xs = [p.get("point")[0] for p in entry["points"]]
        ys = [p.get("point")[1] for p in entry["points"]]
        if max(xs) - min(xs) > max(ys) - min(ys):
            entry.get("points").sort(key=lambda p: p.get("point")[0])
            entry["direct"] = "row"
        else:
            entry.get("points").sort(key=lambda p: p.get("point")[1])
            entry["direct"] = "column"

    list_rect = []
    for _point in list_crosspoint:
        if not (_point["buttom"] >= margin and _point["right"] >= margin):
            continue

        # Walk along the row line to the next point further right.
        lines = list(_point.get("lines"))
        row_key = lines[0]
        if dict_line_points[row_key]["direct"] == "column":
            row_key = lines[1]
        next_point = next(
            (p1 for p1 in dict_line_points[row_key]["points"]
             if p1["buttom"] >= margin and p1["point"][0] > _point["point"][0]),
            None)
        if not next_point:
            continue

        # From there walk along the column line to the next point with a
        # larger y coordinate.
        lines = list(next_point.get("lines"))
        column_key = lines[0]
        if dict_line_points[column_key]["direct"] == "row":
            column_key = lines[1]
        final_point = next(
            (p1 for p1 in dict_line_points[column_key]["points"]
             if p1["left"] >= margin and p1["point"][1] > next_point["point"][1]),
            None)
        if not final_point:
            continue

        list_rect.append(LTRect(1, (_point["point"][0], _point["point"][1],
                                    final_point["point"][0], final_point["point"][1])))

    # Drop rectangles that are too small and repeated bboxes.
    unique_rects = []
    seen_bbox = set()
    for _r in list_rect:
        bbox_key = "%.2f-%.2f-%.2f-%.2f" % _r.bbox
        width = _r.bbox[2] - _r.bbox[0]
        height = _r.bbox[3] - _r.bbox[1]
        if width <= margin or height <= margin:
            continue
        if bbox_key not in seen_bbox:
            unique_rects.append(_r)
            seen_bbox.add(bbox_key)
    return unique_rects
def cross_point(self, line1, line2, segment=True, margin=2):
    """
    Intersect two lines given as ``(x1, y1, x2, y2)``.

    :param segment: when True the intersection only counts if it lies on
        both segments, allowing ``margin`` of slack on every side
    :param margin: tolerance used by the segment test
    :return: ``(exists, info)``; ``info`` holds "point" ([x, y]), the
        distances "left", "right", "top", "buttom" from the point to the
        extreme coordinates of the two lines (0 unless the segment test
        passed), "lines" (set of the two line keys, coordinates formatted
        with two decimals) and "p_lines" ([line1, line2])

    NOTE(review): when line1 is not vertical but line2 is, x and y are
    computed yet the point is reported as not existing; callers in this
    class evaluate every pair in both orders - confirm before changing.
    """
    x1, y1, x2, y2 = line1
    x3, y3, x4, y4 = line2

    def _slope_intercept(xa, ya, xb, yb):
        # A vertical line has no slope: reported as (None, 0).
        if (xb - xa) == 0:
            return None, 0
        slope = (yb - ya) * 1.0 / (xb - xa)
        return slope, ya * 1.0 - xa * slope * 1.0

    k1, b1 = _slope_intercept(x1, y1, x2, y2)
    k2, b2 = _slope_intercept(x3, y3, x4, y4)

    point_is_exist = False
    x = y = 0
    if k1 is None:
        if k2 is not None:
            x = x1
            y = k2 * x1 + b2
            point_is_exist = True
    elif k2 is None:
        x = x3
        y = k1 * x3 + b1
    elif not k2 == k1:
        x = (b2 - b1) * 1.0 / (k1 - k2)
        y = k1 * x * 1.0 + b1 * 1.0
        point_is_exist = True

    left = right = top = buttom = 0
    if point_is_exist and segment:
        on_first = ((min(x1, x2) - margin) <= x <= (max(x1, x2) + margin)
                    and (min(y1, y2) - margin) <= y <= (max(y1, y2) + margin))
        on_second = ((min(x3, x4) - margin) <= x <= (max(x3, x4) + margin)
                     and (min(y3, y4) - margin) <= y <= (max(y3, y4) + margin))
        if on_first and on_second:
            left = abs(min(x1, x3) - x)
            right = abs(max(x2, x4) - x)
            top = abs(min(y1, y3) - y)
            buttom = abs(max(y2, y4) - y)
        else:
            point_is_exist = False

    line1_key = "%.2f-%.2f-%.2f-%.2f" % (x1, y1, x2, y2)
    line2_key = "%.2f-%.2f-%.2f-%.2f" % (x3, y3, x4, y4)
    info = {"point": [x, y], "left": left, "right": right,
            "top": top, "buttom": buttom, "lines": {line1_key, line2_key},
            "p_lines": [line1, line2]}
    return point_is_exist, info
- # def unionTable(self, list_table, fixspan=True, margin=2):
- # set_x = set()
- # set_y = set()
- #
- # list_cell = []
- # for _t in list_table:
- # for _line in _t:
- # list_cell.extend(_line)
- #
- # clusters_rects = []
- # # 根据y1聚类
- # set_id = set()
- # list_cell_dump = []
- # for _cell in list_cell:
- # _id = id(_cell)
- # if _id in set_id:
- # continue
- # set_id.add(_id)
- # list_cell_dump.append(_cell)
- # list_cell = list_cell_dump
- # list_cell.sort(key=lambda x: x.get("bbox")[3])
- # for _rect in list_cell:
- # _y0 = _rect.get("bbox")[3]
- # _find = False
- # for l_cr in clusters_rects:
- # if abs(l_cr[0].get("bbox")[3] - _y0) < 2:
- # _find = True
- # l_cr.append(_rect)
- # break
- # if not _find:
- # clusters_rects.append([_rect])
- #
- # clusters_rects.sort(key=lambda x: x[0].get("bbox")[3], reverse=True)
- # for l_cr in clusters_rects:
- # l_cr.sort(key=lambda x: x.get("bbox")[0])
- #
- # # print("=============:")
- # # for l_r in clusters_rects:
- # # print(len(l_r))
- #
- # for _line in clusters_rects:
- # for _rect in _line:
- # (x0, y0, x1, y1) = _rect.get("bbox")
- # set_x.add(x0)
- # set_x.add(x1)
- # set_y.add(y0)
- # set_y.add(y1)
- # if len(set_x) == 0 or len(set_y) == 0:
- # return
- # list_x = list(set_x)
- # list_y = list(set_y)
- #
- # list_x.sort(key=lambda x: x)
- # list_y.sort(key=lambda x: x, reverse=True)
- # _table = []
- # line_i = 0
- # for _line in clusters_rects:
- #
- # table_line = []
- # cell_i = 0
- # for _rect in _line:
- # (x0, y0, x1, y1) = _rect.get("bbox")
- # _cell = {"bbox": (x0, y0, x1, y1), "rect": _rect.get("rect"),
- # "rowspan": self.getspan(list_y, y0, y1, margin),
- # "columnspan": self.getspan(list_x, x0, x1, margin), "text": _rect.get("text", "")}
- # table_line.append(_cell)
- #
- # cell_i += 1
- # line_i += 1
- # _table.append(table_line)
- #
- # # print("=====================>>")
- # # for _line in _table:
- # # for _cell in _line:
- # # print(_cell,end="\t")
- # # print("\n")
- # # print("=====================>>")
- #
- # # print(_table)
- # if fixspan:
- # for _line in _table:
- # extend_line = []
- # for c_i in range(len(_line)):
- # _cell = _line[c_i]
- # if _cell.get("columnspan") > 1:
- # _cospan = _cell.get("columnspan")
- # _cell["columnspan"] = 1
- # for i in range(1, _cospan):
- # extend_line.append({"index": c_i + 1, "cell": _cell})
- # extend_line.sort(key=lambda x: x["index"], reverse=True)
- # for _el in extend_line:
- # _line.insert(_el["index"], _el["cell"])
- # for l_i in range(len(_table)):
- # _line = _table[l_i]
- # for c_i in range(len(_line)):
- # _cell = _line[c_i]
- # if _cell.get("rowspan") > 1:
- # _rospan = _cell.get("rowspan")
- # _cell["rowspan"] = 1
- # for i in range(1, _rospan):
- # _table[l_i + i].insert(c_i, _cell)
- #
- # table_bbox = (_table[0][0].get("bbox")[0], _table[0][0].get("bbox")[1], _table[-1][-1].get("bbox")[2],
- # _table[-1][-1].get("bbox")[3])
- #
- # ta = {"bbox": table_bbox, "table": _table}
- # return ta
- # 获取点阵
def getSpanLocation(self, _list, x0, x1, margin):
    """
    Return the values of ``_list`` that fall inside the span ``x0``..``x1``
    widened by ``margin`` on both ends, in their original order.  The two
    bounds may be given in either order.
    """
    low, high = min(x0, x1), max(x0, x1)
    return [value for value in _list if value >= (low - margin) and value <= (high + margin)]
def fixSpan(self, _table, list_x, list_y, sourceP_LB):
    """
    Split cells that span several columns or rows into single-span cells.

    ``_table`` (a list of rows, each a list of cell dicts with "bbox",
    "columnspan" and "rowspan") is modified in place.  A cell is only split
    when the grid coordinates found inside its bbox (tolerance 10) number
    exactly span + 1.  The original cell shrinks to the first grid slot and
    copies of it are inserted for the remaining slots, provided the insert
    position is consistent with the neighbouring cells.

    :param list_x: grid x coordinates
    :param list_y: grid y coordinates
    :param sourceP_LB: not used by this method
    """
    def _position_ok(row, position, bbox, margin=5):
        # check y: the new bbox must overlap vertically with the row's first cell
        if len(row) > 0:
            first_bbox = row[0].get("bbox")
            if (min(first_bbox[1], first_bbox[3]) > max(bbox[1], bbox[3])
                    or max(first_bbox[1], first_bbox[3]) < min(bbox[1], bbox[3])):
                return False
        # check x: the cell currently at `position` must start at or after
        # the right edge of the new bbox
        if position <= len(row) - 1:
            after_bbox = row[position].get("bbox")
            if not (after_bbox[0] >= bbox[2]):
                return False
        # the cell before `position` must end at or before the left edge of
        # the new bbox (only examined when position - 1 > 0)
        if position - 1 > 0 and position - 1 < len(row):
            before_bbox = row[position - 1].get("bbox")
            if not (bbox[0] >= before_bbox[2]):
                return False
        return True

    # Expand cells with columnspan > 1.
    for row in _table:
        col = 0
        while col < len(row):
            cell = row[col]
            if cell.get("columnspan") > 1:
                x0, y0, x1, y1 = cell.get("bbox")
                span = cell.get("columnspan")
                locations = self.getSpanLocation(list_x, x0, x1, 10)
                if len(locations) == span + 1:
                    cell["bbox"] = (x0, y0, locations[1], y1)
                    cell["columnspan"] = 1
                    for i in range(1, span):
                        filler = dict(cell)
                        filler["bbox"] = (locations[i], y0, locations[i + 1], y1)
                        col += 1
                        # check the position
                        if _position_ok(row, col, filler["bbox"]):
                            row.insert(col, filler)
            col += 1

    # Expand cells with rowspan > 1 into the rows below.
    for row_index, row in enumerate(_table):
        col = 0
        while col < len(row):
            cell = row[col]
            if cell.get("rowspan") > 1:
                x0, y0, x1, y1 = cell.get("bbox")
                span = cell.get("rowspan")
                locations = self.getSpanLocation(list_y, y0, y1, 10)
                if len(locations) == span + 1:
                    cell["bbox"] = (x0, y0, x1, locations[1])
                    cell["rowspan"] = 1
                    for i in range(1, span):
                        # Rows beyond the end of the table are skipped.
                        if row_index + i <= len(_table) - 1:
                            filler = dict(cell)
                            filler["bbox"] = (x0, locations[i], x1, locations[i + 1])
                            if _position_ok(_table[row_index + i], col, filler["bbox"]):
                                _table[row_index + i].insert(col, filler)
            col += 1
- def fixRect(self, _table, list_x, list_y, sourceP_LB, margin):
- # Fill the gaps in every row of `_table` with empty-text cells (in place).
- # Spanning cells are expanded by fixSpan first; afterwards filler cells are
- # created for the space before the first cell, between neighbouring cells and
- # after the last cell of each row.
- self.fixSpan(_table, list_x, list_y, sourceP_LB)
- # for line_i in range(len(_table)):
- # for cell_i in range(len(_table[line_i])):
- # _cell = _table[line_i][cell_i]
- # print(line_i,cell_i,_cell["bbox"],_cell["text"])
- for _line in _table:
- # filler cells found for this row, each stored with its insertion index
- extend_line = []
- for c_i in range(len(_line)):
- c_cell = _line[c_i]
- # first cell missing (exact comparison with list_x[0]; `margin` is not applied here)
- if c_i == 0 and c_cell["bbox"][0] != list_x[0]:
- _bbox = (list_x[0], c_cell["bbox"][1], c_cell["bbox"][0], c_cell["bbox"][3])
- _cell = {"bbox": _bbox,
- "rect": LTRect(1, _bbox),
- "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin),
- "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin),
- "text": ""}
- extend_line.append({"index": c_i, "cell": _cell})
- # a cell in the middle is missing
- if c_i < len(_line) - 1:
- n_cell = _line[c_i + 1]
- _bbox = c_cell["bbox"]
- n_bbox = n_cell["bbox"]
- # current and next cell cover the same x-range: nothing to fill between them
- if _bbox[0] == n_bbox[0] and _bbox[2] == n_bbox[2]:
- continue
- else:
- # a horizontal gap larger than `margin` gets a filler cell
- if abs(_bbox[2] - n_bbox[0]) > margin:
- _bbox = (_bbox[2], _bbox[1], n_bbox[0], _bbox[3])
- _cell = {"bbox": _bbox,
- "rect": LTRect(1, _bbox),
- "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin),
- "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin),
- "text": ""}
- extend_line.append({"index": c_i + 1, "cell": _cell})
- # last cell missing (gap between the last cell and list_x[-1] larger than `margin`)
- if c_i == len(_line) - 1:
- if abs(c_cell["bbox"][2] - list_x[-1]) > margin:
- _bbox = (c_cell["bbox"][2], c_cell["bbox"][1], list_x[-1], c_cell["bbox"][3])
- _cell = {"bbox": _bbox,
- "rect": LTRect(1, _bbox),
- "rowspan": self.getspan(list_y, _bbox[1], _bbox[3], margin),
- "columnspan": self.getspan(list_x, _bbox[0], _bbox[2], margin),
- "text": ""}
- extend_line.append({"index": c_i + 1, "cell": _cell})
- # insert from the highest index down so that the lower insertion indices stay valid
- extend_line.sort(key=lambda x: x["index"], reverse=True)
- for _tmp in extend_line:
- _line.insert(_tmp["index"], _tmp["cell"])
- def feedText2table(self, _table, list_textbox, in_objs, sourceP_LB):
- # Assign every textbox to the table cell it overlaps most and append the
- # whitespace-stripped text to that cell's "text" (mutates `_table` and `in_objs`).
- # Also resets self.connect_bbox_list and refills it with textboxes that overlap
- # two or more cells (see the OCR branch below).
- # find the suitable cell of the textbox
- list_cells = []
- for table_line in _table:
- for _cell in table_line:
- list_cells.append({"cell": _cell, "inbox_textbox_list": []})
- self.connect_bbox_list = []
- for textbox in list_textbox:
- # overlap ratio (self.getIOU) of this textbox against every cell
- list_iou = []
- for _d in list_cells:
- _cell = _d["cell"]
- _iou = self.getIOU(textbox.bbox, _cell["bbox"])
- list_iou.append(_iou)
- # NOTE(review): np.argmax raises ValueError on an empty list, i.e. when the table has no cells
- max_iou_index = np.argmax(list_iou)
- max_iou = list_iou[max_iou_index]
- # a textbox is consumed only once: `in_objs` records the ones already placed
- if max_iou > 0.1 and textbox not in in_objs:
- list_cells[max_iou_index]["inbox_textbox_list"].append(textbox)
- in_objs.add(textbox)
- if not self.from_pdf and not self.splited:
- # Several cells with an overlap >= 0.3: OCR may have merged two texts into one box
- iou_index_list = np.where(np.array(list_iou) >= 0.3)[0].tolist()
- if len(iou_index_list) >= 2:
- print('len(iou_index_list) >= 2 textbox', textbox)
- self.connect_bbox_list.append(textbox)
- # NOTE(review): this list is shared by all cells and searched linearly
- has_matched_box_list = []
- for _d in list_cells:
- _cell = _d["cell"]
- inbox_textbox_list = _d["inbox_textbox_list"]
- # Group the textboxes of a cell into rows according to their y overlap
- all_match_box_list = []
- inbox_textbox_list.sort(key=lambda x: x.bbox[1], reverse=sourceP_LB)
- for i in range(len(inbox_textbox_list)):
- match_box_list = []
- box1 = inbox_textbox_list[i]
- if box1 in has_matched_box_list:
- continue
- # [min_y1, max_y1] is the middle third of box1's y extent
- min_y1 = box1.bbox[1] + 1 / 3 * abs(box1.bbox[3] - box1.bbox[1])
- max_y1 = box1.bbox[3] - 1 / 3 * abs(box1.bbox[3] - box1.bbox[1])
- match_box_list.append(
- [box1.get_text(), box1.bbox[0], box1.bbox[1], box1.bbox[2], box1.bbox[3], min_y1, max_y1])
- has_matched_box_list.append(box1)
- for j in range(i + 1, len(inbox_textbox_list)):
- box2 = inbox_textbox_list[j]
- if box2 in has_matched_box_list:
- continue
- # print(min_y1, box2.bbox[1], box2.bbox[3], max_y1)
- # print(min_y2, box1.bbox[3], max_y2)
- # box2 joins the row when one of its y edges lies in the band or it covers the whole band
- if min_y1 <= box2.bbox[1] <= max_y1 or \
- min_y1 <= box2.bbox[3] <= max_y1 or \
- box2.bbox[1] <= min_y1 <= max_y1 <= box2.bbox[3]:
- match_box_list.append(
- [box2.get_text(), box2.bbox[0], box2.bbox[1], box2.bbox[2], box2.bbox[3], min_y1, max_y1])
- has_matched_box_list.append(box2)
- # order the boxes of one row by their x0
- match_box_list.sort(key=lambda x: x[1])
- all_match_box_list.append(match_box_list)
- # print("match_box_list", all_match_box_list)
- # order the rows by the rounded sum of the first box's y0 and y1, halved
- all_match_box_list.sort(key=lambda x: (round(x[0][2] + x[0][4]) / 2, 0), reverse=sourceP_LB)
- for box_list in all_match_box_list:
- for box in box_list:
- # NOTE(review): "\s" is a non-raw string; newer Python versions warn about this escape
- _cell["text"] += re.sub("\s", '', box[0])
- def makeTableByRect(self, list_rect, margin, sourceP_LB):
- # Group the rectangles into rows and build the cell table.
- # Returns (_table, list_x, list_y): `_table` is a list of rows, each a list of
- # cell dicts with "bbox", "rect", "rowspan", "columnspan" and "text"; list_x and
- # list_y are the de-duplicated grid coordinates. Returns (None, [], []) when no
- # coordinates were collected or `list_rect` holds at most one rectangle.
- # Note: `list_rect` is sorted in place.
- _table = []
- set_x = set()
- set_y = set()
- clusters_rects = []
- # Cluster the rectangles by their y coordinate
- if sourceP_LB:
- list_rect.sort(key=lambda x: x.bbox[3])
- for _rect in list_rect:
- _y0 = _rect.bbox[3]
- # NOTE(review): _y1 is assigned but never read
- _y1 = _rect.bbox[1]
- _find = False
- # join the first cluster whose first rectangle has a bbox[3] within `margin`
- for l_cr in clusters_rects:
- if abs(l_cr[0].bbox[3] - _y0) < margin:
- _find = True
- l_cr.append(_rect)
- break
- if not _find:
- clusters_rects.append([_rect])
- else:
- list_rect.sort(key=lambda x: x.bbox[1])
- for _rect in list_rect:
- _y0 = _rect.bbox[1]
- # NOTE(review): _y1 is assigned but never read
- _y1 = _rect.bbox[3]
- _find = False
- # join the first cluster whose first rectangle has a bbox[1] within `margin`
- for l_cr in clusters_rects:
- if abs(l_cr[0].bbox[1] - _y0) < margin:
- _find = True
- l_cr.append(_rect)
- break
- if not _find:
- clusters_rects.append([_rect])
- # print("textbox:===================")
- # for _textbox in list_textbox:
- # print(_textbox.get_text())
- # print("textbox:======>>>>>>>>>>>>>")
- # for c in clusters_rects:
- # print("+"*30)
- # for cc in c:
- # print("rect", cc.)
- # collect the x / y coordinates of all rectangles (used to compute the spans)
- for _line in clusters_rects:
- for _rect in _line:
- (x0, y0, x1, y1) = _rect.bbox
- set_x.add(x0)
- set_x.add(x1)
- set_y.add(y0)
- set_y.add(y1)
- if len(set_x) == 0 or len(set_y) == 0:
- return None, [], []
- if len(list_rect) <= 1:
- return None, [], []
- list_x = list(set_x)
- list_y = list(set_y)
- list_x.sort(key=lambda x: x)
- list_y.sort(key=lambda x: x, reverse=sourceP_LB)
- # print("clusters_rects", len(clusters_rects))
- # NOTE(review): the same sort statement appears twice in a row; original nesting unclear - confirm
- if sourceP_LB:
- clusters_rects.sort(key=lambda x: (x[0].bbox[1] + x[0].bbox[3]) / 2, reverse=sourceP_LB)
- clusters_rects.sort(key=lambda x: (x[0].bbox[1] + x[0].bbox[3]) / 2, reverse=sourceP_LB)
- # order the rectangles of every row by their x0
- for l_cr in clusters_rects:
- l_cr.sort(key=lambda x: x.bbox[0])
- # drop x coordinates that are closer than 5 to the preceding coordinate
- pop_x = []
- for i in range(len(list_x) - 1):
- _i = len(list_x) - i - 1
- l_i = _i - 1
- if abs(list_x[_i] - list_x[l_i]) < 5:
- pop_x.append(_i)
- pop_x.sort(key=lambda x: x, reverse=True)
- for _x in pop_x:
- list_x.pop(_x)
- # same de-duplication for the y coordinates (the name `pop_x` is reused)
- pop_x = []
- for i in range(len(list_y) - 1):
- _i = len(list_y) - i - 1
- l_i = _i - 1
- if abs(list_y[_i] - list_y[l_i]) < 5:
- pop_x.append(_i)
- pop_x.sort(key=lambda x: x, reverse=True)
- for _x in pop_x:
- list_y.pop(_x)
- # print("list_x", list_x)
- # print("list_y", list_y)
- # NOTE(review): line_i / cell_i are only incremented, never read
- line_i = 0
- for _line in clusters_rects:
- table_line = []
- cell_i = 0
- for _rect in _line:
- (x0, y0, x1, y1) = _rect.bbox
- # spans are derived from the number of grid coordinates covered by the rectangle
- _cell = {"bbox": (x0, y0, x1, y1),
- "rect": _rect,
- "rowspan": self.getspan(list_y, y0, y1, margin),
- "columnspan": self.getspan(list_x, x0, x1, margin),
- "text": ""}
- cell_i += 1
- table_line.append(_cell)
- line_i += 1
- _table.append(table_line)
- return _table, list_x, list_y
def rect2table(self, list_textbox, list_rect, in_objs, margin=5, sourceP_LB=True):
    """Build a table from cell rectangles and fill it with the textbox texts.

    Steps: ``makeTableByRect`` builds the raw table, ``feedText2table``
    assigns the texts, ``fixRect`` expands spans and inserts missing cells,
    and ``feedText2table`` runs a second time on the repaired table.

    Args:
        list_textbox: textbox objects handed to ``feedText2table``.
        list_rect: rectangle objects handed to ``makeTableByRect``.
        in_objs: set of already consumed textboxes (updated by
            ``feedText2table``).
        margin: tolerance forwarded to ``makeTableByRect`` and ``fixRect``.
        sourceP_LB: forwarded unchanged to the helper methods.

    Returns:
        ``None`` when no table could be built, ``{}`` when
        ``self.connect_bbox_list`` is non-empty after the second text pass
        (a textbox matched several cells and has to be split by the caller),
        otherwise ``{"bbox": table_bbox, "table": _table}``.
    """
    # The former nested ``getIOU`` helper was never called (the methods use
    # ``self.getIOU``), so it has been dropped.
    _table, list_x, list_y = self.makeTableByRect(list_rect, margin, sourceP_LB)
    if _table is None:
        return None

    self.feedText2table(_table, list_textbox, in_objs, sourceP_LB)
    self.fixRect(_table, list_x, list_y, sourceP_LB, margin)
    self.feedText2table(_table, list_textbox, in_objs, sourceP_LB)

    # A textbox overlapping several cells was probably joined by mistake.
    if self.connect_bbox_list:
        return {}

    # The table bbox runs from the first cell's (x0, y0) to the last cell's (x1, y1).
    first_bbox = _table[0][0].get("bbox")
    last_bbox = _table[-1][-1].get("bbox")
    table_bbox = (first_bbox[0], first_bbox[1], last_bbox[2], last_bbox[3])
    return {"bbox": table_bbox, "table": _table}
def inbox(self, bbox0, bbox_g, text=""):
    """Return 1 when ``self.getIOU(bbox0, bbox_g)`` exceeds 0.2, otherwise 0.

    ``text`` is not used by the active code.
    """
    overlap = self.getIOU(bbox0, bbox_g)
    return 1 if overlap > 0.2 else 0
def getIOU(self, bbox0, bbox1):
    """Return the overlap of two boxes relative to the smaller box.

    Boxes are ``(x0, y0, x1, y1)`` sequences. For each axis the extent of
    the joint bounding range minus the sum of both box extents is computed;
    the boxes overlap only when that value is negative on both axes. The
    result is the product of the two values divided by the smaller box
    area, so this is not a classic intersection-over-union. ``0`` is
    returned when the boxes do not overlap.
    """
    joint_w = abs(max(bbox0[2], bbox1[2]) - min(bbox0[0], bbox1[0]))
    joint_h = abs(max(bbox0[3], bbox1[3]) - min(bbox0[1], bbox1[1]))
    width = joint_w - (abs(bbox0[2] - bbox0[0]) + abs(bbox1[2] - bbox1[0]))
    height = joint_h - (abs(bbox0[3] - bbox0[1]) + abs(bbox1[3] - bbox1[1]))

    if not (width < 0 and height < 0):
        return 0

    area0 = abs((bbox0[2] - bbox0[0]) * (bbox0[3] - bbox0[1]))
    area1 = abs((bbox1[2] - bbox1[0]) * (bbox1[3] - bbox1[1]))
    return abs(width * height / min(area0, area1))
def getspan(self, _list, x0, x1, margin):
    """Return how many grid slots the interval ``[x0, x1]`` covers.

    Counts the values of ``_list`` inside the interval (endpoints in either
    order, widened by ``margin`` on both sides, bounds inclusive) and
    subtracts one. The result is ``-1`` when no value matches.
    """
    lower = min(x0, x1) - margin
    upper = max(x0, x1) + margin
    return sum(1 for value in _list if lower <= value <= upper) - 1
def _plot(self, list_line, list_textbox):
    """Debug helper: draw the bbox diagonals with matplotlib and show the figure.

    Every element of ``list_line`` is drawn twice, first with the bbox read
    from the instance ``__dict__`` and then from the ``bbox`` attribute.
    Each element of ``list_textbox`` is drawn once.
    """
    from matplotlib import pyplot as plt

    def draw_diagonal(bbox):
        xa, ya, xb, yb = bbox
        plt.plot([xa, xb], [ya, yb])

    plt.figure()
    for segment in list_line:
        draw_diagonal(segment.__dict__.get("bbox"))
    for segment in list_line:
        draw_diagonal(segment.bbox)
    for box in list_textbox:
        draw_diagonal(box.bbox)
    plt.show()
def get_table_html(table):
    """Render a table (rows of cell dicts) as an HTML ``<table>`` string.

    Each cell provides "rowspan", "columnspan" and "text". The text is
    inserted as is, without HTML escaping.
    """
    pieces = ['<table border="1">']
    for row in table:
        pieces.append("<tr>")
        for cell in row:
            opening = "<td colspan=" + str(cell.get("columnspan")) + " rowspan=" + str(cell.get("rowspan")) + ">"
            pieces.append(opening)
            pieces.append(cell.get("text") + "</td>")
        pieces.append("</tr>")
    pieces.append("</table>")
    return "".join(pieces)
def sort_object(obj_list, is_reverse=False):
    """Merge neighbouring sentences and sort the objects in place.

    The list is first passed through ``combine_object``. The type of the
    first element selects the ordering: tables, sentences and images are
    sorted by ``(y, x)`` (optionally reversed), pages by ``page_no``. Any
    other type leaves the order untouched.
    """
    from format_convert.convert_tree import _Table, _Image, _Sentence, _Page

    obj_list = combine_object(obj_list)
    if not obj_list:
        return obj_list

    head = obj_list[0]
    if isinstance(head, (_Table, _Sentence, _Image)):
        obj_list.sort(key=lambda item: (item.y, item.x), reverse=is_reverse)
    elif isinstance(head, _Page):
        obj_list.sort(key=lambda item: item.page_no)
    return obj_list
- def combine_object(obj_list, threshold=5):
- # Merge non-HTML _Sentence objects whose y values differ by at most `threshold`
- # into one sentence. `obj_list` is modified in place and returned: the
- # sentences are removed from it and the surviving ones are appended at the end.
- from format_convert.convert_tree import _Sentence
- sentence_list = []
- for obj in obj_list:
- if isinstance(obj, _Sentence) and not obj.is_html:
- # strip all whitespace; NOTE(review): "\s" is a non-raw string, newer Python versions warn about it
- obj.content = re.sub("\s", "", obj.content)
- sentence_list.append(obj)
- sentence_list.sort(key=lambda x: (x.y, x.x))
- for sen in sentence_list:
- obj_list.remove(sen)
- # sentences that were merged into their successor
- delete_list = []
- for i in range(1, len(sentence_list)):
- sen1 = sentence_list[i - 1]
- sen2 = sentence_list[i]
- # either sentence can opt out of merging through its `combine` flag
- if sen1.combine is False or sen2.combine is False:
- continue
- if abs(sen2.y - sen1.y) <= threshold:
- # the sentence with the smaller x comes first in the merged content
- if sen2.x > sen1.x:
- sen2.x = sen1.x
- sen2.content = sen1.content + sen2.content
- else:
- sen2.content = sen2.content + sen1.content
- if sen2.y > sen1.y:
- sen2.y = sen1.y
- delete_list.append(sen1)
- for sen in delete_list:
- sentence_list.remove(sen)
- for sen in sentence_list:
- obj_list.append(sen)
- return obj_list
- # Module-level HTTP sessions; request_post picks one according to the "model_type" of the request ("ocr", "otr", anything else)
- session_ocr = requests.Session()
- session_otr = requests.Session()
- session_all = requests.Session()
def request_post(url, param, time_out=1000, use_zlib=False):
    """POST ``param`` to ``url`` once and return the response body as text.

    The session is chosen by ``param["model_type"]``: ``"ocr"`` uses
    ``session_ocr``, ``"otr"`` uses ``session_otr`` and anything else uses
    ``session_all``.

    Args:
        url: target URL.
        param: mapping sent as the request ``data``.
        time_out: timeout forwarded to ``Session.post``.
        use_zlib: accepted for compatibility; not used.

    Returns:
        ``result.text`` when the status code is 200, otherwise the JSON text
        ``"[-2]"``. The same fallback is returned when the request raises.
    """
    # The original retry loop stopped after the first failure, so exactly one
    # attempt was ever made. This version states that directly.
    text = json.dumps([-2])
    try:
        model_type = param.get("model_type")
        if model_type == "ocr":
            session = session_ocr
        elif model_type == "otr":
            session = session_otr
        else:
            session = session_all
        result = session.post(url, data=param, timeout=time_out)
        if result.status_code == 200:
            text = result.text
    except socket.timeout:
        # a socket-level timeout is reported through the fallback text only
        pass
    except Exception:
        # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        traceback.print_exc()
    return text
def test_gpu():
    """Run ``paddle.utils.run_check()`` between two separator lines."""
    separator = "=" * 30
    print(separator)
    import paddle

    paddle.utils.run_check()
    print(separator)
- def my_subprocess_call(*popenargs, timeout=None):
- # Start a child process with Popen, print every line of its stdout and stderr,
- # wait for it and return (pid, returncode). The process is killed in the
- # `finally` clause regardless of the outcome.
- logging.info("into my_subprocess_call")
- with Popen(*popenargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as p:
- try:
- # NOTE(review): stdout is read to its end before stderr is touched - confirm this cannot block on a chatty stderr
- for line in p.stdout:
- print("stdout", line)
- for line in p.stderr:
- print("stderr", line)
- # NOTE(review): `timeout` is only applied here, after both pipes have been read
- p.wait(timeout=timeout)
- # p.communicate()
- return p.pid, p.returncode
- except: # Including KeyboardInterrupt, wait handled that.
- p.kill()
- # We don't call p.wait() again as p.__exit__ does that for us.
- raise
- finally:
- logging.info("out my_subprocess_call")
- p.kill()
def parse_yaml():
    """Load ``interface.yml`` from this module's directory and return the parsed data."""
    module_dir = os.path.dirname(os.path.abspath(__file__))
    with open(module_dir + "/interface.yml", "r", encoding='utf-8') as stream:
        raw_text = stream.read()
    return yaml.load(raw_text, Loader=yaml.SafeLoader)
- def get_ip_port(node_type=None, interface_type=None):
- # Read interface.yml (via parse_yaml) and build
- # {ip: {NODE_TYPE: {interface: [ports...], "project_path": ..., "python_path": ..., "gunicorn_path": ...}}}.
- # `node_type` / `interface_type` restrict the result to one node type / one
- # interface; by default all node types and interfaces listed below are read.
- if node_type is None:
- node_type_list = ["master", "slave"]
- else:
- node_type_list = [node_type]
- if interface_type is None:
- interface_type_list = ["convert", "ocr", "otr", "office", "path", "isr", "idc", "atc", "yolo"]
- else:
- interface_type_list = [interface_type]
- ip_port_dict = {}
- params = parse_yaml()
- # Loop over master / slave
- for type1 in node_type_list:
- node_type = type1.upper()
- ip_list = params.get(node_type).get("ip")
- # Loop over the configured IPs; index j also selects the per-IP entries of the lists below
- for j in range(len(ip_list)):
- _ip = ip_list[j]
- if ip_port_dict.get(_ip):
- ip_port_dict.get(_ip).update({node_type: {}})
- else:
- ip_port_dict.update({_ip: {node_type: {}}})
- # For every IP, loop over the interface types
- for type2 in interface_type_list:
- python_path = None
- project_path = None
- gunicorn_path = None
- # NOTE(review): `processes` is never changed by the active code, so the "_processes" entry below is never written
- processes = 0
- port_list = []
- interface_type = type2.upper()
- # if interface_type in ["convert".upper()]:
- # _port = params.get(node_type).get(interface_type).get("port")
- # if _port is None:
- # port_list = []
- # else:
- # if interface_type == "convert".upper():
- # processes = params.get(node_type).get(interface_type).get("processes")[j]
- # port_list = [str(_port[j])]*int(processes)
- # # port_list = [str(_port)]
- if interface_type == "path".upper():
- python_path = params.get(node_type).get(interface_type).get("python")[j]
- project_path = params.get(node_type).get(interface_type).get("project")[j]
- gunicorn_path = params.get(node_type).get(interface_type).get("gunicorn")[j]
- else:
- port_start = params.get(node_type).get(interface_type).get("port_start")
- port_no = params.get(node_type).get(interface_type).get("port_no")
- if port_start is None or port_no is None:
- port_list = []
- else:
- # "office" gets port_no consecutive ports; every other interface repeats its start port port_no times
- if interface_type in ["office".upper()]:
- port_list = [str(x) for x in range(port_start[j], port_start[j] + port_no[j], 1)]
- else:
- port_list = [str(port_start[j])] * port_no[j]
- # if ip_list:
- # for i in range(len(ip_list)):
- # Store the parameters in the dict
- if port_list:
- ip_port_dict.get(_ip).get(node_type).update({interface_type.lower(): port_list})
- if processes:
- ip_port_dict.get(_ip).get(node_type).update({interface_type.lower() + "_processes": processes})
- if project_path and python_path and gunicorn_path:
- ip_port_dict.get(_ip).get(node_type).update({"project_path": project_path,
- "python_path": python_path,
- "gunicorn_path": gunicorn_path})
- # print("ip_port_dict", ip_port_dict)
- return ip_port_dict
- def get_ip_port_old(node_type=None, interface_type=None):
- # Older variant of get_ip_port: the configuration values are read as scalars
- # (no per-IP index) and the result is keyed by IP only,
- # {ip: {interface: [ports...], "<interface>_processes": n, "project_path": ..., "python_path": ...}}.
- if node_type is None:
- node_type_list = ["master", "slave"]
- else:
- node_type_list = [node_type]
- if interface_type is None:
- interface_type_list = ["convert", "ocr", "otr", "office", "path"]
- else:
- interface_type_list = [interface_type]
- ip_port_dict = {}
- params = parse_yaml()
- for type1 in node_type_list:
- node_type = type1.upper()
- ip_list = params.get(node_type).get("ip")
- for type2 in interface_type_list:
- interface_type = type2.upper()
- processes = 0
- python_path = None
- project_path = None
- # "convert": one port repeated once per configured process
- if interface_type in ["convert".upper()]:
- _port = params.get(node_type).get(interface_type).get("port")
- if _port is None:
- port_list = []
- else:
- if interface_type == "convert".upper():
- processes = params.get(node_type).get(interface_type).get("processes")
- port_list = [str(_port)] * int(processes)
- # port_list = [str(_port)]
- # "path": interpreter and project locations instead of ports
- elif interface_type == "path".upper():
- python_path = params.get(node_type).get(interface_type).get("python")
- project_path = params.get(node_type).get(interface_type).get("project")
- # every other interface: port_no consecutive ports starting at port_start
- else:
- port_start = params.get(node_type).get(interface_type).get("port_start")
- port_no = params.get(node_type).get(interface_type).get("port_no")
- if port_start is None or port_no is None:
- port_list = []
- else:
- port_list = [str(x) for x in range(port_start, port_start + port_no, 1)]
- # NOTE(review): `port_list` is not reassigned in the "path" branch, so it keeps the value of the previous interface - confirm
- if ip_list:
- for _ip in ip_list:
- if _ip is None:
- continue
- if _ip in ip_port_dict.keys():
- if port_list:
- ip_port_dict.get(_ip).update({interface_type.lower(): port_list})
- else:
- if port_list:
- ip_port_dict[_ip] = {interface_type.lower(): port_list}
- if processes:
- ip_port_dict.get(_ip).update({interface_type.lower() + "_processes": processes})
- if project_path and python_path:
- ip_port_dict.get(_ip).update({"project_path": project_path,
- "python_path": python_path})
- return ip_port_dict
def get_intranet_ip():
    """Return the local IP address of a UDP socket connected to 8.8.8.8:80.

    The socket's own address is read with ``getsockname()`` and its host
    part is returned.

    Raises:
        OSError: when the socket cannot be created or connected.
    """
    # The context manager closes the socket on every path. The former
    # ``finally: s.close()`` hit an unbound ``s`` and hid the real error
    # whenever ``socket.socket()`` itself failed.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        # getsockname() returns (host, port); only the host part is needed
        return s.getsockname()[0]
def get_all_ip():
    """Return the local IPv4 addresses, each prefixed with ``http://``.

    On Windows only ``127.0.0.1`` is reported. On other platforms the
    addresses are taken from the output of ``ip addr``.
    """
    if get_platform() == "Windows":
        raw_ips = ['127.0.0.1']
    else:
        shell_lines = os.popen("ip addr | grep 'inet '|awk '{print $2}'").readlines()
        # drop the "/prefix-length" part of every reported address
        raw_ips = [entry.split('/')[0] for entry in shell_lines]
    return ["http://" + address for address in raw_ips]
def get_using_ip():
    """Return the first configured IP that is also a local one.

    Falls back to ``http://127.0.0.1`` when none of the configured IPs
    belongs to this machine.
    """
    configured = get_ip_port()
    local_ips = get_all_ip()
    for candidate in configured.keys():
        if candidate in local_ips:
            return candidate
    return "http://127.0.0.1"
def memory_decorator(func):
    """Decorator that logs resident memory and elapsed time around ``func``.

    On Windows the wrapped function is called without any logging. On other
    platforms one line is logged before the call and one after it. Each
    line holds the qualified name, the process id, the RSS in GB and the
    seconds elapsed since the timer was started.
    """

    def _rss_in_gb():
        return psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024 / 1024

    def _report(stage, usage, start_time):
        fields = ["----- memory info " + stage,
                  func.__qualname__,
                  str(os.getpid()),
                  str(round(usage, 2)) + " GB",
                  str(round(time.time() - start_time, 2)) + " sec"]
        logging.info(" - ".join(fields))

    @wraps(func)
    def get_memory_info(*args, **kwargs):
        if get_platform() == "Windows":
            return func(*args, **kwargs)

        # Memory is sampled before the timer starts, as in the original.
        usage_before = _rss_in_gb()
        start_time = time.time()
        _report("start", usage_before, start_time)

        result = func(*args, **kwargs)

        _report("end", _rss_in_gb(), start_time)
        return result

    return get_memory_info
def log(msg):
    """Log ``msg`` at INFO level through a logger named after the calling function.

    The "md5" and "port" values of ``_global`` are attached as extra fields.
    """
    # f_back is the frame of whoever called log()
    caller_frame = inspect.currentframe().f_back
    caller_name = caller_frame.f_code.co_name
    context = {"md5": _global.get("md5"),
               "port": _global.get("port")}
    get_logger(caller_name, context).info(msg)
def get_logger(_name, _dict):
    """Return a ``LoggerAdapter`` for logger ``_name`` carrying ``_dict`` as extra.

    The underlying logger gets a ``StreamHandler`` with the md5/port format
    the first time it is requested. Its level is then set to INFO and
    propagation is switched off. Later calls reuse the handler, so repeated
    calls do not duplicate output.

    Args:
        _name: logger name.
        _dict: mapping with the "md5" and "port" values used by the format.
    """
    _format = '%(asctime)s - %(name)s - %(levelname)s - %(md5)s - %(port)s - %(message)s'
    logger = logging.getLogger(_name)

    # A handler without a formatter (``formatter is None``) used to raise
    # AttributeError here. It now simply counts as "not ours".
    already_configured = any(
        h.formatter is not None and getattr(h.formatter, "_fmt", None) == _format
        for h in logger.handlers
    )
    if not already_configured:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(_format))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        logger.propagate = False

    return logging.LoggerAdapter(logger, _dict)
- def set_flask_global():
- # Store the configuration from get_ip_port() in `_global` under "ip_port" and a
- # per-IP, per-interface counter dict (all values 0) under "ip_port_flag".
- # Locks / parameters needed for polling the interfaces
- ip_port_flag = {}
- # ip_flag = []
- ip_port_dict = get_ip_port()
- for _k in ip_port_dict.keys():
- ip_port_flag.update({_k: {}})
- for interface in ["ocr", "otr", "convert", "idc", "isr", "atc", 'yolo', "office"]:
- # an interface gets a counter only when it is configured for this IP
- if ip_port_dict.get(_k).get("MASTER"):
- if ip_port_dict.get(_k).get("MASTER").get(interface):
- ip_port_flag[_k][interface] = 0
- else:
- # NOTE(review): raises AttributeError if the IP has no "SLAVE" entry - confirm this cannot happen
- if ip_port_dict.get(_k).get("SLAVE").get(interface):
- ip_port_flag[_k][interface] = 0
- # ip_port_flag.update({_k: {"ocr": 0,
- # "otr": 0,
- # "convert": 0,
- # "idc": 0,
- # "isr": 0,
- # "office": 0
- # }})
- # if ip_port_dict.get(_k).get("MASTER"):
- # ip_flag.append([_k+"_master", 0])
- # if ip_port_dict.get(_k).get("SLAVE"):
- # ip_flag.append([_k+"_slave", 0])
- _global.update({"ip_port_flag": ip_port_flag})
- _global.update({"ip_port": ip_port_dict})
- # _global.update({"ip_flag": ip_flag})
- # print(globals().get("ip_port"))
def get_md5_from_bytes(_bytes):
    """Return ``(md5_hex_digest, byte_count)`` for ``_bytes``.

    The data is copied into an in-memory stream and hashed in 4096-byte
    chunks. On any exception the traceback is printed and ``(None, n)`` is
    returned, where ``n`` is the number of bytes hashed up to that point.
    """
    _length = 0
    try:
        digest = hashlib.md5()
        stream = BytesIO()
        stream.write(_bytes)
        stream.seek(0)
        # read() returns b"" at the end of the stream, which stops the iterator
        for chunk in iter(lambda: stream.read(4096), b""):
            _length += len(chunk)
            digest.update(chunk)
        return digest.hexdigest(), _length
    except Exception:
        traceback.print_exc()
        return None, _length
- # def to_share_memory(np_data, name=None):
- # # from multiprocessing.resource_tracker import unregister
- # from multiprocessing import shared_memory
- # if name is None:
- # sm_name = "psm_" + str(os.getpid())
- # else:
- # sm_name = name
- # logging.info("into from_share_memory sm_name " + sm_name)
- # shm = shared_memory.SharedMemory(name=sm_name, create=True, size=np_data.nbytes)
- # # unregister(sm_name, 'shared_memory')
- # sm_data = np.ndarray(np_data.shape, dtype=np_data.dtype, buffer=shm.buf)
- # sm_data[:] = np_data[:] # Copy the original data into shared memory
- #
- # shm.close()
- # del sm_data
- # return shm
- # def from_share_memory(sm_name, _shape, _dtype, if_close=True):
- # from multiprocessing import shared_memory
- # logging.info("into from_share_memory sm_name " + sm_name)
- # shm = shared_memory.SharedMemory(name=sm_name, create=False)
- # b = np.ndarray(_shape, dtype=_dtype, buffer=shm.buf)
- # sm_data = copy.deepcopy(b)
- # b[::] = 0
- #
- # if if_close:
- # try:
- # shm.close()
- # shm.unlink()
- # except Exception:
- # log("file not found! " + sm_name)
- # return sm_data
- # def get_share_memory(sm_name):
- # try:
- # from multiprocessing import shared_memory
- # shm = shared_memory.SharedMemory(name=sm_name, create=False)
- # return shm
- # except:
- # return None
- # def release_share_memory(shm):
- # try:
- # if shm is None:
- # return
- # shm.close()
- # shm.unlink()
- # log(str(shm.name) + " release successfully!")
- # except FileNotFoundError:
- # log(str(shm.name) + " has released!")
- # except Exception as e:
- # traceback.print_exc()
- # def get_share_memory_list(sm_list_name, list_size=None):
- # # from multiprocessing.resource_tracker import unregister
- # from multiprocessing import shared_memory
- # if list_size is None:
- # sm_list = shared_memory.ShareableList(name=sm_list_name)
- # else:
- # sm_list = shared_memory.ShareableList(name=sm_list_name, sequence=["0"]+[' '*2048]*(list_size-2)+["0"])
- # # unregister(sm_list_name, 'shared_memory')
- # return sm_list
- # def close_share_memory_list(sm_list):
- # try:
- # sm_list.shm.close()
- # except Exception:
- # traceback.print_exc()
def get_np_type(_str):
    """Map a dtype name to the numpy type and log the lookup.

    Known names are 'uint8', 'float16' and 'float32'. Any other name
    returns ``None``.
    """
    known_types = {
        'uint8': np.uint8,
        'float16': np.float16,
        'float32': np.float32,
    }
    _dtype = known_types.get(_str)
    logging.info("get_np_type " + _str + " " + str(_dtype))
    return _dtype
def namespace_to_dict(agrs_or_dict, reverse=False):
    """Convert an ``argparse.Namespace`` to a dict, or back when ``reverse`` is true.

    The forward direction returns ``vars(namespace)``. The reverse direction
    builds a new ``Namespace`` from the mapping.
    """
    if reverse:
        return argparse.Namespace(**agrs_or_dict)
    return vars(agrs_or_dict)
def get_args_from_config(ip_port_dict, ip, arg_type, node_type=None):
    """Collect the ``arg_type`` values configured for ``ip``.

    Looks at ``ip_port_dict[ip][NODE][arg_type]`` for "MASTER" and "SLAVE"
    (or only for ``node_type`` when given) and returns the truthy values in
    that order.
    """
    node_types = ["MASTER", "SLAVE"] if node_type is None else [node_type]

    found = []
    for current_type in node_types:
        node_config = ip_port_dict.get(ip).get(current_type)
        if not node_config:
            continue
        value = node_config.get(arg_type)
        if value:
            found.append(value)
    return found
- def remove_red_seal(image_np):
- """
- Remove red seals (stamps): every pixel in the red HSV range is overwritten
- with white or black, in place, and the image is returned. The image is shown
- in OpenCV windows before and after, and cv2.waitKey(0) blocks until a key is
- pressed.
- """
- cv2.namedWindow("image_np", 0)
- cv2.resizeWindow("image_np", 1000, 800)
- cv2.imshow("image_np", image_np)
- height, width, c = image_np.shape
- # NOTE(review): window_h is computed but never used
- window_h = int(height / 15)
- image_hsv = cv2.cvtColor(image_np, cv2.COLOR_BGR2HSV)
- # Walk over every pixel of the array
- red_point_list = []
- image_list = image_np.tolist()
- # occurrence count per HSV value, keyed by the string form of the [h, s, v] list
- hsv_dict = {}
- for index_1 in range(len(image_list)):
- for index_2 in range(len(image_list[index_1])):
- h, s, v = image_hsv[index_1][index_2]
- # HSV range treated as red
- if (0 <= h <= 10 or 156 <= h <= 180) and 43 <= s <= 255 and 46 <= v <= 255:
- key = str(image_hsv[index_1][index_2].tolist())
- red_point_list.append([key, index_1, index_2])
- if hsv_dict.get(key):
- hsv_dict[key] += 1
- else:
- hsv_dict[key] = 1
- # Find the HSV value that occurs most often
- hsv_most_key = None
- hsv_most_value = 0
- for hsv in hsv_dict.keys():
- if hsv_dict.get(hsv) > hsv_most_value:
- hsv_most_value = hsv_dict.get(hsv)
- hsv_most_key = hsv
- # print(hsv_dict)
- # Decide from the HSV value whether a pixel is filled with white or black
- # NOTE(review): eval() only parses keys produced above, but it raises TypeError when no red pixel was found (hsv_most_key is None)
- hsv_most_key = eval(hsv_most_key)
- for point in red_point_list:
- # V within 70 of the most frequent red value -> white, otherwise black
- if abs(eval(point[0])[2] - hsv_most_key[2]) <= 70:
- image_np[point[1]][point[2]][0] = 255
- image_np[point[1]][point[2]][1] = 255
- image_np[point[1]][point[2]][2] = 255
- else:
- image_np[point[1]][point[2]][0] = 0
- image_np[point[1]][point[2]][1] = 0
- image_np[point[1]][point[2]][2] = 0
- cv2.namedWindow("remove_red_seal", 0)
- cv2.resizeWindow("remove_red_seal", 1000, 800)
- cv2.imshow("remove_red_seal", image_np)
- # cv2.imwrite("C:/Users/Administrator/Downloads/1.png", image_np)
- cv2.waitKey(0)
- return image_np
def pil_resize(image_np, height, width):
    """Resize a BGR image to ``(height, width)`` with PIL bicubic resampling.

    When ``rows * cols * channels`` reaches 89478485 (the "limit pixels"
    value of the original comment), the image is first reduced with OpenCV
    so that its longer side becomes 3000, keeping the aspect ratio. The
    result is converted back to a BGR numpy array.
    """
    rows, cols, channels = image_np.shape[0], image_np.shape[1], image_np.shape[2]
    if rows * cols * channels >= 89478485:
        aspect = rows / cols
        if rows >= cols:
            target_size = (int(3000 / aspect), 3000)
        else:
            target_size = (3000, int(3000 * aspect))
        image_np = cv2.resize(image_np, target_size, interpolation=cv2.INTER_AREA)

    rgb_array = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
    resized = Image.fromarray(rgb_array).resize((int(width), int(height)), Image.BICUBIC)
    return cv2.cvtColor(np.asarray(resized), cv2.COLOR_RGB2BGR)
def np2pil(image_np):
    """Convert a BGR numpy image to a PIL image (RGB channel order)."""
    rgb_array = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb_array)
def pil2np(image_pil):
    """Convert a PIL image (RGB channel order) to a BGR numpy image."""
    rgb_array = np.array(image_pil)
    return cv2.cvtColor(rgb_array, cv2.COLOR_RGB2BGR)
def bytes2np(_b):
    """Decode an encoded image (bytes) into a colour numpy array.

    Returns:
        The result of ``cv2.imdecode(..., cv2.IMREAD_COLOR)``, or ``None``
        when decoding raises. An OpenCV "src.empty()" error is logged as an
        empty image. Other OpenCV errors are ignored silently. Any other
        exception prints its traceback.
    """
    try:
        # interpret the byte stream as uint8 values and let OpenCV decode it
        return cv2.imdecode(np.frombuffer(_b, np.uint8), cv2.IMREAD_COLOR)
    except cv2.error as e:
        if "src.empty()" in str(e):
            log("bytes2np image is empty!")
        return None
    except Exception:
        # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        traceback.print_exc()
        return None
def np2bytes(image_np):
    """Encode a numpy image as JPEG and return the raw bytes.

    The success flag returned by ``cv2.imencode`` is not checked.
    """
    _, encoded = cv2.imencode(".jpg", image_np)
    return encoded.tobytes()
def ocr_cant_read(text_list, box_list):
    """
    Decide whether the OCR could not read the image (e.g. wrong orientation).

    The result is True when there is no text or no box, when fewer than 40
    distinct characters were recognised, or when at least
    ``int(len(box_list) / 2)`` boxes are taller than wide.

    :param text_list: list of recognised texts
    :param box_list: list of text boxes (point 0 and point 2 are compared)
    :return: bool
    """
    # nothing recognised at all
    if not text_list or not box_list:
        return True

    # judge by the aspect ratio of the boxes
    tall_boxes = sum(
        1 for box in box_list
        if abs(box[0][1] - box[2][1]) > abs(box[0][0] - box[2][0])
    )
    mostly_tall = tall_boxes >= int(len(box_list) / 2)

    # judge by the number of distinct recognised characters
    distinct_chars = set()
    for text in text_list:
        distinct_chars.update(text)
    too_few_chars = len(distinct_chars) < 40

    result = bool(too_few_chars or mostly_tall)
    log(result)
    return result
def file_lock(file_name):
    """
    Acquire an exclusive lock on a file and return the open file handle.

    The file is created with the content '0' when it does not exist yet.
    The caller must close the returned handle to release the lock.

    :param file_name: path of the lock file
    :return: file object opened for reading and holding the exclusive lock
    """
    import fcntl

    if not os.path.exists(file_name):
        with open(file_name, 'w') as f:
            f.write('0')

    file = open(file_name, 'r')
    try:
        # acquire the exclusive lock (blocks until it is available)
        fcntl.flock(file.fileno(), fcntl.LOCK_EX)
    except BaseException:
        # do not leak the handle when locking fails or is interrupted
        file.close()
        raise
    return file
def get_garble_code():
    """Return a regex character class matching characters treated as garbled text.

    The class lists a fixed set of characters followed by the escapes
    ``\\x01`` to ``\\x09`` and ``\\x10`` to ``\\x19``.
    """
    fragments = [
        '[ÿÝØÐÙÚÛÜÒÓÔÕÖÊÄẨòóôäåüúîïìþ¡¢£¤§èéêëȟš',
        'Ϸᱦ¼ŒÞ¾Çœø‡Æ�ϐ㏫⮰≧ڝⶹӇⰚڣༀងϦȠ⚓Ⴭᐬ⩔ⅮⰚࡦࣽ',
        '䕆㶃䌛㻰䙹䔮㔭䶰䰬䉰䶰䘔䉥喌䶥䶰䛳䉙䄠',
    ]
    fragments.extend('\\x0' + str(code) for code in range(1, 10))
    fragments.extend('\\x' + str(code) for code in range(10, 20))
    fragments.append(']')
    return ''.join(fragments)
def line_is_cross(A, B, C, D):
    """Return True when segment AB and segment CD intersect in a single point.

    The intersection of the two ``LineString`` objects is computed. If the
    result exposes ``x`` and ``y`` it is a point and the segments cross.
    Any other result, where reading the coordinates fails, gives False.
    """
    crossing = LineString([A, B]).intersection(LineString([C, D]))
    try:
        # only a point-like intersection provides x / y
        crossing.x, crossing.y
    except Exception:
        # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are no longer swallowed.
        return False
    return True
if __name__ == "__main__":
    # Manual smoke check: print the parsed IP/port configuration, then the
    # "idc" and "atc" settings configured for the first local IP.
    print(get_ip_port())
    for _arg_type in ("idc", "atc"):
        print(get_args_from_config(get_ip_port(), get_all_ip()[0], _arg_type))
|