import inspect
import os
import sys
sys.path.append(os.path.dirname(__file__) + "/../")
from format_convert.convert_tree import _Document, _Sentence, _Page, _Image, _Table
import logging
import re
import traceback
import xml
import zipfile
import docx
from format_convert.convert_image import picture2text
from format_convert.utils import judge_error_code, add_div, get_logger, log, memory_decorator, get_garble_code
from format_convert.wrapt_timeout_decorator import timeout
def docx2text():
    """No-op stub: takes no arguments, does nothing and returns ``None``."""
    return None
def _paragraph_number(paragraph, num_pr_dict, last_node_level):
    """Compute the list-number prefix of one ``w:p`` element.

    Args:
        paragraph: DOM element of the paragraph.
        num_pr_dict: Mutable ``{numId: {level: count}}`` counters shared by all
            paragraphs of the document; updated in place.
        last_node_level: Level of the previously numbered paragraph.

    Returns:
        ``(text_no, last_node_level)``. ``text_no`` is ``''`` when the
        paragraph carries no usable numbering, otherwise the counters of
        levels ``0..level`` joined as ``"1.2."``. The second item is the level
        to pass in for the next paragraph.
    """
    num_pr = paragraph.getElementsByTagName("w:numPr")
    if not num_pr:
        return '', last_node_level
    num_pr = num_pr[0]

    group_id = int(num_pr.getElementsByTagName("w:numId")[0].getAttribute("w:val"))
    if group_id < 1:
        return '', last_node_level

    level_nodes = num_pr.getElementsByTagName("w:ilvl")
    if not level_nodes:
        return '', last_node_level
    node_level = int(level_nodes[0].getAttribute("w:val"))

    level_counts = num_pr_dict.get(group_id)
    if level_counts is None:
        # First paragraph of this numbering group.
        level_counts = num_pr_dict[group_id] = {}
    elif last_node_level != 0 and node_level < last_node_level:
        # Moving back up the hierarchy: restart every level between the
        # current one and the previous one.
        for deeper_level in range(node_level + 1, last_node_level + 1):
            level_counts[deeper_level] = 0
    # ``get`` covers a level not seen before in this group; a plain ``+= 1``
    # raised KeyError when a group was first met at a deeper level.
    level_counts[node_level] = level_counts.get(node_level, 0) + 1

    # Levels that never occurred in this group are skipped in the prefix.
    text_no = ''.join(
        str(level_counts[level]) + '.'
        for level in range(node_level + 1)
        if level in level_counts
    )
    return text_no, node_level


@timeout(50, timeout_exception=TimeoutError)
def read_xml_order(path, save_path):
    """Read the order of body-level content and the paragraph texts of a docx.

    ``word/document.xml`` is extracted from the archive at ``path`` into
    ``save_path`` and parsed with ``xml_analyze``; the direct children of
    ``w:body`` are then walked in document order.

    Args:
        path: Path of the .docx (zip) file.
        save_path: Directory prefix the XML is extracted into. It is
            concatenated directly with ``"word/document.xml"``, so it has to
            end with a path separator.

    Returns:
        ``[order_list, text_list]`` on success. ``order_list`` contains
        ``"w:t"`` for each paragraph that has text elements, ``"wp:docPr"``
        for each paragraph containing ``wp:docPr`` plus one per direct run
        containing ``w:pict``, and ``"w:tbl"`` for each table. ``text_list``
        holds one string per ``"w:t"`` entry, prefixed with the list number
        and a space when the paragraph is numbered.
        On failure a one-element list: ``[-3]`` when the archive cannot be
        read, ``[-4]`` when XML parsing times out, ``[-1]`` for anything else.
    """
    log("into read_xml_order")
    try:
        try:
            # The context manager closes the archive even if extraction fails.
            with zipfile.ZipFile(path) as f:
                for file in f.namelist():
                    if "word/document.xml" == str(file):
                        f.extract(file, save_path)
        except Exception:
            log("docx format error!")
            return [-3]

        try:
            collection = xml_analyze(save_path + "word/document.xml")
        except TimeoutError:
            log("xml_analyze timeout")
            return [-4]

        body = collection.getElementsByTagName("w:body")[0]
        order_list = []
        text_list = []
        # Per numbering group: how many items were seen at each level.
        num_pr_dict = {}
        last_node_level = 0
        for line in body.childNodes:
            if "w:p" in str(line):
                # List number of this paragraph ('' when it has none).
                text_no, last_node_level = _paragraph_number(
                    line, num_pr_dict, last_node_level
                )

                text = line.getElementsByTagName("w:t")
                picture = line.getElementsByTagName("wp:docPr")
                if text:
                    order_list.append("w:t")
                    # Empty <w:t/> elements have no text node and are skipped.
                    temp_text = "".join(
                        t.childNodes[0].nodeValue
                        for t in text
                        if len(t.childNodes) > 0
                    )
                    if text_no:
                        temp_text = text_no + ' ' + temp_text
                    text_list.append(temp_text)
                if picture:
                    order_list.append("wp:docPr")

                # Runs holding a w:pict are recorded as pictures as well.
                for line1 in line.childNodes:
                    if "w:r" in str(line1):
                        if line1.getElementsByTagName("w:pict"):
                            order_list.append("wp:docPr")

            if "w:tbl" in str(line):
                order_list.append("w:tbl")
        return [order_list, text_list]
    except Exception:
        log("read_xml_order error!")
        # print_exc() writes the traceback itself and returns None, so it is
        # not wrapped in print().
        traceback.print_exc()
        return [-1]
