123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- import inspect
- import os
- import sys
- from bs4 import BeautifulSoup
- sys.path.append(os.path.dirname(__file__) + "/../")
- from format_convert.convert_tree import _Document, _Page, _Table, _Sentence
- import logging
- import traceback
- import pandas as pd
- import numpy as np
- import xlrd
- from format_convert.utils import get_logger, log, memory_decorator
- from format_convert.wrapt_timeout_decorator import timeout
- @memory_decorator
- def xlsx2text(path, unique_type_dir):
- log("into xlsx2text")
- try:
- try:
- # sheet_name=None, 即拿取所有sheet,存为dict
- df_dict = pd.read_excel(path, header=None, keep_default_na=False, sheet_name=None)
- except Exception as e:
- log("xlsx format error!")
- return [-3]
- df_list = [sheet for sheet in df_dict.values()]
- sheet_text = ""
- for df in df_list:
- text = '<table border="1">' + "\n"
- for index, row in df.iterrows():
- text = text + "<tr>"
- for r in row:
- text = text + "<td>" + str(r) + "</td>" + "\n"
- # print(text)
- text = text + "</tr>" + "\n"
- text = text + "</table>" + "\n"
- sheet_text += text
- return [sheet_text]
- except Exception as e:
- log("xlsx2text error!")
- traceback.print_exc()
- return [-1]
- class XlsxConvert:
- def __init__(self, path, unique_type_dir, is_xls=False):
- self._doc = _Document(path)
- self.path = path
- self.unique_type_dir = unique_type_dir
- # xls直接用xlrd读取
- self.is_xls = is_xls
- self.workbook = None
- self.sheet_list = []
- # 防止读太多列行
- self.col_limit = 100
- self.row_limit = 2000
- # 防止sheet太多
- self.sheet_limit = 10
- @timeout(30, timeout_exception=TimeoutError, use_signals=False)
- def read(self):
- # xlrd 为了读取合并单元格 或 直接读取xls
- workbook = xlrd.open_workbook(self.path)
- if not self.is_xls:
- # pandas
- # df = pd.read_excel(self.path, header=None, keep_default_na=False, sheet_name=None)
- use_xlrd = 0
- try:
- df = pd.read_excel(self.path, header=None, keep_default_na=False,
- sheet_name=None, usecols=[x for x in range(self.col_limit)],
- nrows=self.row_limit)
- sheet_list = [sheet for sheet in df.values()]
- except:
- traceback.print_exc()
- print('pandas读取xlsx失败')
- use_xlrd = 1
- else:
- use_xlrd = 1
- if use_xlrd:
- # xlrd -> pandas
- data_list = []
- for sheet in workbook.sheets():
- data = []
- # 读取工作表中的内容
- for row_idx in range(sheet.nrows):
- if row_idx >= self.row_limit:
- break
- row = sheet.row_values(row_idx)[:self.col_limit]
- data.append(row)
- # 将读取的数据转换为 pandas DataFrame
- df = pd.DataFrame(data)
- data_list.append(df)
- sheet_list = data_list
- # 使用了定时装饰器,需直接返回结果,直接赋值对象变量无效
- # self.workbook = workbook
- # self.sheet_list = self.sheet_list[:self.sheet_limit]
- return workbook, sheet_list
- def init_package(self):
- # 各个包初始化
- try:
- self.workbook, self.sheet_list = self.read()
- # self.df, self.workbook = self.read()
- # self.sheet_list = [sheet for sheet in self.df.values()]
- # self.re_read = 0
- # for s in self.sheet_list:
- # if s.shape[1] > self.col_limit and s.shape[0] > self.row_limit:
- # self.re_read = 3
- # break
- # elif s.shape[0] > self.row_limit:
- # self.re_read = 2
- # break
- # elif s.shape[1] > self.col_limit:
- # self.re_read = 1
- # break
- # if self.re_read == 3:
- # self.df = pd.read_excel(self.path, header=None, keep_default_na=False,
- # sheet_name=None, usecols=[x for x in range(self.col_limit)],
- # nrows=self.row_limit)
- # if self.re_read == 2:
- # self.df = pd.read_excel(self.path, header=None, keep_default_na=False,
- # sheet_name=None, nrows=self.row_limit)
- # elif self.re_read == 1:
- # self.df = pd.read_excel(self.path, header=None, keep_default_na=False,
- # sheet_name=None, usecols=[x for x in range(self.col_limit)])
- # if self.re_read > 0:
- # self.sheet_list = [sheet for sheet in self.df.values()]
- # print(self.sheet_list[0].shape)
- except:
- if self.is_xls:
- log("cannot open xls!")
- else:
- log("cannot open xlsx!")
- traceback.print_exc()
- self._doc.error_code = [-3]
- def convert(self):
- log('into xlsx_convert')
- # 先判断特殊xlsx文件,可能是html文本
- is_html_xls = False
- try:
- with open(self.path, 'r', encoding='utf-8') as f:
- html_str = f.read()
- soup = BeautifulSoup(html_str, 'lxml')
- text = soup.text
- is_html_xls = True
- except:
- pass
- if is_html_xls:
- self._page = _Page(None, 0)
- _sen = _Sentence(text, (0, 0, 0, 0))
- self._page.add_child(_sen)
- self._doc.add_child(self._page)
- return
- self.init_package()
- if self._doc.error_code is not None:
- return
- sheet_no = 0
- for sheet in self.sheet_list:
- self._page = _Page(None, sheet_no)
- self.convert_page(sheet, sheet_no)
- if self._doc.error_code is None and self._page.error_code is not None:
- self._doc.error_code = self._page.error_code
- self._doc.add_child(self._page)
- sheet_no += 1
- def convert_page_230101(self, sheet):
- text = '<table border="1">' + "\n"
- # 剔除多余空列
- max_row_len = 0
- max_col_len = 0
- if self.re_read:
- for index, row in sheet.iterrows():
- col_len = 0
- row_empty_flag = 1
- for i in range(len(row)):
- if row[i] not in [None, "", np.nan]:
- row_empty_flag = 0
- col_len = i
- if self.re_read == 3 or self.re_read == 1:
- if col_len > max_col_len:
- max_col_len = col_len
- if self.re_read == 3 or self.re_read == 2:
- if row_empty_flag == 0:
- max_row_len = index
- for index, row in sheet.iterrows():
- if self.re_read == 3 or self.re_read == 2:
- if index > max_row_len:
- break
- text = text + "<tr>"
- if self.re_read == 3 or self.re_read == 1:
- row = row[:max_col_len+1]
- for r in row:
- text = text + "<td>" + str(r) + "</td>" + "\n"
- # print(text)
- text = text + "</tr>" + "\n"
- text = text + "</table>" + "\n"
- _table = _Table(text, (0, 0, 0, 0), is_html=True)
- self._page.add_child(_table)
- def convert_page_2405024(self, sheet, sheet_no):
- # 剔除多余空列
- max_row_len = 0
- max_col_len = 0
- if self.re_read:
- for index, row in sheet.iterrows():
- col_len = 0
- row_empty_flag = 1
- for i in range(len(row)):
- if row[i] not in [None, "", np.nan]:
- row_empty_flag = 0
- col_len = i
- if self.re_read == 3 or self.re_read == 1:
- if col_len > max_col_len:
- max_col_len = col_len
- if self.re_read == 3 or self.re_read == 2:
- if row_empty_flag == 0:
- max_row_len = index
- row_list = []
- for index, row in sheet.iterrows():
- if self.re_read == 3 or self.re_read == 2:
- if index > max_row_len:
- break
- if self.re_read == 3 or self.re_read == 1:
- row = row[:max_col_len+1]
- col_list = []
- for r in row:
- col_list.append(str(r))
- row_list.append(col_list)
- # xlrd 获取合并单元格位置
- sheet_xlrd = self.workbook.sheet_by_index(sheet_no)
- merged_cell_list = sheet_xlrd.merged_cells
- merged_cell_list.sort(key=lambda x: (x[0], x[1], x[2], x[3]))
- # print("merged_cell_list", merged_cell_list)
- # 复制填充合并单元格
- for row_start, row_end, col_start, col_end in merged_cell_list:
- if row_start >= len(row_list) or row_end > len(row_list):
- continue
- if col_start >= len(row_list[row_start]) or col_end > len(row_list[row_start]):
- continue
- copy_cell = row_list[row_start][col_start]
- for i in range(row_start, row_end):
- row = row_list[i]
- # 第一行补少一个,其他行需补多一个
- if i == row_start:
- col_start_real = col_start+1
- else:
- col_start_real = col_start
- for j in range(col_start_real, col_end):
- if row[j] == "":
- row[j] = copy_cell
- # 拼接html表格
- text = '<table border="1">' + "\n"
- for row in row_list:
- text = text + "<tr>"
- for col in row:
- text = text + "<td>" + str(col) + "</td>" + "\n"
- text = text + "</tr>" + "\n"
- text = text + "</table>" + "\n"
- _table = _Table(text, (0, 0, 0, 0), is_html=True)
- self._page.add_child(_table)
- def convert_page(self, sheet, sheet_no):
- row_list = self.delete_empty_row_col(sheet)
- # xlrd 获取合并单元格位置
- sheet_xlrd = self.workbook.sheet_by_index(sheet_no)
- merged_cell_list = sheet_xlrd.merged_cells
- merged_cell_list.sort(key=lambda x: (x[0], x[1], x[2], x[3]))
- # print("merged_cell_list", merged_cell_list)
- # # 复制填充合并单元格
- # for row_start, row_end, col_start, col_end in merged_cell_list:
- # if row_start >= len(row_list) or row_end > len(row_list):
- # continue
- # if col_start >= len(row_list[row_start]) or col_end > len(row_list[row_start]):
- # continue
- # copy_cell = row_list[row_start][col_start]
- # for i in range(row_start, row_end):
- # row = row_list[i]
- # # 第一行补少一个,其他行需补多一个
- # if i == row_start:
- # col_start_real = col_start+1
- # else:
- # col_start_real = col_start
- # for j in range(col_start_real, col_end):
- # if row[j] == "":
- # row[j] = copy_cell
- #
- # # 拼接html表格
- # text = '<table border="1">' + "\n"
- # for row in row_list:
- # text = text + "<tr>"
- # for col in row:
- # text = text + "<td>" + str(col) + "</td>" + "\n"
- # text = text + "</tr>" + "\n"
- # text = text + "</table>" + "\n"
- # 保留合并单元格
- table = []
- for row in row_list:
- new_row = []
- # print('row', row)
- for col in row:
- cell = {'text': col, 'rowspan': 1, 'colspan': 1}
- new_row.append(cell)
- table.append(new_row)
- for row_start, row_end, col_start, col_end in merged_cell_list:
- # print('111row_start, row_end, col_start, col_end', row_start, row_end, col_start, col_end)
- row_end = min(row_end, len(table))
- if row_start >= row_end:
- continue
- col_end = min(col_end, len(table[row_start]))
- if col_start >= col_end:
- continue
- # print('len(table)', len(table), 'len(table[row_start])', len(table[row_start]))
- merge_cell = table[row_start][col_start]
- # print('row_start, row_end, col_start, col_end', row_start, row_end, col_start, col_end, merge_cell.get('text'))
- merge_cell['rowspan'] = row_end - row_start
- merge_cell['colspan'] = col_end - col_start
- # 多余的删掉
- for i in range(row_start, row_end):
- row = table[i]
- if i == row_start:
- col_start_real = col_start+1
- else:
- col_start_real = col_start
- for j in range(col_start_real, col_end):
- if row[j].get('text') == "":
- row[j]['delete'] = 1
- # 拼接html表格
- text = '<table border="1">' + "\n"
- for row in table:
- text = text + "<tr>"
- for col in row:
- if col.get('delete'):
- continue
- text = text + "<td rowspan={} colspan={}>{}</td>\n".format(col.get('rowspan'), col.get('colspan'), col.get('text'))
- text = text + "</tr>" + "\n"
- text = text + "</table>" + "\n"
- _table = _Table(text, (0, 0, 0, 0), is_html=True)
- self._page.add_child(_table)
- def delete_empty_row_col(self, sheet):
- # 删除xlsx全为空的行列
- sheet.dropna(how='all', axis=1, inplace=True)
- sheet.dropna(how='all', axis=0, inplace=True)
- # 剔除多余空列
- max_row_len = 0
- max_col_len = 0
- for index, row in sheet.iterrows():
- col_len = 0
- row_empty_flag = 1
- for i in range(len(row)):
- if row[i] not in [None, "", np.nan]:
- row_empty_flag = 0
- col_len = i
- if col_len > max_col_len:
- max_col_len = col_len
- if row_empty_flag == 0:
- max_row_len = index
- row_list = []
- for index, row in sheet.iterrows():
- if index > max_row_len:
- break
- row = row[:max_col_len+1]
- col_list = []
- for r in row:
- col_list.append(str(r))
- row_list.append(col_list)
- return row_list
- def get_html(self):
- try:
- self.convert()
- except:
- traceback.print_exc()
- self._doc.error_code = [-1]
- if self._doc.error_code is not None:
- return self._doc.error_code
- return self._doc.get_html()
|