import inspect import os import sys from bs4 import BeautifulSoup sys.path.append(os.path.dirname(__file__) + "/../") from format_convert.convert_tree import _Document, _Page, _Table, _Sentence import logging import traceback import pandas as pd import numpy as np import xlrd from format_convert.utils import get_logger, log, memory_decorator from format_convert.wrapt_timeout_decorator import timeout @memory_decorator def xlsx2text(path, unique_type_dir): log("into xlsx2text") try: try: # sheet_name=None, 即拿取所有sheet,存为dict df_dict = pd.read_excel(path, header=None, keep_default_na=False, sheet_name=None) except Exception as e: log("xlsx format error!") return [-3] df_list = [sheet for sheet in df_dict.values()] sheet_text = "" for df in df_list: text = '' + "\n" for index, row in df.iterrows(): text = text + "" for r in row: text = text + "" + "\n" # print(text) text = text + "" + "\n" text = text + "
" + str(r) + "
" + "\n" sheet_text += text return [sheet_text] except Exception as e: log("xlsx2text error!") traceback.print_exc() return [-1] class XlsxConvert: def __init__(self, path, unique_type_dir, is_xls=False): self._doc = _Document(path) self.path = path self.unique_type_dir = unique_type_dir # xls直接用xlrd读取 self.is_xls = is_xls self.workbook = None self.sheet_list = [] # 防止读太多列行 self.col_limit = 100 self.row_limit = 2000 # 防止sheet太多 self.sheet_limit = 10 @timeout(30, timeout_exception=TimeoutError, use_signals=False) def read(self): # xlrd 为了读取合并单元格 或 直接读取xls workbook = xlrd.open_workbook(self.path) if not self.is_xls: # pandas # df = pd.read_excel(self.path, header=None, keep_default_na=False, sheet_name=None) use_xlrd = 0 try: df = pd.read_excel(self.path, header=None, keep_default_na=False, sheet_name=None, usecols=[x for x in range(self.col_limit)], nrows=self.row_limit) sheet_list = [sheet for sheet in df.values()] except: traceback.print_exc() print('pandas读取xlsx失败') use_xlrd = 1 else: use_xlrd = 1 if use_xlrd: # xlrd -> pandas data_list = [] for sheet in workbook.sheets(): data = [] # 读取工作表中的内容 for row_idx in range(sheet.nrows): if row_idx >= self.row_limit: break row = sheet.row_values(row_idx)[:self.col_limit] data.append(row) # 将读取的数据转换为 pandas DataFrame df = pd.DataFrame(data) data_list.append(df) sheet_list = data_list # 使用了定时装饰器,需直接返回结果,直接赋值对象变量无效 # self.workbook = workbook # self.sheet_list = self.sheet_list[:self.sheet_limit] return workbook, sheet_list def init_package(self): # 各个包初始化 try: self.workbook, self.sheet_list = self.read() # self.df, self.workbook = self.read() # self.sheet_list = [sheet for sheet in self.df.values()] # self.re_read = 0 # for s in self.sheet_list: # if s.shape[1] > self.col_limit and s.shape[0] > self.row_limit: # self.re_read = 3 # break # elif s.shape[0] > self.row_limit: # self.re_read = 2 # break # elif s.shape[1] > self.col_limit: # self.re_read = 1 # break # if self.re_read == 3: # self.df = pd.read_excel(self.path, header=None, keep_default_na=False, # sheet_name=None, usecols=[x for x in range(self.col_limit)], # nrows=self.row_limit) # if self.re_read == 2: # self.df = pd.read_excel(self.path, header=None, keep_default_na=False, # sheet_name=None, nrows=self.row_limit) # elif self.re_read == 1: # self.df = pd.read_excel(self.path, header=None, keep_default_na=False, # sheet_name=None, usecols=[x for x in range(self.col_limit)]) # if self.re_read > 0: # self.sheet_list = [sheet for sheet in self.df.values()] # print(self.sheet_list[0].shape) except: if self.is_xls: log("cannot open xls!") else: log("cannot open xlsx!") traceback.print_exc() self._doc.error_code = [-3] def convert(self): log('into xlsx_convert') # 先判断特殊xlsx文件,可能是html文本 is_html_xls = False try: with open(self.path, 'r', encoding='utf-8') as f: html_str = f.read() soup = BeautifulSoup(html_str, 'lxml') text = soup.text is_html_xls = True except: pass if is_html_xls: self._page = _Page(None, 0) _sen = _Sentence(text, (0, 0, 0, 0)) self._page.add_child(_sen) self._doc.add_child(self._page) return self.init_package() if self._doc.error_code is not None: return sheet_no = 0 for sheet in self.sheet_list: self._page = _Page(None, sheet_no) self.convert_page(sheet, sheet_no) if self._doc.error_code is None and self._page.error_code is not None: self._doc.error_code = self._page.error_code self._doc.add_child(self._page) sheet_no += 1 def convert_page_230101(self, sheet): text = '' + "\n" # 剔除多余空列 max_row_len = 0 max_col_len = 0 if self.re_read: for index, row in sheet.iterrows(): col_len = 0 row_empty_flag = 1 for i in range(len(row)): if row[i] not in [None, "", np.nan]: row_empty_flag = 0 col_len = i if self.re_read == 3 or self.re_read == 1: if col_len > max_col_len: max_col_len = col_len if self.re_read == 3 or self.re_read == 2: if row_empty_flag == 0: max_row_len = index for index, row in sheet.iterrows(): if self.re_read == 3 or self.re_read == 2: if index > max_row_len: break text = text + "" if self.re_read == 3 or self.re_read == 1: row = row[:max_col_len+1] for r in row: text = text + "" + "\n" # print(text) text = text + "" + "\n" text = text + "
" + str(r) + "
" + "\n" _table = _Table(text, (0, 0, 0, 0), is_html=True) self._page.add_child(_table) def convert_page_2405024(self, sheet, sheet_no): # 剔除多余空列 max_row_len = 0 max_col_len = 0 if self.re_read: for index, row in sheet.iterrows(): col_len = 0 row_empty_flag = 1 for i in range(len(row)): if row[i] not in [None, "", np.nan]: row_empty_flag = 0 col_len = i if self.re_read == 3 or self.re_read == 1: if col_len > max_col_len: max_col_len = col_len if self.re_read == 3 or self.re_read == 2: if row_empty_flag == 0: max_row_len = index row_list = [] for index, row in sheet.iterrows(): if self.re_read == 3 or self.re_read == 2: if index > max_row_len: break if self.re_read == 3 or self.re_read == 1: row = row[:max_col_len+1] col_list = [] for r in row: col_list.append(str(r)) row_list.append(col_list) # xlrd 获取合并单元格位置 sheet_xlrd = self.workbook.sheet_by_index(sheet_no) merged_cell_list = sheet_xlrd.merged_cells merged_cell_list.sort(key=lambda x: (x[0], x[1], x[2], x[3])) # print("merged_cell_list", merged_cell_list) # 复制填充合并单元格 for row_start, row_end, col_start, col_end in merged_cell_list: if row_start >= len(row_list) or row_end > len(row_list): continue if col_start >= len(row_list[row_start]) or col_end > len(row_list[row_start]): continue copy_cell = row_list[row_start][col_start] for i in range(row_start, row_end): row = row_list[i] # 第一行补少一个,其他行需补多一个 if i == row_start: col_start_real = col_start+1 else: col_start_real = col_start for j in range(col_start_real, col_end): if row[j] == "": row[j] = copy_cell # 拼接html表格 text = '' + "\n" for row in row_list: text = text + "" for col in row: text = text + "" + "\n" text = text + "" + "\n" text = text + "
" + str(col) + "
" + "\n" _table = _Table(text, (0, 0, 0, 0), is_html=True) self._page.add_child(_table) def convert_page(self, sheet, sheet_no): row_list = self.delete_empty_row_col(sheet) # xlrd 获取合并单元格位置 sheet_xlrd = self.workbook.sheet_by_index(sheet_no) merged_cell_list = sheet_xlrd.merged_cells merged_cell_list.sort(key=lambda x: (x[0], x[1], x[2], x[3])) # print("merged_cell_list", merged_cell_list) # # 复制填充合并单元格 # for row_start, row_end, col_start, col_end in merged_cell_list: # if row_start >= len(row_list) or row_end > len(row_list): # continue # if col_start >= len(row_list[row_start]) or col_end > len(row_list[row_start]): # continue # copy_cell = row_list[row_start][col_start] # for i in range(row_start, row_end): # row = row_list[i] # # 第一行补少一个,其他行需补多一个 # if i == row_start: # col_start_real = col_start+1 # else: # col_start_real = col_start # for j in range(col_start_real, col_end): # if row[j] == "": # row[j] = copy_cell # # # 拼接html表格 # text = '' + "\n" # for row in row_list: # text = text + "" # for col in row: # text = text + "" + "\n" # text = text + "" + "\n" # text = text + "
" + str(col) + "
" + "\n" # 保留合并单元格 table = [] for row in row_list: new_row = [] # print('row', row) for col in row: cell = {'text': col, 'rowspan': 1, 'colspan': 1} new_row.append(cell) table.append(new_row) for row_start, row_end, col_start, col_end in merged_cell_list: # print('111row_start, row_end, col_start, col_end', row_start, row_end, col_start, col_end) row_end = min(row_end, len(table)) if row_start >= row_end: continue col_end = min(col_end, len(table[row_start])) if col_start >= col_end: continue # print('len(table)', len(table), 'len(table[row_start])', len(table[row_start])) merge_cell = table[row_start][col_start] # print('row_start, row_end, col_start, col_end', row_start, row_end, col_start, col_end, merge_cell.get('text')) merge_cell['rowspan'] = row_end - row_start merge_cell['colspan'] = col_end - col_start # 多余的删掉 for i in range(row_start, row_end): row = table[i] if i == row_start: col_start_real = col_start+1 else: col_start_real = col_start for j in range(col_start_real, col_end): if row[j].get('text') == "": row[j]['delete'] = 1 # 拼接html表格 text = '' + "\n" for row in table: text = text + "" for col in row: if col.get('delete'): continue text = text + "\n".format(col.get('rowspan'), col.get('colspan'), col.get('text')) text = text + "" + "\n" text = text + "
{}
" + "\n" _table = _Table(text, (0, 0, 0, 0), is_html=True) self._page.add_child(_table) def delete_empty_row_col(self, sheet): # 删除xlsx全为空的行列 sheet.dropna(how='all', axis=1, inplace=True) sheet.dropna(how='all', axis=0, inplace=True) # 剔除多余空列 max_row_len = 0 max_col_len = 0 for index, row in sheet.iterrows(): col_len = 0 row_empty_flag = 1 for i in range(len(row)): if row[i] not in [None, "", np.nan]: row_empty_flag = 0 col_len = i if col_len > max_col_len: max_col_len = col_len if row_empty_flag == 0: max_row_len = index row_list = [] for index, row in sheet.iterrows(): if index > max_row_len: break row = row[:max_col_len+1] col_list = [] for r in row: col_list.append(str(r)) row_list.append(col_list) return row_list def get_html(self): try: self.convert() except: traceback.print_exc() self._doc.error_code = [-1] if self._doc.error_code is not None: return self._doc.error_code return self._doc.get_html()