@timeout(50, timeout_exception=TimeoutError)
def read_xml_table(path, save_path):
    """Read every body-level table of a docx as one text string per table.

    ``word/document.xml`` is extracted from the archive at ``path`` into
    ``save_path`` and parsed with ``xml_analyze``; each ``w:tbl`` that is a
    direct child of ``w:body`` is serialised, with tables nested inside cells
    inlined into the cell text.

    Args:
        path: Path of the .docx (zip) file.
        save_path: Directory prefix the XML is extracted into. It is
            concatenated directly with ``"word/document.xml"``, so it has to
            end with a path separator.

    Returns:
        A list with one string per table (empty when there are none). On
        failure a one-element list: ``[-3]`` when the archive cannot be read,
        ``[-4]`` when XML parsing times out, ``[-1]`` for anything else.
    """

    def recursion_read_table(table):
        """Serialise one ``w:tbl`` element, recursing into nested tables."""
        # NOTE(review): the literals below produce pipe-delimited text, while
        # DocxConvert.convert marks the result with ``is_html = True``. They
        # may originally have been HTML tags (table/tr/td with colspan) that
        # were stripped from this file - confirm against version control.
        table_text = '\n'
        tr_index = 0
        # One entry per processed row: a list of [cell_text, col_span] pairs.
        tr_text_list = []
        # Only direct children are inspected here, so rows of nested tables
        # are not mistaken for rows of this one.
        for table_child in table.childNodes:
            if 'w:tr' not in str(table_child):
                continue
            tc_index = 0
            tc_text_list = []
            for tr_child in table_child.childNodes:
                if 'w:tc' not in str(tr_child).split(' '):
                    continue
                tc = tr_child
                tc_text = ""

                # Number of grid columns the cell spans (like colspan).
                col_span = tc.getElementsByTagName("w:gridSpan")
                col_span = int(col_span[0].getAttribute("w:val")) if col_span else 1

                # A vertically merged continuation cell (like rowspan) starts
                # with the text of the cell above it.
                # NOTE(review): a w:vMerge without w:val yields "" here and is
                # not treated as a continuation - confirm against the OOXML
                # default for that attribute.
                is_merge = tc.getElementsByTagName("w:vMerge")
                if is_merge and is_merge[0].getAttribute("w:val") == "continue":
                    if 0 <= tr_index - 1 < len(tr_text_list):
                        previous_row = tr_text_list[tr_index - 1]
                        # Walk the previous row, adding up spans, to find the
                        # cell that sits above the current column.
                        col_span_index = 0
                        real_tc_index = 0
                        for tc_colspan in previous_row:
                            if col_span_index < tc_index:
                                col_span_index += tc_colspan[1]
                                real_tc_index += 1
                        if real_tc_index < len(previous_row):
                            tc_text = previous_row[real_tc_index][0]

                # Open the cell.
                table_text = table_text + "| "
                # Collect the cell content.
                for tc_child in tc.childNodes:
                    child_tokens = str(tc_child).split(' ')
                    if 'w:tbl' in child_tokens:
                        # Table nested inside this cell.
                        tc_text += recursion_read_table(tc_child)
                    if 'w:p' in child_tokens:
                        for tc_p_all in tc_child.getElementsByTagName("*"):
                            # The text of a w:t lives in its first child node;
                            # an empty <w:t/> has none and is skipped instead
                            # of raising IndexError.
                            if 'w:t' in str(tc_p_all).split(' ') and tc_p_all.childNodes:
                                tc_text += tc_p_all.childNodes[0].nodeValue
                # Close the cell.
                table_text = table_text + tc_text + " | "
                tc_index += 1
                tc_text_list.append([tc_text, col_span])
            # Close the row (currently appends an empty string).
            table_text += ""
            tr_index += 1
            tr_text_list.append(tc_text_list)
        # Close the table.
        table_text += "\n"
        return table_text

    log("into read_xml_table")
    try:
        try:
            # The context manager closes the archive even if extraction fails.
            with zipfile.ZipFile(path) as f:
                for file in f.namelist():
                    if "word/document.xml" == str(file):
                        f.extract(file, save_path)
        except Exception:
            log("docx format error!")
            return [-3]

        log("xml_analyze%s"%(save_path))
        try:
            collection = xml_analyze(save_path + "word/document.xml")
        except TimeoutError:
            log("xml_analyze timeout")
            return [-4]
        log("xml_analyze done")

        body = collection.getElementsByTagName("w:body")[0]
        table_text_list = []
        for node in body.childNodes:
            if 'w:tbl' in str(node).split(' '):
                table_text_list.append(recursion_read_table(node))
        return table_text_list
    except Exception:
        log("read_xml_table error")
        # print_exc() writes the traceback itself and returns None, so it is
        # not wrapped in print().
        traceback.print_exc()
        return [-1]
