# -*- coding: utf-8 -*-
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/../")
from format_convert.convert_doc import doc2text, DocConvert
from format_convert.convert_docx import docx2text, DocxConvert
from format_convert.convert_image import picture2text, ImageConvert
from format_convert.convert_pdf import pdf2text, PDFConvert
from format_convert.convert_rar import rar2text, RarConvert
from format_convert.convert_swf import swf2text, SwfConvert
from format_convert.convert_txt import txt2text, TxtConvert
from format_convert.convert_xls import xls2text, XlsConvert
from format_convert.convert_xlsx import xlsx2text, XlsxConvert
from format_convert.convert_zip import zip2text, ZipConvert
import hashlib
from format_convert import get_memory_info
from ocr import ocr_interface
from otr import otr_interface
import re
import shutil
import base64
import time
import uuid
import logging
import cv2
from bs4 import BeautifulSoup
logging.getLogger("pdfminer").setLevel(logging.WARNING)
from format_convert.table_correct import *
from format_convert import timeout_decorator
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Supported file types:
# txt doc docx xls xlsx pdf zip rar swf jpg jpeg png
[-3] # with open(path, "r", encoding=encode) as ff: # txt_text = ff.read() # return [txt_text] # except: # logging.info("txt2text cannot open file with code " + encode) # return [-3] # except Exception as e: # print("txt2text", traceback.print_exc()) # logging.info("txt2text error!") # return [-1] # # # @get_memory_info.memory_decorator # def doc2text(path, unique_type_dir): # logging.info("into doc2text") # try: # # 调用office格式转换 # file_path = from_office_interface(path, unique_type_dir, 'docx') # # if file_path == [-3]: # # return [-3] # if judge_error_code(file_path): # return file_path # # text = docx2text(file_path, unique_type_dir) # return text # except Exception as e: # logging.info("doc2text error!") # print("doc2text", traceback.print_exc()) # # log_traceback("doc2text") # return [-1] # # # @get_memory_info.memory_decorator # def read_xml_order(path, save_path): # logging.info("into read_xml_order") # try: # try: # f = zipfile.ZipFile(path) # for file in f.namelist(): # if "word/document.xml" == str(file): # f.extract(file, save_path) # f.close() # except Exception as e: # # print("docx format error!", e) # logging.info("docx format error!") # return [-3] # # # DOMTree = xml.dom.minidom.parse(save_path + "word/document.xml") # # collection = DOMTree.documentElement # # try: # collection = xml_analyze(save_path + "word/document.xml") # except TimeoutError: # logging.info("read_xml_order timeout") # return [-4] # # body = collection.getElementsByTagName("w:body")[0] # order_list = [] # for line in body.childNodes: # # print(str(line)) # if "w:p" in str(line): # text = line.getElementsByTagName("w:t") # picture = line.getElementsByTagName("wp:docPr") # if text: # order_list.append("w:t") # if picture: # order_list.append("wp:docPr") # # for line1 in line.childNodes: # if "w:r" in str(line1): # # print("read_xml_order", "w:r") # picture1 = line1.getElementsByTagName("w:pict") # if picture1: # order_list.append("wp:docPr") # # if "w:tbl" in str(line): # order_list.append("w:tbl") # read_xml_table(path, save_path) # return order_list # except Exception as e: # logging.info("read_xml_order error!") # print("read_xml_order", traceback.print_exc()) # # log_traceback("read_xml_order") # return [-1] # # # @get_memory_info.memory_decorator # def read_xml_table(path, save_path): # logging.info("into read_xml_table") # try: # # print("into read_xml_table") # try: # f = zipfile.ZipFile(path) # for file in f.namelist(): # if "word/document.xml" == str(file): # f.extract(file, save_path) # f.close() # except Exception as e: # # print("docx format error!", e) # logging.info("docx format error!") # return [-3] # # # DOMTree = xml.dom.minidom.parse(save_path + "word/document.xml") # # collection = DOMTree.documentElement # # try: # collection = xml_analyze(save_path + "word/document.xml") # except TimeoutError: # logging.info("read_xml_table timeout") # return [-4] # # body = collection.getElementsByTagName("w:body")[0] # table_text_list = [] # # print("body.childNodes", body.childNodes) # for line in body.childNodes: # if "w:tbl" in str(line): # # print("str(line)", str(line)) # table_text = '' + "\n" # tr_list = line.getElementsByTagName("w:tr") # # print("line.childNodes", line.childNodes) # tr_index = 0 # tr_text_list = [] # tr_text_list_colspan = [] # for tr in tr_list: # table_text = table_text + "" + "\n" # tc_list = tr.getElementsByTagName("w:tc") # tc_index = 0 # tc_text_list = [] # for tc in tc_list: # tc_text = "" # # # 获取一格占多少列 # col_span = tc.getElementsByTagName("w:gridSpan") # if 
col_span: # col_span = int(col_span[0].getAttribute("w:val")) # else: # col_span = 1 # # # 获取是否是合并单元格的下一个空单元格 # is_merge = tc.getElementsByTagName("w:vMerge") # if is_merge: # is_merge = is_merge[0].getAttribute("w:val") # if is_merge == "continue": # col_span_index = 0 # real_tc_index = 0 # # # if get_platform() == "Windows": # # print("read_xml_table tr_text_list", tr_text_list) # # print("read_xml_table tr_index", tr_index) # # if 0 <= tr_index - 1 < len(tr_text_list): # for tc_colspan in tr_text_list[tr_index - 1]: # if col_span_index < tc_index: # col_span_index += tc_colspan[1] # real_tc_index += 1 # # # print("tr_index-1, real_tc_index", tr_index-1, real_tc_index) # # print(tr_text_list[tr_index-1]) # if real_tc_index < len(tr_text_list[tr_index - 1]): # tc_text = tr_text_list[tr_index - 1][real_tc_index][0] # # table_text = table_text + "" + "\n" # tc_index += 1 # tc_text_list.append([tc_text, col_span]) # table_text += "" + "\n" # tr_index += 1 # tr_text_list.append(tc_text_list) # table_text += "
" + "\n" # p_list = tc.getElementsByTagName("w:p") # # for p in p_list: # t = p.getElementsByTagName("w:t") # if t: # for tt in t: # # print("tt", tt.childNodes) # if len(tt.childNodes) > 0: # tc_text += tt.childNodes[0].nodeValue # tc_text += "\n" # # table_text = table_text + tc_text + "
" + "\n" # table_text_list.append(table_text) # return table_text_list # # except Exception as e: # logging.info("read_xml_table error") # print("read_xml_table", traceback.print_exc()) # # log_traceback("read_xml_table") # return [-1] # # # @get_memory_info.memory_decorator # @timeout_decorator.timeout(300, timeout_exception=TimeoutError) # def xml_analyze(path): # # 解析xml # DOMTree = xml.dom.minidom.parse(path) # collection = DOMTree.documentElement # return collection # # # def read_docx_table(document): # table_text_list = [] # for table in document.tables: # table_text = "\n" # print("==================") # for row in table.rows: # table_text += "\n" # for cell in row.cells: # table_text += "\n" # table_text += "\n" # table_text += "
" + cell.text + "
\n" # print(table_text) # table_text_list.append(table_text) # return table_text_list # # # @get_memory_info.memory_decorator # def docx2text(path, unique_type_dir): # logging.info("into docx2text") # try: # try: # doc = docx.Document(path) # except Exception as e: # print("docx format error!", e) # print(traceback.print_exc()) # logging.info("docx format error!") # return [-3] # # # 遍历段落 # # print("docx2text extract paragraph") # paragraph_text_list = [] # for paragraph in doc.paragraphs: # if paragraph.text != "": # paragraph_text_list.append("
" + paragraph.text + "
" + "\n") # # print("paragraph_text", paragraph.text) # # # 遍历表 # try: # table_text_list = read_xml_table(path, unique_type_dir) # except TimeoutError: # return [-4] # # if judge_error_code(table_text_list): # return table_text_list # # # 顺序遍历图片 # # print("docx2text extract image") # image_text_list = [] # temp_image_path = unique_type_dir + "temp_image.png" # pattern = re.compile('rId\d+') # for graph in doc.paragraphs: # for run in graph.runs: # if run.text == '': # try: # if not pattern.search(run.element.xml): # continue # content_id = pattern.search(run.element.xml).group(0) # content_type = doc.part.related_parts[content_id].content_type # except Exception as e: # print("docx no image!", e) # continue # if not content_type.startswith('image'): # continue # # # 写入临时文件 # img_data = doc.part.related_parts[content_id].blob # with open(temp_image_path, 'wb') as f: # f.write(img_data) # # # if get_platform() == "Windows": # # print("img_data", img_data) # # if img_data is None: # continue # # # 识别图片文字 # image_text = picture2text(temp_image_path) # if image_text == [-2]: # return [-2] # if image_text == [-1]: # return [-1] # if image_text == [-3]: # continue # # image_text = image_text[0] # image_text_list.append(add_div(image_text)) # # # 解析document.xml,获取文字顺序 # # print("docx2text extract order") # order_list = read_xml_order(path, unique_type_dir) # if order_list == [-2]: # return [-2] # if order_list == [-1]: # return [-1] # # text = "" # print("len(order_list)", len(order_list)) # print("len(paragraph_text_list)", len(paragraph_text_list)) # print("len(image_text_list)", len(image_text_list)) # print("len(table_text_list)", len(table_text_list)) # # # log("docx2text output in order") # for tag in order_list: # if tag == "w:t": # if len(paragraph_text_list) > 0: # text += paragraph_text_list.pop(0) # if tag == "wp:docPr": # if len(image_text_list) > 0: # text += image_text_list.pop(0) # if tag == "w:tbl": # if len(table_text_list) > 0: # text += table_text_list.pop(0) # return [text] # except Exception as e: # # print("docx2text", e, global_type) # logging.info("docx2text error!") # print("docx2text", traceback.print_exc()) # # log_traceback("docx2text") # return [-1] # # # def add_div(text): # if text == "" or text is None: # return text # # if get_platform() == "Windows": # print("add_div", text) # if re.findall("
", text): # return text # # text = "
" + text + "\n" # text = re.sub("\n", "
\n
", text) # # text += "
" # if text[-5:] == "
": # print("add_div has cut", text[-30:]) # text = text[:-5] # return text # # # @get_memory_info.memory_decorator # def pdf2Image(path, save_dir): # logging.info("into pdf2Image") # try: # try: # doc = fitz.open(path) # except Exception as e: # logging.info("pdf format error!") # # print("pdf format error!", e) # return [-3] # # # output_image_list = [] # output_image_dict = {} # page_count = doc.page_count # for page_no in range(page_count): # # 限制pdf页数,只取前10页后10页 # if page_count > 20: # if 10 <= page_no < page_count-10: # # logging.info("pdf2Image: pdf pages count " + str(doc.page_count) # # + ", only get 70 pages") # continue # # try: # page = doc.loadPage(page_no) # output = save_dir + "_page" + str(page_no) + ".png" # rotate = int(0) # # 每个尺寸的缩放系数为1.3,这将为我们生成分辨率提高2.6的图像。 # # 此处若是不做设置,默认图片大小为:792X612, dpi=96 # # (1.33333333 --> 1056x816) (2 --> 1584x1224) # # (1.183, 2.28 --> 1920x1080) # zoom_x = 3. # zoom_y = 3. # # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) # mat = fitz.Matrix(zoom_x, zoom_y).preRotate(rotate) # pix = page.getPixmap(matrix=mat, alpha=False) # pix.writePNG(output) # pdf_image = cv2.imread(output) # print("pdf_image", page_no, pdf_image.shape) # # output_image_list.append([page_no, output]) # output_image_dict[int(page_no)] = output # except ValueError as e: # traceback.print_exc() # if str(e) == "page not in document": # logging.info("pdf2Image page not in document! continue..." + str(page_no)) # continue # elif "encrypted" in str(e): # logging.info("pdf2Image document need password " + str(page_no)) # return [-7] # except RuntimeError as e: # if "cannot find page" in str(e): # logging.info("pdf2Image page {} not in document! continue... ".format(str(page_no)) + str(e)) # continue # else: # traceback.print_exc() # return [-3] # return [output_image_dict] # # except Exception as e: # logging.info("pdf2Image error!") # print("pdf2Image", traceback.print_exc()) # return [-1] # # # ocr_result_flag = 0 # def image_preprocess(image_np, image_path, use_ocr=True): # logging.info("into image_preprocess") # try: # # 长 宽 # # resize_size = (1024, 768) # # 限制图片大小 # # resize_image(image_path, resize_size) # # # 图片倾斜校正,写入原来的图片路径 # g_r_i = get_rotated_image(image_np, image_path) # if g_r_i == [-1]: # return [-1], [], [], 0 # # # otr需要图片resize, 写入另一个路径 # image_np = cv2.imread(image_path) # best_h, best_w = get_best_predict_size(image_np) # image_resize = cv2.resize(image_np, (best_w, best_h), interpolation=cv2.INTER_AREA) # image_resize_path = image_path[:-4] + "_resize" + image_path[-4:] # cv2.imwrite(image_resize_path, image_resize) # # # 调用otr模型接口 # with open(image_resize_path, "rb") as f: # image_bytes = f.read() # points, split_lines, bboxes, outline_points = from_otr_interface(image_bytes) # if judge_error_code(points): # return points, [], [], 0 # # # 将resize后得到的bbox根据比例还原 # ratio = (image_np.shape[0]/best_h, image_np.shape[1]/best_w) # for i in range(len(bboxes)): # bbox = bboxes[i] # bboxes[i] = [(int(bbox[0][0]*ratio[1]), int(bbox[0][1]*ratio[0])), # (int(bbox[1][0]*ratio[1]), int(bbox[1][1]*ratio[0]))] # for i in range(len(split_lines)): # line = split_lines[i] # split_lines[i] = [(int(line[0][0]*ratio[1]), int(line[0][1]*ratio[0])), # (int(line[1][0]*ratio[1]), int(line[1][1]*ratio[0]))] # for i in range(len(points)): # point = points[i] # points[i] = (int(point[0]*ratio[1]), int(point[1]*ratio[0])) # # for i in range(len(outline_points)): # point = outline_points[i] # outline_points[i] = [(int(point[0][0]*ratio[1]), int(point[0][1]*ratio[0])), # 
(int(point[1][0]*ratio[1]), int(point[1][1]*ratio[0]))] # # # 查看是否能输出正确框 # for box in bboxes: # cv2.rectangle(image_np, box[0], box[1], (0, 255, 0), 2) # # cv2.namedWindow('bbox', 0) # # cv2.imshow("bbox", image_np) # # cv2.waitKey(0) # # # 调用ocr模型接口 # with open(image_path, "rb") as f: # image_bytes = f.read() # # 有表格 # if len(bboxes) >= 2: # text_list, bbox_list = from_ocr_interface(image_bytes, True) # if judge_error_code(text_list): # return text_list, [], [], 0 # # # for i in range(len(text_list)): # # print(text_list[i], bbox_list[i]) # # 查看是否能输出正确框 # # # for box in bbox_list: # # cv2.rectangle(image_np, (int(box[0][0]), int(box[0][1])), # # (int(box[2][0]), int(box[2][1])), (255, 0, 0), 1) # # cv2.namedWindow('bbox', 0) # # cv2.imshow("bbox", image_np) # # cv2.waitKey(0) # # text, column_list = get_formatted_table(text_list, bbox_list, bboxes, split_lines) # if judge_error_code(text): # return text, [], [], 0 # is_table = 1 # return text, column_list, outline_points, is_table # # # 无表格 # else: # if use_ocr: # text = from_ocr_interface(image_bytes) # if judge_error_code(text): # return text, [], [], 0 # # is_table = 0 # return text, [], [], is_table # else: # is_table = 0 # return None, [], [], is_table # # except Exception as e: # logging.info("image_preprocess error") # print("image_preprocess", traceback.print_exc()) # return [-1], [], [], 0 # # # def get_best_predict_size2(image_np): # sizes = [1280, 1152, 1024, 896, 768, 640, 512, 384, 256, 128] # # min_len = 10000 # best_height = sizes[0] # for height in sizes: # if abs(image_np.shape[0] - height) < min_len: # min_len = abs(image_np.shape[0] - height) # best_height = height # # min_len = 10000 # best_width = sizes[0] # for width in sizes: # if abs(image_np.shape[1] - width) < min_len: # min_len = abs(image_np.shape[1] - width) # best_width = width # # return best_height, best_width # # # def get_best_predict_size(image_np, times=64): # sizes = [] # for i in range(1, 100): # if i*times <= 3000: # sizes.append(i*times) # sizes.sort(key=lambda x: x, reverse=True) # # min_len = 10000 # best_height = sizes[0] # for height in sizes: # if abs(image_np.shape[0] - height) < min_len: # min_len = abs(image_np.shape[0] - height) # best_height = height # # min_len = 10000 # best_width = sizes[0] # for width in sizes: # if abs(image_np.shape[1] - width) < min_len: # min_len = abs(image_np.shape[1] - width) # best_width = width # # return best_height, best_width # # # @get_memory_info.memory_decorator # def pdf2text(path, unique_type_dir): # logging.info("into pdf2text") # try: # # pymupdf pdf to image # save_dir = path.split(".")[-2] + "_" + path.split(".")[-1] # output_image_dict = pdf2Image(path, save_dir) # if judge_error_code(output_image_dict): # return output_image_dict # output_image_dict = output_image_dict[0] # output_image_no_list = list(output_image_dict.keys()) # output_image_no_list.sort(key=lambda x: x) # # # 获取每页pdf提取的文字、表格的列数、轮廓点、是否含表格、页码 # # page_info_list = [] # page_info_dict = {} # has_table_dict = {} # no_table_dict = {} # for page_no in output_image_no_list: # img_path = output_image_dict.get(page_no) # print("pdf page", page_no, "in total", output_image_no_list[-1]) # # 读不出来的跳过 # try: # img = cv2.imread(img_path) # img_size = img.shape # except: # logging.info("pdf2text read image in page fail! 
continue...") # continue # # # 每张图片处理 # text, column_list, outline_points, is_table = image_preprocess(img, img_path, # use_ocr=False) # if judge_error_code(text): # return text # # # page_info_list.append([text, column_list, outline_points, is_table, # # page_no, img_size]) # page_info = [text, column_list, outline_points, is_table, img_size] # page_info_dict[int(page_no)] = page_info # # 包含table的和不包含table的 # if is_table: # has_table_dict[int(page_no)] = page_info # else: # no_table_dict[int(page_no)] = page_info # # has_table_no_list = list(has_table_dict.keys()) # has_table_no_list.sort(key=lambda x: x) # page_no_list = list(page_info_dict.keys()) # page_no_list.sort(key=lambda x: x) # # # 页码表格连接 # table_connect_list, connect_text_list = page_table_connect(has_table_dict) # if judge_error_code(table_connect_list): # return table_connect_list # # # 连接的页码 # table_connect_page_no_list = [] # for area in connect_text_list: # table_connect_page_no_list.append(area[1]) # print("pdf2text table_connect_list", table_connect_list) # print("connect_text_list", connect_text_list) # # # pdfminer 方式 # try: # fp = open(path, 'rb') # # 用文件对象创建一个PDF文档分析器 # parser = PDFParser(fp) # # 创建一个PDF文档 # doc = PDFDocument(parser) # # 连接分析器,与文档对象 # rsrcmgr = PDFResourceManager() # device = PDFPageAggregator(rsrcmgr, laparams=LAParams()) # interpreter = PDFPageInterpreter(rsrcmgr, device) # # # 判断是否能读pdf # for page in PDFPage.create_pages(doc): # break # except pdfminer.psparser.PSEOF as e: # # pdfminer 读不了空白页的对象,直接使用pymupdf转换出的图片进行ocr识别 # logging.info("pdf2text " + str(e) + " use ocr read pdf!") # text_list = [] # for page_no in page_no_list: # logging.info("pdf2text ocr page_no " + str(page_no)) # page_info = page_info_dict.get(page_no) # # 表格 # if page_info[3]: # # 判断表格是否跨页连接 # area_no = 0 # jump_page = 0 # for area in table_connect_list: # if page_no in area: # # 只记录一次text # if page_no == area[0]: # image_text = connect_text_list[area_no][0] # text_list.append([image_text, page_no, 0]) # jump_page = 1 # area_no += 1 # # # 是连接页的跳过后面步骤 # if jump_page: # continue # # # 直接取text # image_text = page_info_dict.get(page_no)[0] # text_list.append([image_text, page_no, 0]) # # 非表格 # else: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # text_list.append([image_text, page_no, 0]) # # text_list.sort(key=lambda z: z[1]) # text = "" # for t in text_list: # text += t[0] # return [text] # except Exception as e: # logging.info("pdf format error!") # traceback.print_exc() # return [-3] # # text_list = [] # page_no = 0 # pages = PDFPage.create_pages(doc) # pages = list(pages) # page_count = len(pages) # for page in pages: # logging.info("pdf2text pymupdf page_no " + str(page_no)) # # 限制pdf页数,只取前100页 # # if page_no >= 70: # # logging.info("pdf2text: pdf pages only get 70 pages") # # break # if page_count > 20: # if 10 <= page_no < page_count-10: # page_no += 1 # continue # # # 判断页码在含表格页码中,直接拿已生成的text # if page_no in has_table_no_list: # # 判断表格是否跨页连接 # area_no = 0 # jump_page = 0 # for area in table_connect_list: # if page_no in area: # # 只记录一次text # if page_no == area[0]: # image_text = connect_text_list[area_no][0] # text_list.append([image_text, page_no, 0]) # jump_page = 1 # area_no += 1 # # # 是连接页的跳过后面步骤 # if jump_page: # page_no += 1 # continue # # # 直接取text # image_text = has_table_dict.get(page_no)[0] # text_list.append([image_text, page_no, 0]) # page_no += 1 # continue # # # 不含表格的解析pdf # else: # if get_platform() == "Windows": # try: # 
interpreter.process_page(page) # layout = device.get_result() # except Exception: # logging.info("pdf2text pdfminer read pdf page error! continue...") # continue # # else: # # 设置超时时间 # try: # # 解析pdf中的不含表格的页 # if get_platform() == "Windows": # origin_pdf_analyze = pdf_analyze.__wrapped__ # layout = origin_pdf_analyze(interpreter, page, device) # else: # layout = pdf_analyze(interpreter, page, device) # except TimeoutError as e: # logging.info("pdf2text pdfminer read pdf page time out!") # return [-4] # except Exception: # logging.info("pdf2text pdfminer read pdf page error! continue...") # continue # # # 判断该页有没有文字对象,没有则有可能是有水印 # only_image = 1 # image_count = 0 # for x in layout: # if isinstance(x, LTTextBoxHorizontal): # only_image = 0 # if isinstance(x, LTFigure): # image_count += 1 # # # 如果该页图片数量过多,直接ocr整页识别 # logging.info("pdf2text image_count " + str(image_count)) # if image_count >= 3: # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # page_info_dict[page_no][0] = image_text # # text_list.append([image_text, page_no, 0]) # page_no += 1 # continue # # order_list = [] # for x in layout: # # 该对象是否是ocr识别 # ocr_flag = 0 # # if get_platform() == "Windows": # # print("x", page_no, x) # print() # # if isinstance(x, LTTextBoxHorizontal): # image_text = x.get_text() # # # 无法识别编码,用ocr # if re.search('[(]cid:[0-9]+[)]', image_text): # print(re.search('[(]cid:[0-9]+[)]', image_text)) # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # page_info_dict[page_no][0] = image_text # image_text = add_div(image_text) # # order_list.append([image_text, page_no, x.bbox[1]]) # order_list = [[image_text, page_no, x.bbox[1]]] # break # else: # image_text = add_div(image_text) # order_list.append([image_text, page_no, x.bbox[1]]) # continue # # if isinstance(x, LTFigure): # for image in x: # if isinstance(image, LTImage): # try: # print("pdf2text LTImage size", page_no, image.width, image.height) # image_stream = image.stream.get_data() # # # 小的图忽略 # if image.width <= 300 and image.height <= 300: # continue # # # 有些水印导致pdf分割、读取报错 # # if image.width <= 200 and image.height<=200: # # continue # # # img_test = Image.open(io.BytesIO(image_stream)) # # img_test.save('temp/LTImage.jpg') # # # 查看提取的图片高宽,太大则抛错用pdf输出图进行ocr识别 # img_test = Image.open(io.BytesIO(image_stream)) # if img_test.size[1] > 2000 or img_test.size[0] > 1500: # print("pdf2text LTImage stream output size", img_test.size) # raise Exception # # 比较小的图则直接保存用ocr识别 # else: # img_test.save('temp/LTImage.jpg') # with open('temp/LTImage.jpg', "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # # except pdfminer.pdftypes.PDFNotImplementedError: # # with open(output_image_list[page_no], "rb") as ff: # # image_stream = ff.read() # except Exception: # logging.info("pdf2text pdfminer read image in page " + str(page_no) + # " fail! 
use pymupdf read image...") # print(traceback.print_exc()) # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # if judge_error_code(image_text): # return image_text # page_info_dict[page_no][0] = image_text # ocr_flag = 1 # # # 判断只拿到了水印图: 无文字输出且只有图片对象 # if image_text == "" and only_image: # # 拆出该页pdf # try: # logging.info("pdf2text guess pdf has watermark") # split_path = get_single_pdf(path, page_no) # except: # # 如果拆分抛异常,则大概率不是水印图,用ocr识别图片 # logging.info("pdf2text guess pdf has no watermark") # image_text = page_info_dict.get(page_no)[0] # if image_text is None: # with open(output_image_dict.get(page_no), "rb") as ff: # image_stream = ff.read() # image_text = from_ocr_interface(image_stream) # order_list.append([image_text, page_no, -1]) # page_info_dict[page_no][0] = image_text # ocr_flag = 1 # continue # if judge_error_code(split_path): # return split_path # # # 调用office格式转换 # file_path = from_office_interface(split_path, unique_type_dir, 'html', 3) # # if file_path == [-3]: # # return [-3] # if judge_error_code(file_path): # return file_path # # # 获取html文本 # image_text = get_html_p(file_path) # if judge_error_code(image_text): # return image_text # # if get_platform() == "Windows": # print("image_text", page_no, x.bbox[1], image_text) # with open("temp" + str(x.bbox[0]) + ".jpg", "wb") as ff: # ff.write(image_stream) # image_text = add_div(image_text) # if ocr_flag: # order_list.append([image_text, page_no, -1]) # else: # order_list.append([image_text, page_no, x.bbox[1]]) # # order_list.sort(key=lambda z: z[2], reverse=True) # # # 有ocr参与识别 # if order_list[-1][2] == -1: # ocr_order_list = [order_list[-1]] # not_ocr_order_list = [] # not_ocr_text = "" # # 去重,因读取失败而重复获取 # for order in order_list: # if order[2] != -1: # not_ocr_order_list.append(order) # not_ocr_text += order[0] # if string_similarity(ocr_order_list[0][0], not_ocr_text) >= 0.85: # order_list = not_ocr_order_list # else: # order_list = ocr_order_list # # for order in order_list: # text_list.append(order) # page_no += 1 # # text = "" # for t in text_list: # # text += add_div(t[0]) # if t[0] is not None: # text += t[0] # return [text] # except UnicodeDecodeError as e: # logging.info("pdf2text pdfminer create pages failed! " + str(e)) # return [-3] # except Exception as e: # logging.info("pdf2text error!") # print("pdf2text", traceback.print_exc()) # return [-1] # # # def string_similarity(str1, str2): # # 去掉
和回车 # str1 = re.sub("
", "", str1) # str1 = re.sub("
", "", str1) # str1 = re.sub("\n", "", str1) # str2 = re.sub("
", "", str2) # str2 = re.sub("
", "", str2) # str2 = re.sub("\n", "", str2) # # print("********************************") # # print("str1", str1) # # print("********************************") # # print("str2", str2) # # print("********************************") # score = difflib.SequenceMatcher(None, str1, str2).ratio() # print("string_similarity", score) # return score # # # @get_memory_info.memory_decorator # @timeout_decorator.timeout(300, timeout_exception=TimeoutError) # def pdf_analyze(interpreter, page, device): # logging.info("into pdf_analyze") # # 解析pdf中的不含表格的页 # pdf_time = time.time() # print("pdf_analyze interpreter process...") # interpreter.process_page(page) # print("pdf_analyze device get_result...") # layout = device.get_result() # logging.info("pdf2text read time " + str(time.time()-pdf_time)) # return layout # # # def get_html_p(html_path): # logging.info("into get_html_p") # try: # with open(html_path, "r") as ff: # html_str = ff.read() # # soup = BeautifulSoup(html_str, 'lxml') # text = "" # for p in soup.find_all("p"): # p_text = p.text # p_text = p_text.strip() # if p.string != "": # text += p_text # text += "\n" # return text # except Exception as e: # logging.info("get_html_p error!") # print("get_html_p", traceback.print_exc()) # return [-1] # # # def get_single_pdf(path, page_no): # logging.info("into get_single_pdf") # try: # # print("path, ", path) # pdf_origin = PdfFileReader(path, strict=False) # # pdf_new = PdfFileWriter() # pdf_new.addPage(pdf_origin.getPage(page_no)) # # path_new = path.split(".")[0] + "_split.pdf" # with open(path_new, "wb") as ff: # pdf_new.write(ff) # return path_new # except PyPDF2.utils.PdfReadError as e: # raise e # except Exception as e: # logging.info("get_single_pdf error! page " + str(page_no)) # print("get_single_pdf", traceback.print_exc()) # raise e # # # def page_table_connect2(has_table_list, page_info_list): # logging.info("into page_table_connect") # try: # # 判断是否有页码的表格相连 # table_connect_list = [] # temp_list = [] # # 离图片顶部或底部距离,页面高度的1/7 # threshold = 7 # # for i in range(1, len(has_table_list)): # page_info = has_table_list[i] # last_page_info = has_table_list[i - 1] # # # 页码需相连 # if page_info[4] - last_page_info[4] == 1: # # # 上一页最后一个区域的列数和下一页第一个区域列数都为0,且相等 # if not last_page_info[1][-1] and not page_info[1][0] and \ # last_page_info[1][-1] == page_info[1][0]: # # # 上一页的轮廓点要离底部一定距离内,下一页的轮廓点要离顶部一定距离内 # if last_page_info[5][0] - last_page_info[2][-1][1][1] \ # <= int(last_page_info[5][0]/threshold) \ # and page_info[2][0][0][1] - 0 \ # <= int(page_info[5][0]/threshold): # temp_list.append(last_page_info[4]) # temp_list.append(page_info[4]) # continue # # # 条件不符合的,存储之前保存的连接页码 # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # # # 连接两页内容 # connect_text_list = [] # for area in table_connect_list: # first_page_no = area[0] # for page in page_info_list: # if page[4] == first_page_no: # area_page_text = str(page[0]) # break # for i in range(1, len(area)): # current_page_no = area[i] # for page in page_info_list: # if page[4] == current_page_no: # current_page_text = str(page[0]) # break # # # 连接两个table # table_prefix = re.finditer('', current_page_text) # index_list = [] # for t in table_prefix: # index_list.append(t.span()) # # delete_index = index_list[0] # current_page_text = 
current_page_text[:delete_index[0]] \ # + current_page_text[delete_index[1]:] # # table_suffix = re.finditer('
', area_page_text) # index_list = [] # for t in table_suffix: # index_list.append(t.span()) # # delete_index = index_list[-1] # area_page_text = area_page_text[:delete_index[0]] \ # + area_page_text[delete_index[1]:] # area_page_text = area_page_text + current_page_text # connect_text_list.append([area_page_text, area]) # # return table_connect_list, connect_text_list # except Exception as e: # # print("page_table_connect", e) # logging.info("page_table_connect error!") # print("page_table_connect", traceback.print_exc()) # return [-1], [-1] # # # def page_table_connect(has_table_dict): # logging.info("into page_table_connect") # if not has_table_dict: # return [], [] # # try: # # 判断是否有页码的表格相连 # table_connect_list = [] # temp_list = [] # # 离图片顶部或底部距离,页面高度的1/7 # threshold = 7 # page_no_list = list(has_table_dict.keys()) # page_no_list.sort(key=lambda x: x) # for i in range(1, len(page_no_list)): # page_info = has_table_dict.get(page_no_list[i]) # last_page_info = has_table_dict.get(page_no_list[i-1]) # # 页码需相连 # if page_no_list[i] - page_no_list[i-1] == 1: # # 上一页最后一个区域的列数和下一页第一个区域列数都为0,且相等 # if not last_page_info[1][-1] and not page_info[1][0] and \ # last_page_info[1][-1] == page_info[1][0]: # # # 上一页的轮廓点要离底部一定距离内,下一页的轮廓点要离顶部一定距离内 # if last_page_info[4][0] - last_page_info[2][-1][1][1] \ # <= int(last_page_info[4][0]/threshold) \ # and page_info[2][0][0][1] - 0 \ # <= int(page_info[4][0]/threshold): # temp_list.append(page_no_list[i-1]) # temp_list.append(page_no_list[i]) # continue # # # 条件不符合的,存储之前保存的连接页码 # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # if len(temp_list) > 1: # temp_list = list(set(temp_list)) # temp_list.sort(key=lambda x: x) # table_connect_list.append(temp_list) # temp_list = [] # # # 连接两页内容 # connect_text_list = [] # for area in table_connect_list: # first_page_no = area[0] # area_page_text = str(has_table_dict.get(first_page_no)[0]) # for i in range(1, len(area)): # current_page_no = area[i] # current_page_text = str(has_table_dict.get(current_page_no)[0]) # # # 连接两个table # table_prefix = re.finditer('', current_page_text) # index_list = [] # for t in table_prefix: # index_list.append(t.span()) # # delete_index = index_list[0] # current_page_text = current_page_text[:delete_index[0]] \ # + current_page_text[delete_index[1]:] # # table_suffix = re.finditer('
', area_page_text) # index_list = [] # for t in table_suffix: # index_list.append(t.span()) # # delete_index = index_list[-1] # area_page_text = area_page_text[:delete_index[0]] \ # + area_page_text[delete_index[1]:] # area_page_text = area_page_text + current_page_text # connect_text_list.append([area_page_text, area]) # # return table_connect_list, connect_text_list # except Exception as e: # # print("page_table_connect", e) # logging.info("page_table_connect error!") # print("page_table_connect", traceback.print_exc()) # return [-1], [-1] # # # @get_memory_info.memory_decorator # def zip2text(path, unique_type_dir): # logging.info("into zip2text") # try: # zip_path = unique_type_dir # # try: # zip_file = zipfile.ZipFile(path) # zip_list = zip_file.namelist() # # print("zip list namelist", zip_list) # # if get_platform() == "Windows": # if os.path.exists(zip_list[0]): # print("zip2text exists") # # # 循环解压文件到指定目录 # file_list = [] # for f in zip_list: # file_list.append(zip_file.extract(f, path=zip_path)) # # zip_file.extractall(path=zip_path) # zip_file.close() # # # 获取文件名 # # file_list = [] # # for root, dirs, files in os.walk(zip_path, topdown=False): # # for name in dirs: # # file_list.append(os.path.join(root, name) + os.sep) # # for name in files: # # file_list.append(os.path.join(root, name)) # # # # # if get_platform() == "Windows": # # # print("file_list", file_list) # # # # # 过滤掉doc缓存文件 # # temp_list = [] # # for f in file_list: # # if re.search("~\$", f): # # continue # # else: # # temp_list.append(f) # # file_list = temp_list # # except Exception as e: # logging.info("zip format error!") # print("zip format error!", traceback.print_exc()) # return [-3] # # # 内部文件重命名 # # file_list = inner_file_rename(file_list) # file_list = rename_inner_files(zip_path) # if judge_error_code(file_list): # return file_list # # if get_platform() == "Windows": # print("============= zip file list") # # print(file_list) # # text = [] # for file in file_list: # if os.path.isdir(file): # continue # # # 无文件后缀,猜格式 # if len(file.split(".")) <= 1: # logging.info(str(file) + " has no type! Guess type...") # _type = judge_format(file) # if _type is None: # logging.info(str(file) + "cannot guess type!") # sub_text = [""] # else: # logging.info(str(file) + " guess type: " + _type) # new_file = str(file) + "." 
+ _type # os.rename(file, new_file) # file = new_file # sub_text = getText(_type, file) # # 有文件后缀,截取 # else: # _type = file.split(".")[-1] # sub_text = getText(_type, file) # # if judge_error_code(sub_text, code=[-3]): # continue # if judge_error_code(sub_text): # return sub_text # # text = text + sub_text # return text # except Exception as e: # logging.info("zip2text error!") # print("zip2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def rar2text(path, unique_type_dir): # logging.info("into rar2text") # try: # rar_path = unique_type_dir # # try: # # shell调用unrar解压 # _signal = os.system("unrar x " + path + " " + rar_path) # print("rar2text _signal", _signal) # # =0, 解压成功 # if _signal != 0: # raise Exception # except Exception as e: # logging.info("rar format error!") # print("rar format error!", e) # return [-3] # # # 获取文件名 # # file_list = [] # # for root, dirs, files in os.walk(rar_path, topdown=False): # # for name in dirs: # # file_list.append(os.path.join(root, name) + os.sep) # # for name in files: # # file_list.append(os.path.join(root, name)) # # if get_platform() == "Windows": # print("============= rar file list") # # # 内部文件重命名 # # file_list = inner_file_rename(file_list) # file_list = rename_inner_files(rar_path) # if judge_error_code(file_list): # return file_list # # text = [] # for file in file_list: # if os.path.isdir(file): # continue # # # 无文件后缀,猜格式 # if len(file.split(".")) <= 1: # logging.info(str(file) + " has no type! Guess type...") # _type = judge_format(file) # if _type is None: # logging.info(str(file) + "cannot guess type!") # sub_text = [""] # else: # logging.info(str(file) + " guess type: " + _type) # new_file = str(file) + "." + _type # os.rename(file, new_file) # file = new_file # sub_text = getText(_type, file) # # 有文件后缀,截取 # else: # _type = file.split(".")[-1] # sub_text = getText(_type, file) # # if judge_error_code(sub_text, code=[-3]): # continue # if judge_error_code(sub_text): # return sub_text # # # print("sub text", sub_text, file, _type) # text = text + sub_text # return text # except Exception as e: # logging.info("rar2text error!") # print("rar2text", traceback.print_exc()) # return [-1] # # # def inner_file_rename(path_list): # logging.info("into inner_file_rename") # try: # # 先过滤文件名中的点 '.' # path_list.sort(key=lambda x: len(x), reverse=True) # for i in range(len(path_list)): # old_path = path_list[i] # # 对于目录,判断最后一级是否需过滤,重命名 # if os.path.isdir(old_path): # ps = old_path.split(os.sep) # old_p = ps[-2] # if '.' 
in old_p: # new_p = re.sub("\\.", "", old_p) # new_path = "" # for p in ps[:-2]: # new_path += p + os.sep # new_path += new_p + os.sep # # # 重命名,更新 # # print("has .", path_list[i], new_path) # os.rename(old_path, new_path) # for j in range(len(path_list)): # if old_path in path_list[j]: # path_list[j] = re.sub(old_p, new_p, path_list[j]) + os.sep # # # 将path分割,按分割个数排名 # path_len_list = [] # for p in path_list: # p_ss = p.split(os.sep) # temp_p_ss = [] # for pp in p_ss: # if pp == "": # continue # temp_p_ss.append(pp) # p_ss = temp_p_ss # path_len_list.append([p, p_ss, len(p_ss)]) # # # 从路径分割少的开始改名,即从根目录开始改 # path_len_list.sort(key=lambda x: x[2]) # # # for p in path_len_list: # # print("---", p[1]) # # # 判断不用变的目录在第几级 # no_change_level = 0 # loop = 0 # for p_s in path_len_list[0][1]: # if p_s[-4:] == "_rar" or p_s[-4:] == "_zip": # no_change_level += loop # loop = 0 # loop += 1 # no_change_level += 1 # # # 每个 # new_path_list = [] # for path_len in path_len_list: # # 前n个是固定路径 # new_path = "" # for i in range(no_change_level): # new_path += path_len[1][i] + os.sep # old_path = new_path # # if not get_platform() == "Windows": # old_path = os.sep + old_path # new_path = os.sep + new_path # # print("path_len[1][3:]", path_len[1][3:]) # # count = 0 # for p in path_len[1][no_change_level:]: # # 新路径全部转换hash # new_path += str(hash(p)) # # # 最后一个不加os.sep,并且旧路径最后一个不转换hash # if count < len(path_len[1][no_change_level:]) - 1: # old_path += str(hash(p)) + os.sep # new_path += os.sep # else: # old_path += p # count += 1 # # # path是文件夹再加os.sep # if os.path.isdir(path_len[0]): # new_path += os.sep # old_path += os.sep # # path是文件再加文件名后缀 # else: # p_ss = path_len[1][-1].split(".") # if len(p_ss) > 1: # path_suffix = "." + p_ss[-1] # new_path += path_suffix # # print("inner_file_rename", old_path, "to", new_path) # os.rename(old_path, new_path) # new_path_list.append(new_path) # # return new_path_list # except Exception as e: # logging.info("inner_file_rename error!") # print("inner_file_rename", traceback.print_exc()) # return [-1] # # # def rename_inner_files(root_path): # try: # logging.info("into rename_inner_files") # # 获取解压文件夹下所有文件+文件夹,不带根路径 # path_list = [] # for root, dirs, files in os.walk(root_path, topdown=False): # for name in dirs: # p = os.path.join(root, name) + os.sep # p = re.sub(root_path, "", p) # path_list.append(p) # for name in files: # p = os.path.join(root, name) # p = re.sub(root_path, "", p) # path_list.append(p) # # # 按路径长度排序 # path_list.sort(key=lambda x: len(x), reverse=True) # # # 循环改名 # for old_path in path_list: # # 按路径分隔符分割 # ss = old_path.split(os.sep) # # 判断是否文件夹 # is_dir = 0 # file_type = "" # if os.path.isdir(root_path + old_path): # ss = ss[:-1] # is_dir = 1 # else: # if "." in old_path: # file_type = "." 
+ old_path.split(".")[-1] # else: # file_type = "" # # # 最后一级需要用hash改名 # new_path = "" # # new_path = re.sub(ss[-1], str(hash(ss[-1])), old_path) + file_type # current_level = 0 # for s in ss: # # 路径拼接 # if current_level < len(ss) - 1: # new_path += s + os.sep # else: # new_path += str(hash(s)) + file_type # current_level += 1 # # new_ab_path = root_path + new_path # old_ab_path = root_path + old_path # os.rename(old_ab_path, new_ab_path) # # # 重新获取解压文件夹下所有文件+文件夹 # new_path_list = [] # for root, dirs, files in os.walk(root_path, topdown=False): # for name in dirs: # new_path_list.append(os.path.join(root, name) + os.sep) # for name in files: # new_path_list.append(os.path.join(root, name)) # # print("new_path_list", new_path_list) # return new_path_list # except: # traceback.print_exc() # return [-1] # # # @get_memory_info.memory_decorator # def xls2text(path, unique_type_dir): # logging.info("into xls2text") # try: # # 调用libreoffice格式转换 # file_path = from_office_interface(path, unique_type_dir, 'xlsx') # # if file_path == [-3]: # # return [-3] # if judge_error_code(file_path): # return file_path # # text = xlsx2text(file_path, unique_type_dir) # # if text == [-1]: # # return [-1] # # if text == [-3]: # # return [-3] # if judge_error_code(text): # return text # # return text # except Exception as e: # logging.info("xls2text error!") # print("xls2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def xlsx2text(path, unique_type_dir): # logging.info("into xlsx2text") # try: # try: # # sheet_name=None, 即拿取所有sheet,存为dict # df_dict = pandas.read_excel(path, header=None, keep_default_na=False, sheet_name=None) # except Exception as e: # logging.info("xlsx format error!") # # print("xlsx format error!", e) # return [-3] # # df_list = [sheet for sheet in df_dict.values()] # sheet_text = "" # for df in df_list: # text = '' + "\n" # for index, row in df.iterrows(): # text = text + "" # for r in row: # text = text + "" + "\n" # # print(text) # text = text + "" + "\n" # text = text + "
" + str(r) + "
" + "\n" # sheet_text += text # # return [sheet_text] # except Exception as e: # logging.info("xlsx2text error!") # print("xlsx2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def swf2text(path, unique_type_dir): # logging.info("into swf2text") # try: # try: # with open(path, 'rb') as f: # swf_file = SWF(f) # svg_exporter = SVGExporter() # svg = swf_file.export(svg_exporter) # # with open('swf_export.jpg', 'wb') as f: # # f.write(svg.read()) # swf_str = str(svg.getvalue(), encoding='utf-8') # except Exception as e: # logging.info("swf format error!") # traceback.print_exc() # return [-3] # # # 正则匹配图片的信息位置 # result0 = re.finditer(']*)', swf_str) # image_bytes_list = [] # i = 0 # image_path_prefix = path.split(".")[-2] + "_" + path.split(".")[-1] # image_path_list = [] # for r in result0: # # 截取图片信息所在位置 # swf_str0 = swf_str[r.span()[0]:r.span()[1] + 1] # # # 正则匹配得到图片的base64编码 # result1 = re.search('xlink:href="data:(.[^>]*)', swf_str0) # swf_str1 = swf_str0[result1.span()[0]:result1.span()[1]] # reg1_prefix = 'b\'' # result1 = re.search(reg1_prefix + '(.[^\']*)', swf_str1) # swf_str1 = swf_str1[result1.span()[0] + len(reg1_prefix):result1.span()[1]] # # # base64_str -> base64_bytes -> no "\\" base64_bytes -> bytes -> image # base64_bytes_with_double = bytes(swf_str1, "utf-8") # base64_bytes = codecs.escape_decode(base64_bytes_with_double, "hex-escape")[0] # image_bytes = base64.b64decode(base64_bytes) # image_bytes_list.append(image_bytes) # image_path = image_path_prefix + "_page_" + str(i) + ".png" # with open(image_path, 'wb') as f: # f.write(image_bytes) # # image_path_list.append(image_path) # # 正则匹配得到图片的宽高 # # reg2_prefix = 'width="' # # result2 = re.search(reg2_prefix + '(\d+)', swf_str0) # # swf_str2 = swf_str0[result2.span()[0]+len(reg2_prefix):result2.span()[1]] # # width = swf_str2 # # reg2_prefix = 'height="' # # result2 = re.search(reg2_prefix + '(\d+)', swf_str0) # # swf_str2 = swf_str0[result2.span()[0]+len(reg2_prefix):result2.span()[1]] # # height = swf_str2 # i += 1 # # text_list = [] # # print("image_path_list", image_path_list) # for image_path in image_path_list: # text = picture2text(image_path) # # print("text", text) # # if judge_error_code(text, code=[-3]): # continue # if judge_error_code(text): # return text # # text = text[0] # text_list.append(text) # # text = "" # for t in text_list: # text += t # # return [text] # except Exception as e: # logging.info("swf2text error!") # print("swf2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def picture2text(path, html=False): # logging.info("into picture2text") # try: # # 判断图片中表格 # img = cv2.imread(path) # if img is None: # return [-3] # # # if get_platform() == "Windows": # # print("picture2text img", img) # # text, column_list, outline_points, is_table = image_preprocess(img, path) # if judge_error_code(text): # return text # # if text == [-5]: # # return [-5] # # if text == [-2]: # # return [-2] # # if text == [-1]: # # return [-1] # # if html: # text = add_div(text) # return [text] # except Exception as e: # logging.info("picture2text error!") # print("picture2text", traceback.print_exc()) # return [-1] # # # @get_memory_info.memory_decorator # def from_ocr_interface(image_stream, is_table=False): # logging.info("into from_ocr_interface") # try: # base64_stream = base64.b64encode(image_stream) # # # 调用接口 # try: # r = ocr(data=base64_stream, ocr_model=globals().get("global_ocr_model")) # except TimeoutError: # if is_table: # return [-5], [-5] # 
else: # return [-5] # except requests.exceptions.ConnectionError as e: # if is_table: # return [-2], [-2] # else: # return [-2] # # _dict = r # text_list = eval(_dict.get("text")) # bbox_list = eval(_dict.get("bbox")) # if text_list is None: # text_list = [] # if bbox_list is None: # bbox_list = [] # # if is_table: # return text_list, bbox_list # else: # if text_list and bbox_list: # text = get_sequential_data(text_list, bbox_list, html=True) # if judge_error_code(text): # return text # # if text == [-1]: # # return [-1] # else: # text = "" # return text # except Exception as e: # logging.info("from_ocr_interface error!") # # print("from_ocr_interface", e, global_type) # if is_table: # return [-1], [-1] # else: # return [-1] # # # @get_memory_info.memory_decorator # def from_otr_interface(image_stream): # logging.info("into from_otr_interface") # try: # base64_stream = base64.b64encode(image_stream) # # # 调用接口 # try: # r = otr(data=base64_stream, otr_model=globals().get("global_otr_model")) # except TimeoutError: # return [-5], [-5], [-5], [-5] # except requests.exceptions.ConnectionError as e: # logging.info("from_otr_interface") # print("from_otr_interface", traceback.print_exc()) # return [-2], [-2], [-2], [-2] # # # 处理结果 # _dict = r # points = eval(_dict.get("points")) # split_lines = eval(_dict.get("split_lines")) # bboxes = eval(_dict.get("bboxes")) # outline_points = eval(_dict.get("outline_points")) # # print("from_otr_interface len(bboxes)", len(bboxes)) # if points is None: # points = [] # if split_lines is None: # split_lines = [] # if bboxes is None: # bboxes = [] # if outline_points is None: # outline_points = [] # return points, split_lines, bboxes, outline_points # except Exception as e: # logging.info("from_otr_interface error!") # print("from_otr_interface", traceback.print_exc()) # return [-1], [-1], [-1], [-1] # # # def from_office_interface(src_path, dest_path, target_format, retry_times=1): # try: # # Win10跳出超时装饰器 # if get_platform() == "Windows": # # origin_office_convert = office_convert.__wrapped__ # # file_path = origin_office_convert(src_path, dest_path, target_format, retry_times) # file_path = office_convert(src_path, dest_path, target_format, retry_times) # else: # # 将装饰器包装为一个类,否则多进程Pickle会报错 it's not the same object as xxx 问题, # # timeout_decorator_obj = my_timeout_decorator.TimeoutClass(office_convert, 180, TimeoutError) # # file_path = timeout_decorator_obj.run(src_path, dest_path, target_format, retry_times) # # file_path = office_convert(src_path, dest_path, target_format, retry_times) # # if judge_error_code(file_path): # return file_path # return file_path # except TimeoutError: # logging.info("from_office_interface timeout error!") # return [-5] # except: # logging.info("from_office_interface error!") # print("from_office_interface", traceback.print_exc()) # return [-1] # # # def get_sequential_data(text_list, bbox_list, html=False): # logging.info("into get_sequential_data") # try: # text = "" # order_list = [] # for i in range(len(text_list)): # length_start = bbox_list[i][0][0] # length_end = bbox_list[i][1][0] # height_start = bbox_list[i][0][1] # height_end = bbox_list[i][-1][1] # # print([length_start, length_end, height_start, height_end]) # order_list.append([text_list[i], length_start, length_end, height_start, height_end]) # # text = text + infomation['text'] + "\n" # # if get_platform() == "Windows": # print("get_sequential_data", order_list) # if not order_list: # if get_platform() == "Windows": # print("get_sequential_data", "no order 
list") # return "" # # # 根据bbox的坐标对输出排序 # order_list.sort(key=lambda x: (x[3], x[1])) # # # 根据bbox分行分列 # # col_list = [] # # height_end = int((order_list[0][4] + order_list[0][3]) / 2) # # for i in range(len(order_list)): # # if height_end - threshold <= order_list[i][3] <= height_end + threshold: # # col_list.append(order_list[i]) # # else: # # row_list.append(col_list) # # col_list = [] # # height_end = int((order_list[i][4] + order_list[i][3]) / 2) # # col_list.append(order_list[i]) # # if i == len(order_list) - 1: # # row_list.append(col_list) # # row_list = [] # used_box = [] # threshold = 5 # for box in order_list: # if box in used_box: # continue # # height_center = (box[4] + box[3]) / 2 # row = [] # for box2 in order_list: # if box2 in used_box: # continue # height_center2 = (box2[4] + box2[3]) / 2 # if height_center - threshold <= height_center2 <= height_center + threshold: # if box2 not in row: # row.append(box2) # used_box.append(box2) # row.sort(key=lambda x: x[0]) # row_list.append(row) # # for row in row_list: # if not row: # continue # if len(row) <= 1: # text = text + row[0][0] + "\n" # else: # sub_text = "" # row.sort(key=lambda x: x[1]) # for col in row: # sub_text = sub_text + col[0] + " " # sub_text = sub_text + "\n" # text += sub_text # # if html: # text = "
" + text # text = re.sub("\n", "
\n
", text) # text += "
" # # if text[-5:] == "
": # # text = text[:-5] # return text # # except Exception as e: # logging.info("get_sequential_data error!") # print("get_sequential_data", traceback.print_exc()) # return [-1] # # # def get_formatted_table(text_list, text_bbox_list, table_bbox_list, split_line): # logging.info("into get_formatted_table") # try: # # 重新定义text_bbox_list,[point, point, text] # text_bbox_list = [[text_bbox_list[i][0], text_bbox_list[i][2], text_list[i]] for i in # range(len(text_bbox_list))] # # 按纵坐标排序 # text_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0])) # # # print("text_bbox_list", text_bbox_list) # # print("table_bbox_list", table_bbox_list) # # # bbox位置 threshold # threshold = 5 # # # 根据split_line分区,可能有个区多个表格 [(), ()] # area_text_bbox_list = [] # area_table_bbox_list = [] # # print("get_formatted_table, split_line", split_line) # for j in range(1, len(split_line)): # last_y = split_line[j - 1][0][1] # current_y = split_line[j][0][1] # temp_text_bbox_list = [] # temp_table_bbox_list = [] # # # 找出该区域下text bbox # for text_bbox in text_bbox_list: # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # if last_y - threshold <= text_bbox_center[1] <= current_y + threshold: # temp_text_bbox_list.append(text_bbox) # area_text_bbox_list.append(temp_text_bbox_list) # # # 找出该区域下table bbox # for table_bbox in table_bbox_list: # # 计算 table bbox 中心点 # table_bbox_center = ((table_bbox[1][0] + table_bbox[0][0]) / 2, # (table_bbox[1][1] + table_bbox[0][1]) / 2) # if last_y < table_bbox_center[1] < current_y: # temp_table_bbox_list.append(table_bbox) # area_table_bbox_list.append(temp_table_bbox_list) # # # for j in range(len(area_text_bbox_list)): # # print("area_text_bbox_list", j, area_text_bbox_list[j]) # # # 对每个区域分别进行两个bbox匹配,生成表格 # area_text_list = [] # area_column_list = [] # for j in range(len(area_text_bbox_list)): # # 每个区域的table bbox 和text bbox # temp_table_bbox_list = area_table_bbox_list[j] # temp_text_bbox_list = area_text_bbox_list[j] # # # 判断该区域有无表格bbox # # 若无表格,将该区域文字连接 # if not temp_table_bbox_list: # # 找出该区域的所有text bbox # only_text_list = [] # only_bbox_list = [] # for text_bbox in temp_text_bbox_list: # only_text_list.append(text_bbox[2]) # only_bbox_list.append([text_bbox[0], text_bbox[1]]) # only_text = get_sequential_data(only_text_list, only_bbox_list, True) # if only_text == [-1]: # return [-1], [-1] # area_text_list.append(only_text) # area_column_list.append(0) # continue # # # 有表格 # # 文本对应的表格格子 # text_in_table = {} # for i in range(len(temp_text_bbox_list)): # text_bbox = temp_text_bbox_list[i] # # # 计算 text bbox 中心点 # text_bbox_center = ((text_bbox[1][0] + text_bbox[0][0]) / 2, # (text_bbox[1][1] + text_bbox[0][1]) / 2) # # # 判断中心点在哪个table bbox中 # for table_bbox in temp_table_bbox_list: # # 中心点在table bbox中,将text写入字典 # if table_bbox[0][0] <= text_bbox_center[0] <= table_bbox[1][0] and \ # table_bbox[0][1] <= text_bbox_center[1] <= table_bbox[1][1]: # if str(table_bbox) in text_in_table.keys(): # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # else: # text_in_table[str(table_bbox)] = text_bbox[2] # break # # # 如果未找到text bbox匹配的table bbox,加大threshold匹配 # # elif (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= 
text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]+threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]-threshold <= table_bbox[1][1]) or \ # # (table_bbox[0][0] <= text_bbox_center[0]-threshold <= table_bbox[1][0] and # # table_bbox[0][1] <= text_bbox_center[1]+threshold <= table_bbox[1][1]): # # if str(table_bbox) in text_in_table.keys(): # # text_in_table[str(table_bbox)] = text_in_table.get(str(table_bbox)) + text_bbox[2] # # else: # # text_in_table[str(table_bbox)] = text_bbox[2] # # break # # # 对表格格子进行分行分列,并计算总计多少小列 # # 放入坐标 # all_col_list = [] # all_row_list = [] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # # 放入所有坐标x # if table_bbox[0][0] not in all_col_list: # all_col_list.append(table_bbox[0][0]) # if table_bbox[1][0] not in all_col_list: # all_col_list.append(table_bbox[1][0]) # # # 放入所有坐标y # if table_bbox[0][1] not in all_row_list: # all_row_list.append(table_bbox[0][1]) # if table_bbox[1][1] not in all_row_list: # all_row_list.append(table_bbox[1][1]) # all_col_list.sort(key=lambda x: x) # all_row_list.sort(key=lambda x: x) # # # 分行 # row_list = [] # rows = [] # temp_table_bbox_list.sort(key=lambda x: (x[0][1], x[0][0], x[1][1], x[1][0])) # y_row = temp_table_bbox_list[0][0][1] # for i in range(len(temp_table_bbox_list)): # table_bbox = temp_table_bbox_list[i] # # if y_row - threshold <= table_bbox[0][1] <= y_row + threshold: # rows.append(table_bbox) # else: # y_row = table_bbox[0][1] # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # rows = [] # rows.append(table_bbox) # # print("*" * 30) # # print(row_list) # # if i == len(temp_table_bbox_list) - 1: # if rows: # rows.sort(key=lambda x: x[0][0]) # row_list.append(rows) # # # 生成表格,包括文字和格子宽度 # area_column = [] # text = '' + "\n" # for row in row_list: # text += "" + "\n" # for col in row: # # 计算bbox y坐标之间有多少其他点,+1即为所占行数 # row_span = 1 # for y in all_row_list: # if col[0][1] < y < col[1][1]: # if y - col[0][1] >= 2 and col[1][1] - y >= 2: # row_span += 1 # # # 计算bbox x坐标之间有多少其他点,+1即为所占列数 # col_span = 1 # for x in all_col_list: # if col[0][0] < x < col[1][0]: # if x - col[0][0] >= 2 and col[1][0] - x >= 2: # col_span += 1 # # text += "" + "\n" # text += "" + "\n" # text += "
" # # if str(col) in text_in_table.keys(): # text += text_in_table.get(str(col)) # else: # text += "" # text += "
" + "\n" # # # 计算最大column # max_col_num = 0 # for row in row_list: # col_num = 0 # for col in row: # col_num += 1 # if max_col_num < col_num: # max_col_num = col_num # # area_text_list.append(text) # area_column_list.append(max_col_num) # # text = "" # if get_platform() == "Windows": # print("get_formatted_table area_text_list", area_text_list) # for area_text in area_text_list: # text += area_text # return text, area_column_list # except Exception as e: # logging.info("get_formatted_table error!") # print("get_formatted_table", traceback.print_exc()) # return [-1], [-1] port_num = [0] def choose_port(): process_num = 4 if port_num[0] % process_num == 0: _url = local_url + ":15011" elif port_num[0] % process_num == 1: _url = local_url + ":15012" elif port_num[0] % process_num == 2: _url = local_url + ":15013" elif port_num[0] % process_num == 3: _url = local_url + ":15014" port_num[0] = port_num[0] + 1 return _url def getText(_type, path_or_stream): print("file type - " + _type) logging.info("file type - " + _type) try: ss = path_or_stream.split(".") unique_type_dir = ss[-2] + "_" + ss[-1] + os.sep except: unique_type_dir = path_or_stream + "_" + _type + os.sep if _type == "pdf": # return pdf2text(path_or_stream, unique_type_dir) return PDFConvert(path_or_stream, unique_type_dir).get_html() if _type == "docx": # return docx2text(path_or_stream, unique_type_dir) return DocxConvert(path_or_stream, unique_type_dir).get_html() if _type == "zip": # return zip2text(path_or_stream, unique_type_dir) return ZipConvert(path_or_stream, unique_type_dir).get_html() if _type == "rar": # return rar2text(path_or_stream, unique_type_dir) return RarConvert(path_or_stream, unique_type_dir).get_html() if _type == "xlsx": # return xlsx2text(path_or_stream, unique_type_dir) return XlsxConvert(path_or_stream, unique_type_dir).get_html() if _type == "xls": # return xls2text(path_or_stream, unique_type_dir) return XlsConvert(path_or_stream, unique_type_dir).get_html() if _type == "doc": # return doc2text(path_or_stream, unique_type_dir) return DocConvert(path_or_stream, unique_type_dir).get_html() if _type == "jpg" or _type == "png" or _type == "jpeg": # return picture2text(path_or_stream) return ImageConvert(path_or_stream, unique_type_dir).get_html() if _type == "swf": # return swf2text(path_or_stream, unique_type_dir) return SwfConvert(path_or_stream, unique_type_dir).get_html() if _type == "txt": # return txt2text(path_or_stream) return TxtConvert(path_or_stream, unique_type_dir).get_html() return [""] def to_html(path, text): with open(path, 'w',encoding="utf8") as f: f.write("") f.write('') f.write("") f.write(text) f.write("") def resize_image(image_path, size): try: image_np = cv2.imread(image_path) # print(image_np.shape) width = image_np.shape[1] height = image_np.shape[0] h_w_rate = height / width # width_standard = 900 # height_standard = 1400 width_standard = size[1] height_standard = size[0] width_new = int(height_standard / h_w_rate) height_new = int(width_standard * h_w_rate) if width > width_standard: image_np = cv2.resize(image_np, (width_standard, height_new)) elif height > height_standard: image_np = cv2.resize(image_np, (width_new, height_standard)) cv2.imwrite(image_path, image_np) # print("resize_image", image_np.shape) return except Exception as e: logging.info("resize_image") print("resize_image", e, global_type) return def remove_red_seal(image_np): """ 去除红色印章 """ # 获得红色通道 blue_c, green_c, red_c = cv2.split(image_np) # 多传入一个参数cv2.THRESH_OTSU,并且把阈值thresh设为0,算法会找到最优阈值 thresh, ret = 
def remove_red_seal(image_np):
    """
    Remove red seal stamps from an image.
    """
    # take the red channel
    blue_c, green_c, red_c = cv2.split(image_np)

    # passing cv2.THRESH_OTSU with thresh=0 lets OpenCV pick the optimal threshold
    thresh, ret = cv2.threshold(red_c, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    # print("remove_red_seal thresh", thresh)

    # empirically, scaling the Otsu threshold down slightly works better
    filter_condition = int(thresh * 0.98)
    thresh1, red_thresh = cv2.threshold(red_c, filter_condition, 255, cv2.THRESH_BINARY)

    # back to 3 channels
    image_and = np.expand_dims(red_thresh, axis=2)
    image_and = np.concatenate((image_and, image_and, image_and), axis=-1)
    # print(image_and.shape)

    # erode to thicken the surviving strokes
    gray = cv2.cvtColor(image_and, cv2.COLOR_RGB2GRAY)
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
    erode = cv2.erode(gray, kernel)
    cv2.imshow("erode", erode)
    cv2.waitKey(0)

    image_and = np.bitwise_and(cv2.bitwise_not(blue_c), cv2.bitwise_not(erode))
    result_img = cv2.bitwise_not(image_and)
    cv2.imshow("remove_red_seal", result_img)
    cv2.waitKey(0)
    return result_img


def remove_underline(image_np):
    """
    Remove underlines beneath text.
    """
    # grayscale
    gray = cv2.cvtColor(image_np, cv2.COLOR_BGR2GRAY)
    # binarize
    binary = cv2.adaptiveThreshold(~gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY, 15, 10)

    # Sobel
    kernel_row = np.array([[-1, -2, -1], [0, 0, 0], [1, 2, 1]], np.float32)
    kernel_col = np.array([[-1, 0, 1], [-2, 0, 2], [-1, 0, 1]], np.float32)
    # binary = cv2.filter2D(binary, -1, kernel=kernel)
    binary_row = cv2.filter2D(binary, -1, kernel=kernel_row)
    binary_col = cv2.filter2D(binary, -1, kernel=kernel_col)
    cv2.imshow("custom_blur_demo", binary)
    cv2.waitKey(0)

    rows, cols = binary.shape

    # detect horizontal lines
    scale = 5
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (cols // scale, 1))
    erodedcol = cv2.erode(binary_row, kernel, iterations=1)
    cv2.imshow("Eroded Image", erodedcol)
    cv2.waitKey(0)
    dilatedcol = cv2.dilate(erodedcol, kernel, iterations=1)
    cv2.imshow("dilate Image", dilatedcol)
    cv2.waitKey(0)
    return


def getMDFFromFile(path):
    # md5 digest and byte length of a file, read in 4 KB chunks
    _length = 0
    try:
        _md5 = hashlib.md5()
        with open(path, "rb") as ff:
            while True:
                data = ff.read(4096)
                if not data:
                    break
                _length += len(data)
                _md5.update(data)
        return _md5.hexdigest(), _length
    except Exception as e:
        traceback.print_exc()
        return None, _length


def add_html_format(text_list):
    # wrap each text in a minimal HTML page
    new_text_list = []
    for t in text_list:
        html_t = "<html>\n"
        html_t += '<meta charset="UTF-8">\n'
        html_t += "<body>\n"
        html_t += t
        html_t += "\n</body></html>\n"
        new_text_list.append(html_t)
    return new_text_list
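# A quick way to sanity-check the thresholding step of remove_red_seal above
# without its blocking cv2.imshow() calls: on a synthetic page, the red seal
# is bright in the red channel and gets thresholded to white (removed), while
# dark text stays black (kept). The demo function and test image are ours;
# cv2 and np are assumed available exactly as in the functions above.


def _demo_red_channel_threshold():
    page = np.full((100, 100, 3), 255, dtype=np.uint8)
    page[40:60, 40:60] = (0, 0, 255)   # red square (BGR order) -- the "seal"
    page[10:15, 10:90] = (0, 0, 0)     # black bar -- the "text"
    _, _, red_c = cv2.split(page)
    thresh, _ = cv2.threshold(red_c, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
    _, red_thresh = cv2.threshold(red_c, int(thresh * 0.98), 255, cv2.THRESH_BINARY)
    print("seal turned white:", bool((red_thresh[40:60, 40:60] == 255).all()))
    print("text stayed black:", bool((red_thresh[10:15, 10:90] == 0).all()))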
", e, global_type) logging.info("unique_temp_file_process") print("unique_temp_file_process:", traceback.print_exc()) return [-1] finally: print("======================================") print("File md5:", getMDFFromFile(file_path)) try: if get_platform() == "Linux": # 删除该唯一空间下所有文件 if os.path.exists(unique_space_path): shutil.rmtree(unique_space_path) print() except Exception as e: logging.info("Delete Files Failed!") # print("Delete Files Failed!") return [-1] print("Finally") # to_html(_path + "6.html", text[0]) # to_html(unique_space_path + "result.html", text[0]) # return text logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) def log(msg): """ @summary:打印信息 """ logger.info(msg) def cut_str(text_list, only_text_list, max_bytes_length=2000000): logging.info("into cut_str") try: # 计算有格式总字节数 bytes_length = 0 for text in text_list: bytes_length += len(bytes(text, encoding='utf-8')) print("text_list", bytes_length) # 小于直接返回 if bytes_length < max_bytes_length: print("return text_list no cut") return text_list # 全部文件连接,重新计算无格式字节数 all_text = "" bytes_length = 0 for text in only_text_list: bytes_length += len(bytes(text, encoding='utf-8')) all_text += text # print("only_text_list", bytes_length) # 小于直接返回 if bytes_length < max_bytes_length: print("return only_text_list no cut") return only_text_list # 截取字符 all_text = all_text[:int(max_bytes_length/3)] print("text bytes ", len(bytes(all_text, encoding='utf-8'))) print("return only_text_list has cut") return [all_text] except Exception as e: logging.info("cut_str " + str(e)) return ["-1"] @get_memory_info.memory_decorator def convert(data, ocr_model, otr_model): """ 接口返回值: {[str], 1}: 处理成功 {[-1], 0}: 逻辑处理错误 {[-2], 0}: 接口调用错误 {[-3], 1}: 文件格式错误,无法打开 {[-4], 0}: 各类文件调用第三方包读取超时 {[-5], 0}: 整个转换过程超时 {[-6], 0}: 阿里云UDF队列超时 {[-7], 1}: 文件需密码,无法打开 :return: {"result": [], "is_success": int} """ # 控制内存 # soft, hard = resource.getrlimit(resource.RLIMIT_AS) # resource.setrlimit(resource.RLIMIT_AS, (15 * 1024 ** 3, hard)) logging.info("into convert") start_time = time.time() try: # 模型加入全局变量 globals().update({"global_ocr_model": ocr_model}) globals().update({"global_otr_model": otr_model}) stream = base64.b64decode(data.get("file")) _type = data.get("type") if get_platform() == "Windows": # 解除超时装饰器,直接访问原函数 origin_unique_temp_file_process = unique_temp_file_process.__wrapped__ text = origin_unique_temp_file_process(stream, _type) else: # Linux 通过装饰器设置整个转换超时时间 try: text = unique_temp_file_process(stream, _type) except TimeoutError: logging.info("convert time out! 
1200 sec") text = [-5] # if text == [-1]: # print({"failed result": [-1], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-1"], "result_text": ["-1"], "is_success": 0} # if text == [-2]: # print({"failed result": [-2], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-2"], "result_text": ["-2"], "is_success": 0} # if text == [-3]: # print({"failed result": [-3], "is_success": 1}, time.time() - start_time) # return {"result_html": ["-3"], "result_text": ["-3"], "is_success": 1} # if text == [-4]: # print({"failed result": [-4], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-4"], "result_text": ["-4"], "is_success": 0} # if text == [-5]: # print({"failed result": [-5], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-5"], "result_text": ["-5"], "is_success": 0} # if text == [-7]: # print({"failed result": [-7], "is_success": 1}, time.time() - start_time) # return {"result_html": ["-7"], "result_text": ["-7"], "is_success": 1} # if text == [-8]: # print({"failed result": [-8], "is_success": 0}, time.time() - start_time) # return {"result_html": ["-8"], "result_text": ["-8"], "is_success": 1} error_code = [[-x] for x in range(1, 9)] still_success_code = [[-3], [-7]] if text in error_code: if text in still_success_code: print({"failed result": text, "is_success": 1}, time.time() - start_time) return {"result_html": [str(text[0])], "result_text": [str(text[0])], "is_success": 1} else: print({"failed result": text, "is_success": 0}, time.time() - start_time) return {"result_html": [str(text[0])], "result_text": [str(text[0])], "is_success": 0} # 结果保存result.html # if get_platform() == "Windows": text_str = "" for t in text: text_str += t to_html("../result.html", text_str) # 取纯文本 only_text = [] for t in text: new_t = BeautifulSoup(t, "lxml").get_text() new_t = re.sub("\n", "", new_t) only_text.append(new_t) # 判断长度,过长截取 text = cut_str(text, only_text) only_text = cut_str(only_text, only_text) if len(only_text) == 0: only_text = [""] if only_text[0] == '' and len(only_text) <= 1: print({"finished result": ["", 0], "is_success": 1}, time.time() - start_time) else: print({"finished result": [str(only_text)[:20], len(str(text))], "is_success": 1}, time.time() - start_time) return {"result_html": text, "result_text": only_text, "is_success": 1} except Exception as e: print({"failed result": [-1], "is_success": 0}, time.time() - start_time) print("convert", traceback.print_exc()) return {"result_html": ["-1"], "result_text": ["-1"], "is_success": 0} global_type = "" local_url = "http://127.0.0.1" if get_platform() == "Windows": _path = os.path.abspath(os.path.dirname(__file__)) else: _path = "/home/admin" if not os.path.exists(_path): _path = os.path.dirname(os.path.abspath(__file__)) if __name__ == '__main__': if get_platform() == "Windows": # file_path = "C:/Users/Administrator/Desktop/error13.pdf" # file_path = "D:/BIDI_DOC/比地_文档/2022/Test_Interface/1622529434414.rar" file_path = "D:/BIDI_DOC/比地_文档/2022/Test_ODPS/1624267820641.pdf" else: file_path = "1.doc" with open(file_path, "rb") as f: file_bytes = f.read() file_base64 = base64.b64encode(file_bytes) data = {"file": file_base64, "type": file_path.split(".")[-1], "filemd5": 100} ocr_model = ocr_interface.OcrModels().get_model() otr_model = otr_interface.OtrModels().get_model() result = convert(data, ocr_model, otr_model)