"""Convert a .docx file into the project's document tree / HTML.

The paragraph order, list numbering and tables are read directly from
``word/document.xml`` (via ``xml.dom.minidom``); images are read through
``python-docx``.  Functions that fail return a one-element error-code list
(``[-1]`` generic error, ``[-3]`` unreadable docx, ``[-4]`` XML parse timeout)
that callers test with ``judge_error_code``.
"""
import inspect
import logging
import os
import re
import sys
import traceback
import xml
import xml.dom.minidom
import zipfile

sys.path.append(os.path.dirname(__file__) + "/../")

import docx
from format_convert.convert_image import picture2text
from format_convert.convert_tree import _Document, _Sentence, _Page, _Image, _Table
from format_convert.utils import judge_error_code, add_div, get_logger, log, memory_decorator, get_garble_code
from format_convert.wrapt_timeout_decorator import timeout

# Archive member that holds the main document body of a .docx file.
_DOCUMENT_XML_MEMBER = "word/document.xml"


def docx2text():
    """Placeholder kept for backward compatibility; does nothing."""
    return


def _is_element(node, tag_name):
    """Return True when *node* is a DOM element named exactly *tag_name*."""
    return node.nodeType == node.ELEMENT_NODE and node.nodeName == tag_name


def _first_child_text(node):
    """Return the value of *node*'s first child, or "" when there is none.

    An empty ``<w:t/>`` has no child nodes, so indexing it blindly would raise.
    """
    if not node.childNodes:
        return ""
    return node.childNodes[0].nodeValue or ""


def _extract_document_xml(path, save_path):
    """Extract ``word/document.xml`` from the docx at *path* into *save_path*.

    Returns the path of the extracted file.  Raises if *path* is not a valid
    zip archive or the member is missing; the archive is always closed.
    """
    with zipfile.ZipFile(path) as archive:
        return archive.extract(_DOCUMENT_XML_MEMBER, save_path)


def _list_number_prefix(paragraph, counters, last_level):
    """Compute the list-number prefix (e.g. ``"2.1."``) of a ``w:p`` element.

    *counters* maps numbering group id -> {level: count} and is updated in
    place.  *last_level* is the level of the previously numbered paragraph.

    Returns ``(prefix, last_level)``; the prefix is "" and *last_level* is
    returned unchanged when the paragraph carries no usable numbering.
    """
    num_pr_nodes = paragraph.getElementsByTagName("w:numPr")
    if not num_pr_nodes:
        return "", last_level
    num_pr = num_pr_nodes[0]

    num_id_nodes = num_pr.getElementsByTagName("w:numId")
    if not num_id_nodes:
        return "", last_level
    group_id = int(num_id_nodes[0].getAttribute("w:val"))
    if group_id < 1:
        return "", last_level

    level_nodes = num_pr.getElementsByTagName("w:ilvl")
    if not level_nodes:
        return "", last_level
    node_level = int(level_nodes[0].getAttribute("w:val"))

    group = counters.get(group_id)
    if group is None:
        group = counters[group_id] = {node_level: 1}
    else:
        if last_level != 0 and node_level < last_level:
            # Moving back up the hierarchy: restart every deeper level.
            for deeper_level in range(node_level + 1, last_level + 1):
                group[deeper_level] = 0
        # .get() because this level may never have been seen for this group.
        group[node_level] = group.get(node_level, 0) + 1

    prefix = "".join(
        str(group[level]) + "."
        for level in range(node_level + 1)
        if level in group
    )
    return prefix, node_level


@timeout(50, timeout_exception=TimeoutError)
def read_xml_order(path, save_path):
    """Read the top-level content order and paragraph texts of a docx.

    Args:
        path: path of the .docx file.
        save_path: directory into which ``word/document.xml`` is extracted.

    Returns:
        ``[order_list, text_list]`` on success, where *order_list* holds one
        of ``"w:t"`` (text paragraph), ``"wp:docPr"`` (picture) or ``"w:tbl"``
        (table) per content item in document order, and *text_list* holds the
        text of each ``"w:t"`` entry (prefixed with its list number, if any).
        On failure ``[-3]`` (cannot read docx), ``[-4]`` (XML parse timeout)
        or ``[-1]`` (any other error).
    """
    log("into read_xml_order")
    try:
        try:
            xml_path = _extract_document_xml(path, save_path)
        except Exception:
            log("docx format error!")
            return [-3]

        try:
            collection = xml_analyze(xml_path)
        except TimeoutError:
            log("xml_analyze timeout")
            return [-4]

        body = collection.getElementsByTagName("w:body")[0]
        order_list = []
        text_list = []
        # Numbering state: group id -> {level: count}.
        num_pr_dict = {}
        last_node_level = 0
        for line in body.childNodes:
            if _is_element(line, "w:p"):
                # List number of this paragraph ("" when it is not numbered).
                text_no, last_node_level = _list_number_prefix(
                    line, num_pr_dict, last_node_level
                )

                text = line.getElementsByTagName("w:t")
                picture = line.getElementsByTagName("wp:docPr")
                if text:
                    order_list.append("w:t")
                    temp_text = "".join(_first_child_text(t) for t in text)
                    if text_no:
                        temp_text = text_no + ' ' + temp_text
                    text_list.append(temp_text)
                if picture:
                    order_list.append("wp:docPr")

                # Legacy (VML) pictures live in w:pict inside a run.
                for run in line.childNodes:
                    if _is_element(run, "w:r") and run.getElementsByTagName("w:pict"):
                        order_list.append("wp:docPr")
            elif _is_element(line, "w:tbl"):
                order_list.append("w:tbl")
        return [order_list, text_list]
    except Exception:
        log("read_xml_order error!")
        traceback.print_exc()
        return [-1]


