import inspect
import os
import sys
sys.path.append(os.path.dirname(__file__) + "/../")
from format_convert.convert_tree import _Document, _Page, _Table
import logging
import traceback
import pandas as pd
import numpy as np
import xlrd
from format_convert.utils import get_logger, log, memory_decorator
from format_convert.wrapt_timeout_decorator import timeout
@memory_decorator
def xlsx2text(path, unique_type_dir):
log("into xlsx2text")
try:
try:
# sheet_name=None, 即拿取所有sheet,存为dict
df_dict = pd.read_excel(path, header=None, keep_default_na=False, sheet_name=None)
except Exception as e:
log("xlsx format error!")
return [-3]
df_list = [sheet for sheet in df_dict.values()]
sheet_text = ""
for df in df_list:
text = '
' + "\n"
for index, row in df.iterrows():
text = text + ""
for r in row:
text = text + "" + str(r) + " | " + "\n"
# print(text)
text = text + "
" + "\n"
text = text + "
" + "\n"
sheet_text += text
return [sheet_text]
except Exception as e:
log("xlsx2text error!")
traceback.print_exc()
return [-1]
class XlsxConvert:
def __init__(self, path, unique_type_dir, is_xls=False):
self._doc = _Document(path)
self.path = path
self.unique_type_dir = unique_type_dir
# xls直接用xlrd读取
self.is_xls = is_xls
self.workbook = None
self.sheet_list = []
# 防止读太多列行
self.col_limit = 100
self.row_limit = 2000
# 防止sheet太多
self.sheet_limit = 10
@timeout(30, timeout_exception=TimeoutError, use_signals=False)
def read(self):
# xlrd 为了读取合并单元格 或 直接读取xls
workbook = xlrd.open_workbook(self.path)
if not self.is_xls:
# pandas
# df = pd.read_excel(self.path, header=None, keep_default_na=False, sheet_name=None)
df = pd.read_excel(self.path, header=None, keep_default_na=False,
sheet_name=None, usecols=[x for x in range(self.col_limit)],
nrows=self.row_limit)
sheet_list = [sheet for sheet in df.values()]
else:
# xlrd -> pandas
data_list = []
for sheet in workbook.sheets():
data = []
# 读取工作表中的内容
for row_idx in range(sheet.nrows):
if row_idx >= self.row_limit:
break
row = sheet.row_values(row_idx)[:self.col_limit]
data.append(row)
# 将读取的数据转换为 pandas DataFrame
df = pd.DataFrame(data)
data_list.append(df)
sheet_list = data_list
# 使用了定时装饰器,需直接返回结果,直接赋值对象变量无效
# self.workbook = workbook
# self.sheet_list = self.sheet_list[:self.sheet_limit]
return workbook, sheet_list
def init_package(self):
# 各个包初始化
try:
self.workbook, self.sheet_list = self.read()
# self.df, self.workbook = self.read()
# self.sheet_list = [sheet for sheet in self.df.values()]
# self.re_read = 0
# for s in self.sheet_list:
# if s.shape[1] > self.col_limit and s.shape[0] > self.row_limit:
# self.re_read = 3
# break
# elif s.shape[0] > self.row_limit:
# self.re_read = 2
# break
# elif s.shape[1] > self.col_limit:
# self.re_read = 1
# break
# if self.re_read == 3:
# self.df = pd.read_excel(self.path, header=None, keep_default_na=False,
# sheet_name=None, usecols=[x for x in range(self.col_limit)],
# nrows=self.row_limit)
# if self.re_read == 2:
# self.df = pd.read_excel(self.path, header=None, keep_default_na=False,
# sheet_name=None, nrows=self.row_limit)
# elif self.re_read == 1:
# self.df = pd.read_excel(self.path, header=None, keep_default_na=False,
# sheet_name=None, usecols=[x for x in range(self.col_limit)])
# if self.re_read > 0:
# self.sheet_list = [sheet for sheet in self.df.values()]
# print(self.sheet_list[0].shape)
except:
if self.is_xls:
log("cannot open xls!")
else:
log("cannot open xlsx!")
traceback.print_exc()
self._doc.error_code = [-3]
def convert(self):
log('into xlsx_convert')
self.init_package()
if self._doc.error_code is not None:
return
sheet_no = 0
for sheet in self.sheet_list:
self._page = _Page(None, sheet_no)
self.convert_page(sheet, sheet_no)
if self._doc.error_code is None and self._page.error_code is not None:
self._doc.error_code = self._page.error_code
self._doc.add_child(self._page)
sheet_no += 1
def convert_page_230101(self, sheet):
text = '' + "\n"
# 剔除多余空列
max_row_len = 0
max_col_len = 0
if self.re_read:
for index, row in sheet.iterrows():
col_len = 0
row_empty_flag = 1
for i in range(len(row)):
if row[i] not in [None, "", np.nan]:
row_empty_flag = 0
col_len = i
if self.re_read == 3 or self.re_read == 1:
if col_len > max_col_len:
max_col_len = col_len
if self.re_read == 3 or self.re_read == 2:
if row_empty_flag == 0:
max_row_len = index
for index, row in sheet.iterrows():
if self.re_read == 3 or self.re_read == 2:
if index > max_row_len:
break
text = text + ""
if self.re_read == 3 or self.re_read == 1:
row = row[:max_col_len+1]
for r in row:
text = text + "" + str(r) + " | " + "\n"
# print(text)
text = text + "
" + "\n"
text = text + "
" + "\n"
_table = _Table(text, (0, 0, 0, 0), is_html=True)
self._page.add_child(_table)
def convert_page_2405024(self, sheet, sheet_no):
# 剔除多余空列
max_row_len = 0
max_col_len = 0
if self.re_read:
for index, row in sheet.iterrows():
col_len = 0
row_empty_flag = 1
for i in range(len(row)):
if row[i] not in [None, "", np.nan]:
row_empty_flag = 0
col_len = i
if self.re_read == 3 or self.re_read == 1:
if col_len > max_col_len:
max_col_len = col_len
if self.re_read == 3 or self.re_read == 2:
if row_empty_flag == 0:
max_row_len = index
row_list = []
for index, row in sheet.iterrows():
if self.re_read == 3 or self.re_read == 2:
if index > max_row_len:
break
if self.re_read == 3 or self.re_read == 1:
row = row[:max_col_len+1]
col_list = []
for r in row:
col_list.append(str(r))
row_list.append(col_list)
# xlrd 获取合并单元格位置
sheet_xlrd = self.workbook.sheet_by_index(sheet_no)
merged_cell_list = sheet_xlrd.merged_cells
merged_cell_list.sort(key=lambda x: (x[0], x[1], x[2], x[3]))
# print("merged_cell_list", merged_cell_list)
# 复制填充合并单元格
for row_start, row_end, col_start, col_end in merged_cell_list:
if row_start >= len(row_list) or row_end > len(row_list):
continue
if col_start >= len(row_list[row_start]) or col_end > len(row_list[row_start]):
continue
copy_cell = row_list[row_start][col_start]
for i in range(row_start, row_end):
row = row_list[i]
# 第一行补少一个,其他行需补多一个
if i == row_start:
col_start_real = col_start+1
else:
col_start_real = col_start
for j in range(col_start_real, col_end):
if row[j] == "":
row[j] = copy_cell
# 拼接html表格
text = '' + "\n"
for row in row_list:
text = text + ""
for col in row:
text = text + "" + str(col) + " | " + "\n"
text = text + "
" + "\n"
text = text + "
" + "\n"
_table = _Table(text, (0, 0, 0, 0), is_html=True)
self._page.add_child(_table)
def convert_page(self, sheet, sheet_no):
row_list = self.delete_empty_row_col(sheet)
# xlrd 获取合并单元格位置
sheet_xlrd = self.workbook.sheet_by_index(sheet_no)
merged_cell_list = sheet_xlrd.merged_cells
merged_cell_list.sort(key=lambda x: (x[0], x[1], x[2], x[3]))
# print("merged_cell_list", merged_cell_list)
# 复制填充合并单元格
for row_start, row_end, col_start, col_end in merged_cell_list:
if row_start >= len(row_list) or row_end > len(row_list):
continue
if col_start >= len(row_list[row_start]) or col_end > len(row_list[row_start]):
continue
copy_cell = row_list[row_start][col_start]
for i in range(row_start, row_end):
row = row_list[i]
# 第一行补少一个,其他行需补多一个
if i == row_start:
col_start_real = col_start+1
else:
col_start_real = col_start
for j in range(col_start_real, col_end):
if row[j] == "":
row[j] = copy_cell
# 拼接html表格
text = '' + "\n"
for row in row_list:
text = text + ""
for col in row:
text = text + "" + str(col) + " | " + "\n"
text = text + "
" + "\n"
text = text + "
" + "\n"
_table = _Table(text, (0, 0, 0, 0), is_html=True)
self._page.add_child(_table)
def delete_empty_row_col(self, sheet):
# 删除xlsx全为空的行列
sheet.dropna(how='all', axis=1, inplace=True)
sheet.dropna(how='all', axis=0, inplace=True)
# 剔除多余空列
max_row_len = 0
max_col_len = 0
for index, row in sheet.iterrows():
col_len = 0
row_empty_flag = 1
for i in range(len(row)):
if row[i] not in [None, "", np.nan]:
row_empty_flag = 0
col_len = i
if col_len > max_col_len:
max_col_len = col_len
if row_empty_flag == 0:
max_row_len = index
row_list = []
for index, row in sheet.iterrows():
if index > max_row_len:
break
row = row[:max_col_len+1]
col_list = []
for r in row:
col_list.append(str(r))
row_list.append(col_list)
return row_list
def get_html(self):
try:
self.convert()
except:
traceback.print_exc()
self._doc.error_code = [-1]
if self._doc.error_code is not None:
return self._doc.error_code
return self._doc.get_html()