@timeout(25, timeout_exception=TimeoutError)
def xml_analyze(path):
    """Parse the XML file at ``path`` and return its root element.

    Args:
        path: Path of the XML file to parse.

    Returns:
        The ``documentElement`` of the parsed DOM tree.
    """
    # ``import xml`` alone does not load the ``xml.dom.minidom`` submodule;
    # import it here instead of relying on another module having done so.
    import xml.dom.minidom

    dom_tree = xml.dom.minidom.parse(path)
    return dom_tree.documentElement
def read_docx_table(document):
    """Render every table of ``document`` as pipe-delimited text.

    Args:
        document: Object exposing ``tables``; each table exposes ``rows``,
            each row ``cells`` and each cell ``text``.

    Returns:
        One string per table. Every cell becomes ``"| <text> | "`` with all
        whitespace removed from the text, every row ends with a newline and
        every table ends with one extra newline.
    """
    table_text_list = []
    for table in document.tables:
        row_texts = []
        for row in table.rows:
            # Raw string: "\s" in a normal literal is an invalid escape.
            cell_texts = [
                "| " + re.sub(r"\s", "", str(cell.text)) + " | "
                for cell in row.cells
            ]
            row_texts.append("".join(cell_texts) + "\n")
        table_text_list.append("".join(row_texts) + "\n")
    return table_text_list