def _cell_property(tc, tag_name):
    """Return the *tag_name* element inside the cell's own ``w:tcPr``, or None.

    Only the direct ``w:tcPr`` child is searched so that properties of cells
    belonging to a nested table are never mistaken for this cell's.
    """
    for child in tc.childNodes:
        if _is_element(child, "w:tcPr"):
            found = child.getElementsByTagName(tag_name)
            return found[0] if found else None
    return None


def _text_of_cell_above(previous_row, grid_column):
    """Return the text of the cell in *previous_row* covering *grid_column*.

    *previous_row* is a list of ``[text, col_span]`` pairs.  Returns "" when
    no cell of that row covers the column.
    """
    start = 0
    for cell_text, cell_span in previous_row:
        if start <= grid_column < start + cell_span:
            return cell_text
        start += cell_span
    return ""


def _table_to_html(table):
    """Render a ``w:tbl`` element (and any nested tables) as an HTML string.

    NOTE(review): the HTML tag literals below were reconstructed from the
    surrounding comments and usage; confirm them against version control.
    NOTE(review): cell text is inserted without HTML escaping.
    """
    parts = ['<table border="1">']
    # One entry per finished row: a list of [cell_text, col_span] pairs.
    finished_rows = []
    for table_child in table.childNodes:
        if not _is_element(table_child, "w:tr"):
            continue
        parts.append("<tr>")
        row_cells = []
        grid_column = 0  # first grid column occupied by the current cell
        for tc in table_child.childNodes:
            if not _is_element(tc, "w:tc"):
                continue
            tc_text = ""

            # Number of grid columns this cell spans (HTML colspan).
            grid_span = _cell_property(tc, "w:gridSpan")
            col_span = int(grid_span.getAttribute("w:val")) if grid_span is not None else 1

            # A vertically merged continuation cell repeats the text of the
            # cell above it.  A w:vMerge without w:val means "continue".
            v_merge = _cell_property(tc, "w:vMerge")
            if (
                v_merge is not None
                and v_merge.getAttribute("w:val") in ("", "continue")
                and finished_rows
            ):
                tc_text = _text_of_cell_above(finished_rows[-1], grid_column)

            parts.append("<td colspan=" + str(col_span) + ">")
            for tc_child in tc.childNodes:
                if _is_element(tc_child, "w:tbl"):
                    # Table nested inside this cell.
                    tc_text += _table_to_html(tc_child)
                elif _is_element(tc_child, "w:p"):
                    for text_node in tc_child.getElementsByTagName("w:t"):
                        tc_text += _first_child_text(text_node)
            parts.append(tc_text + "</td>")

            row_cells.append([tc_text, col_span])
            grid_column += col_span
        parts.append("</tr>")
        finished_rows.append(row_cells)
    parts.append("</table>")
    return "".join(parts)


@timeout(50, timeout_exception=TimeoutError)
def read_xml_table(path, save_path):
    """Read every top-level table of a docx as an HTML string.

    Args:
        path: path of the .docx file.
        save_path: directory into which ``word/document.xml`` is extracted.

    Returns:
        A list with one HTML string per ``w:tbl`` that is a direct child of
        the document body, in document order.  On failure ``[-3]`` (cannot
        read docx), ``[-4]`` (XML parse timeout) or ``[-1]`` (other error).
    """
    log("into read_xml_table")
    try:
        try:
            xml_path = _extract_document_xml(path, save_path)
        except Exception:
            log("docx format error!")
            return [-3]

        log("xml_analyze%s"%(save_path))
        try:
            collection = xml_analyze(xml_path)
        except TimeoutError:
            log("xml_analyze timeout")
            return [-4]
        log("xml_analyze done")

        body = collection.getElementsByTagName("w:body")[0]
        return [
            _table_to_html(node)
            for node in body.childNodes
            if _is_element(node, "w:tbl")
        ]
    except Exception:
        log("read_xml_table error")
        traceback.print_exc()
        return [-1]


@timeout(25, timeout_exception=TimeoutError)
def xml_analyze(path):
    """Parse the XML file at *path* and return its document (root) element."""
    dom_tree = xml.dom.minidom.parse(path)
    return dom_tree.documentElement