class DocxConvert:
    """Build a ``_Document`` tree from a .docx file.

    Texts, images and tables are collected separately and then placed on a
    single ``_Page`` in the order reported by ``read_xml_order``. Failures
    are reported through ``self._doc.error_code`` (a one-element list).
    """

    def __init__(self, path, unique_type_dir):
        """Store the inputs and create the empty document tree.

        Args:
            path: Path of the .docx file.
            unique_type_dir: Directory prefix used for extracted XML and
                temporary image files; it is concatenated with file names
                directly.
        """
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

    @memory_decorator
    def init_package(self):
        """Open the file with python-docx and zipfile; set ``[-3]`` on failure."""
        try:
            self.docx = docx.Document(self.path)
            # NOTE(review): this handle is never closed or read inside this
            # class - confirm whether external callers rely on it.
            self.zip = zipfile.ZipFile(self.path)
        except Exception:
            log("cannot open docx!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Populate ``self._doc`` with one page holding the file's content.

        Returns early, leaving ``self._doc.error_code`` set, when the file
        cannot be opened or when reading the order or the tables fails.
        """
        self.init_package()
        if self._doc.error_code is not None:
            return

        order_and_text_list = self.get_orders()
        if judge_error_code(order_and_text_list):
            self._doc.error_code = order_and_text_list
            return
        order_list, text_list = order_and_text_list

        self._page = _Page(None, 0)

        # Garbled text: emit a single notice sentence instead of the content.
        match1 = re.findall(get_garble_code(), ''.join(text_list))
        if len(match1) > 10:
            log("doc/docx garbled code!")
            _sen = _Sentence('文件乱码!', (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
            return

        table_list = self.get_tables()
        if judge_error_code(table_list):
            self._doc.error_code = table_list
            return

        image_list = self.get_images()

        # Each tag consumes the next pending item of its kind; order_y is the
        # running position used as the vertical coordinate of the bbox.
        order_y = 0
        doc_pr_cnt = 0
        for tag in order_list:
            bbox = (0, order_y, 0, 0)
            if tag == "w:t":
                if len(text_list) > 0:
                    _para = text_list.pop(0)
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    self._page.add_child(_sen)
            elif tag == "wp:docPr":
                if len(image_list) > 0:
                    temp_image_path = self.unique_type_dir + "docpr" + str(doc_pr_cnt) + ".png"
                    _image = image_list.pop(0)
                    with open(temp_image_path, "wb") as f:
                        f.write(_image)
                    _img = _Image(_image, temp_image_path, bbox)
                    _img.is_from_docx = True
                    self._page.add_child(_img)
                    doc_pr_cnt += 1
            elif tag == "w:tbl":
                if len(table_list) > 0:
                    _table = table_list.pop(0)
                    _table = _Table(_table, bbox)
                    _table.is_html = True
                    self._page.add_child(_table)
            order_y += 1

        # Surface a page-level error on the document if it has none yet.
        if self._doc.error_code is None and self._page.error_code is not None:
            self._doc.error_code = self._page.error_code

        self._doc.add_child(self._page)

    def get_paragraphs(self):
        """Return the text of every non-empty paragraph, in document order."""
        return [
            paragraph.text
            for paragraph in self.docx.paragraphs
            if paragraph.text != ""
        ]

    @memory_decorator
    def get_tables(self):
        """Return the result of ``read_xml_table`` for this file."""
        return read_xml_table(self.path, self.unique_type_dir)

    def get_images(self):
        """Return the binary data of images referenced by text-less runs.

        Paragraph runs are scanned in order. A run with empty text whose XML
        contains an ``rId<digits>`` reference to a related part with an
        ``image...`` content type contributes that part's blob.
        """
        image_list = []
        pattern = re.compile(r'rId\d+')
        for graph in self.docx.paragraphs:
            for run in graph.runs:
                if run.text != '':
                    continue
                try:
                    # Search once and reuse the match and the related part.
                    match = pattern.search(run.element.xml)
                    if not match:
                        continue
                    content_id = match.group(0)
                    related_part = self.docx.part.related_parts[content_id]
                    content_type = related_part.content_type
                except Exception as e:
                    print("docx no image!", e)
                    continue
                if not content_type.startswith('image'):
                    continue
                img_data = related_part.blob
                if img_data is not None:
                    image_list.append(img_data)
        return image_list

    @memory_decorator
    def get_orders(self):
        """Return the result of ``read_xml_order`` for this file."""
        return read_xml_order(self.path, self.unique_type_dir)

    def get_doc_object(self):
        """Return the underlying ``_Document``."""
        return self._doc

    def get_html(self):
        """Run the conversion and return the document HTML or an error code.

        Returns:
            ``self._doc.get_html()`` on success, otherwise the error-code
            list; ``[-1]`` when ``convert`` raises.
        """
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()
if __name__ == '__main__':
    # Manual run against a local sample file; prints the conversion result.
    sample_docx = "C:/Users/Administrator/Downloads/1631944542835.docx"
    work_dir = "C:/Users/Administrator/Downloads/1/"
    converter = DocxConvert(sample_docx, work_dir)
    print(converter.get_html())