def read_docx_table(document):
    """Render every table of a python-docx *document* as an HTML string.

    All whitespace is removed from each cell's text.

    NOTE(review): the HTML tag literals were reconstructed; confirm them
    against version control.
    """
    table_text_list = []
    for table in document.tables:
        parts = ["<table>"]
        for row in table.rows:
            parts.append("<tr>")
            for cell in row.cells:
                parts.append("<td>" + re.sub(r"\s", "", str(cell.text)) + "</td>")
            parts.append("</tr>")
        parts.append("</table>")
        table_text_list.append("".join(parts))
    return table_text_list


class DocxConvert:
    """Convert one .docx file into a ``_Document`` tree.

    Args:
        path: path of the .docx file.
        unique_type_dir: working directory for extracted XML and images.
    """

    def __init__(self, path, unique_type_dir):
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

    @memory_decorator
    def init_package(self):
        """Open the file with python-docx and zipfile.

        Sets ``self._doc.error_code = [-3]`` when the file cannot be opened.
        """
        try:
            self.docx = docx.Document(self.path)
            self.zip = zipfile.ZipFile(self.path)
        except Exception:
            log("cannot open docx!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Build the page of sentences, images and tables in document order.

        On failure the error code is stored in ``self._doc.error_code`` and
        the method returns early.
        """
        self.init_package()
        if self._doc.error_code is not None:
            return

        order_and_text_list = self.get_orders()
        if judge_error_code(order_and_text_list):
            self._doc.error_code = order_and_text_list
            return
        order_list, text_list = order_and_text_list

        self._page = _Page(None, 0)

        # Garbled text: emit a single placeholder sentence instead of content.
        match1 = re.findall(get_garble_code(), ''.join(text_list))
        if len(match1) > 10:
            log("doc/docx garbled code!")
            _sen = _Sentence('文件乱码!', (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
            return

        table_list = self.get_tables()
        if judge_error_code(table_list):
            self._doc.error_code = table_list
            return

        image_list = self.get_images()

        # Consume each list front-to-back; an exhausted list yields None and
        # the corresponding tag is skipped.
        texts = iter(text_list)
        images = iter(image_list)
        tables = iter(table_list)
        doc_pr_cnt = 0
        for order_y, tag in enumerate(order_list):
            bbox = (0, order_y, 0, 0)
            if tag == "w:t":
                _para = next(texts, None)
                if _para is not None:
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    self._page.add_child(_sen)
            elif tag == "wp:docPr":
                _image = next(images, None)
                if _image is not None:
                    temp_image_path = os.path.join(
                        self.unique_type_dir, "docpr" + str(doc_pr_cnt) + ".png"
                    )
                    with open(temp_image_path, "wb") as f:
                        f.write(_image)
                    _img = _Image(_image, temp_image_path, bbox)
                    _img.is_from_docx = True
                    self._page.add_child(_img)
                    doc_pr_cnt += 1
            elif tag == "w:tbl":
                _table = next(tables, None)
                if _table is not None:
                    _table = _Table(_table, bbox)
                    _table.is_html = True
                    self._page.add_child(_table)

        if self._doc.error_code is None and self._page.error_code is not None:
            self._doc.error_code = self._page.error_code

        self._doc.add_child(self._page)

    def get_paragraphs(self):
        """Return the text of every non-empty paragraph (via python-docx)."""
        return [
            paragraph.text
            for paragraph in self.docx.paragraphs
            if paragraph.text != ""
        ]

    @memory_decorator
    def get_tables(self):
        """Return the HTML of every top-level table, or an error-code list."""
        table_list = read_xml_table(self.path, self.unique_type_dir)
        return table_list

    def get_images(self):
        """Return the binary data of every image, in paragraph/run order.

        Only runs without text are inspected; the first ``rId<n>`` found in a
        run's XML is resolved through the document part's relationships, and
        the target is kept when its content type starts with ``image``.
        """
        image_list = []
        pattern = re.compile(r'rId\d+')
        for graph in self.docx.paragraphs:
            for run in graph.runs:
                if run.text != '':
                    continue
                try:
                    match = pattern.search(run.element.xml)
                    if not match:
                        continue
                    related_part = self.docx.part.related_parts[match.group(0)]
                    content_type = related_part.content_type
                except Exception as e:
                    print("docx no image!", e)
                    continue
                if not content_type.startswith('image'):
                    continue
                img_data = related_part.blob
                if img_data is not None:
                    image_list.append(img_data)
        return image_list

    @memory_decorator
    def get_orders(self):
        """Return ``[order_list, text_list]`` parsed from document.xml, or an error-code list."""
        order_and_text_list = read_xml_order(self.path, self.unique_type_dir)
        return order_and_text_list

    def get_doc_object(self):
        """Return the underlying ``_Document``."""
        return self._doc

    def get_html(self):
        """Convert the file and return its HTML, or the error-code list on failure."""
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()


if __name__ == '__main__':
    c = DocxConvert("C:/Users/Administrator/Downloads/1631944542835.docx", "C:/Users/Administrator/Downloads/1/")
    print(c.get_html())