"""Convert .docx files to HTML by reading the unzipped WordprocessingML parts.

NOTE(review): the copy of this file that was reviewed had lost its line
structure and every HTML-like literal had been stripped from it.  Spans that
had to be reconstructed are marked with ``NOTE(review)`` below and must be
confirmed against version control before merging.
"""
import os
import re
import sys
import traceback
import xml
import xml.dom.minidom
import xml.etree.ElementTree
import zipfile

# Make the project root importable when this module is run as a script.
sys.path.append(os.path.dirname(__file__) + "/../")

import docx
from bs4 import BeautifulSoup

from format_convert.convert_tree import _Document, _Sentence, _Page, _Image, _Table
from format_convert.utils import judge_error_code, add_div, get_logger, log, memory_decorator, get_garble_code
from format_convert.wrapt_timeout_decorator import timeout
from format_convert.convert_image import ImageConvert
from format_convert.convert_need_interface import from_tika_interface


def docx2text():
    """Placeholder kept for interface compatibility; does nothing."""
    return


def read_rel_image(document_xml_rels):
    """Return a ``{relationship Id: Target}`` dict for image relationships.

    :param document_xml_rels: iterable of relationship elements (the parsed
        ``document.xml.rels`` root), or an empty value when the part is absent
    :return: dict mapping relationship id to target path; empty if no input
    """
    if not document_xml_rels:
        return {}

    image_rel_dict = {}
    for rel in document_xml_rels:
        if 'Relationship' in str(rel):
            _id = rel.get("Id")
            _target = rel.get("Target")
            _type = rel.get("Type")
            # A relationship without a Type attribute cannot be an image.
            if _type and 'image' in _type:
                image_rel_dict[_id] = _target
    return image_rel_dict


def read_no_start(numbering_xml):
    """Read the start value and display text of every numbering level.

    :param numbering_xml: parsed ``numbering.xml`` root (minidom element), or
        an empty value when the document has no numbering part
    :return: tuple ``(start_dict, text_dict)``; both are keyed by the integer
        ``w:numId`` and map to ``{level: start number}`` and
        ``{level: level text}`` respectively
    """
    if not numbering_xml:
        return {}, {}

    # numId -> abstractNumId.  Keyed by numId because several w:num elements
    # may reference the same w:abstractNum; keying by the abstract id would
    # keep only the last of them.
    num_to_abstract = {}
    for w_num in numbering_xml.getElementsByTagName("w:num"):
        abstract_refs = w_num.getElementsByTagName('w:abstractNumId')
        if not abstract_refs:
            continue
        num_to_abstract[w_num.getAttribute("w:numId")] = abstract_refs[0].getAttribute("w:val")

    # Per abstract numbering definition: start number and text of each level.
    abstract_id_level_dict = {}
    abstract_id_level_text_dict = {}
    for w_abstract_num in numbering_xml.getElementsByTagName("w:abstractNum"):
        w_abstract_num_id = w_abstract_num.getAttribute("w:abstractNumId")
        level_start_dict = {}
        level_text_dict = {}
        for w_lvl in w_abstract_num.getElementsByTagName("w:lvl"):
            w_ilvl_value = w_lvl.getAttribute('w:ilvl')

            starts = w_lvl.getElementsByTagName("w:start")
            if starts:
                level_start_dict[int(w_ilvl_value)] = int(starts[0].getAttribute("w:val"))

            lvl_texts = w_lvl.getElementsByTagName("w:lvlText")
            num_fmts = w_lvl.getElementsByTagName("w:numFmt")
            if lvl_texts and num_fmts:
                w_lvl_text = lvl_texts[0].getAttribute("w:val")
                w_lvl_format = num_fmts[0].getAttribute("w:val")
                # Mark letter formats with %A / %a so the renderer knows to
                # emit a letter instead of a digit.
                if w_lvl_format == 'upperLetter':
                    w_lvl_text = re.sub(r'%\d', '%A', w_lvl_text)
                elif w_lvl_format == 'lowerLetter':
                    w_lvl_text = re.sub(r'%\d', '%a', w_lvl_text)
                level_text_dict[int(w_ilvl_value)] = w_lvl_text
        abstract_id_level_dict[w_abstract_num_id] = level_start_dict
        abstract_id_level_text_dict[w_abstract_num_id] = level_text_dict

    # Resolve every numId to the level data of its abstract definition.
    real_id_level_start_dict = {}
    real_id_level_text_dict = {}
    for real_id, abstract_id in num_to_abstract.items():
        level_start_dict = abstract_id_level_dict.get(abstract_id)
        if level_start_dict:
            real_id_level_start_dict[int(real_id)] = level_start_dict
        level_text_dict = abstract_id_level_text_dict.get(abstract_id)
        if level_text_dict:
            real_id_level_text_dict[int(real_id)] = level_text_dict
    return real_id_level_start_dict, real_id_level_text_dict


def read_p_text(unique_type_dir, p_node, _last_node_level, _num_pr_dict, numbering_xml, document_xml_rels,
                is_sdt=False, numbering_info=None):
    """Read the text of one ``w:p`` node, including its list number.

    :param unique_type_dir: directory the docx was unzipped into
    :param p_node: the ``w:p`` minidom element
    :param _last_node_level: numbering level of the previous numbered paragraph
    :param _num_pr_dict: running counters, ``{numId: {level: count}}``;
        updated in place and also returned
    :param numbering_xml: parsed ``numbering.xml`` root, or an empty value
    :param document_xml_rels: parsed ``document.xml.rels`` root, or an empty value
    :param is_sdt: True when the paragraph comes from a ``w:sdtContent``
        (table-of-contents entry)
    :param numbering_info: optional precomputed result of
        ``read_no_start(numbering_xml)``; pass it to avoid re-reading the
        numbering part for every paragraph
    :return: ``(text_list, order_list, _num_pr_dict, _last_node_level)``
    """
    _text_list = []
    _order_list = []

    # Number prefix of this paragraph (stays empty if it is not numbered).
    text_no = ''

    # Start values and display texts of the numbering groups.
    if numbering_info is None:
        numbering_info = read_no_start(numbering_xml)
    id_level_start_dict, id_level_text_dict = numbering_info

    # Extract numbering: group (numId) - level (ilvl) - running count.
    # NOTE(review): the nesting of the branches below was rebuilt from a
    # source that had lost its indentation - confirm against version control.
    num_pr = p_node.getElementsByTagName("w:numPr")
    if num_pr:
        num_pr = num_pr[0]
        if num_pr.getElementsByTagName("w:numId"):
            group_id = int(num_pr.getElementsByTagName("w:numId")[0].getAttribute("w:val"))
            if group_id >= 1:
                node_level = num_pr.getElementsByTagName("w:ilvl")
                if node_level:
                    node_level = int(node_level[0].getAttribute("w:val"))
                    if group_id in _num_pr_dict:
                        group_counts = _num_pr_dict[group_id]
                        if node_level == 0 and group_counts and node_level not in group_counts:
                            group_counts[node_level] = 1
                        if _last_node_level != 0 and node_level < _last_node_level:
                            # Moving back up: reset every level between the
                            # current one and the previous one.
                            for lvl in range(node_level + 1, _last_node_level + 1):
                                group_counts[lvl] = 0
                            if group_counts.get(node_level):
                                group_counts[node_level] += 1
                        elif node_level in group_counts:
                            group_counts[node_level] += 1
                        else:
                            group_counts[node_level] = 1
                    else:
                        _num_pr_dict[group_id] = {node_level: 1}

                    group_counts = _num_pr_dict.get(group_id)
                    group_starts = id_level_start_dict.get(group_id)
                    group_texts = id_level_text_dict.get(group_id)
                    for level in range(node_level + 1):
                        # Number of nodes seen so far on this level.
                        if level not in group_counts:
                            if not group_starts or level not in group_starts:
                                continue
                            level_node_cnt = group_starts[level]
                        else:
                            level_node_cnt = group_counts[level]

                        # Shift by the configured start value of the level.
                        if group_starts and group_counts and group_starts.get(level) and group_counts.get(level):
                            level_node_cnt += group_starts.get(level) - 1

                        level_text = None
                        if group_texts and group_texts.get(level) and group_counts.get(level):
                            level_text = group_texts.get(level)

                        if level_text:
                            # Test for the placeholder itself, not for any
                            # letter 'a'/'A' inside the surrounding text.
                            if '%a' in level_text:
                                letter = chr(ord('a') + level_node_cnt - 1)
                                text_no += level_text.replace('%a', letter)
                            elif '%A' in level_text:
                                letter = chr(ord('A') + level_node_cnt - 1)
                                text_no += level_text.replace('%A', letter)
                            else:
                                text_no += re.sub(r'%\d', str(level_node_cnt), level_text)
                        else:
                            text_no += str(level_node_cnt) + '.'
                    _last_node_level = node_level

    p_node_text = ''
    has_html = False
    # The number prefix goes first.
    if text_no:
        p_node_text += text_no

    text = p_node.getElementsByTagName("w:t")
    # Table-of-contents entry: title, dot leader, page number.  Entries with
    # an empty w:t fall through to the normal path instead of raising.
    if is_sdt and len(text) == 2 and text[0].childNodes and text[1].childNodes:
        p_node_text += text[0].childNodes[0].nodeValue + '.'*20 + text[1].childNodes[0].nodeValue
    # Normal paragraph.
    else:
        image_rel_dict = read_rel_image(document_xml_rels)
        for node in p_node.getElementsByTagName("*"):
            # Text run.
            if node.nodeName == "w:t":
                if node.childNodes:
                    p_node_text += node.childNodes[0].nodeValue
            # Embedded picture: converted right away and inlined as HTML
            # rather than being added to the page as an _Image object.
            elif node.nodeName == "a:blip":
                _id = node.getAttribute("r:embed")
                image_path = image_rel_dict.get(_id)
                if image_path:
                    image_path = unique_type_dir + 'word/' + image_path
                    image_convert = ImageConvert(image_path, '')
                    image_html = image_convert.get_html()[0]
                    # An int result is treated as "no usable HTML".
                    if isinstance(image_html, int):
                        image_html = ''
                    p_node_text += image_html
                    has_html = True

    # Only a number and nothing else: drop its trailing character.
    if len(p_node_text) > 0 and p_node_text == text_no:
        p_node_text = p_node_text[:-1]

    _text_list.append(p_node_text)
    if has_html:
        _order_list.append('w:t html')
    else:
        _order_list.append('w:t')
    return _text_list, _order_list, _num_pr_dict, _last_node_level


@timeout(50, timeout_exception=TimeoutError)
def read_xml_order(unique_type_dir, document_xml, numbering_xml, document_xml_rels):
    """Walk the direct children of ``w:body`` and collect texts in document order.

    :return: ``[order_list, text_list]`` on success, ``[-1]`` on any error.
        ``order_list`` holds one tag per body item ('w:t', 'w:t html',
        'w:tbl'); ``text_list`` holds one text per paragraph.
    """
    log("into read_xml_order")
    try:
        body = document_xml.getElementsByTagName("w:body")[0]
        order_list = []
        text_list = []

        # Read the numbering part once instead of once per paragraph.
        numbering_info = read_no_start(numbering_xml)

        # Running numbering counters shared by all paragraphs.
        num_pr_dict = {}
        last_node_level = 0
        for line in body.childNodes:
            # Dispatch on the exact node name; a substring test on str(node)
            # also matches e.g. w:proofErr or w:permStart.
            node_name = line.nodeName
            # Ordinary paragraph.
            if node_name == "w:p":
                t_list, o_list, num_pr_dict, last_node_level = read_p_text(unique_type_dir,
                                                                           line,
                                                                           last_node_level,
                                                                           num_pr_dict,
                                                                           numbering_xml,
                                                                           document_xml_rels,
                                                                           numbering_info=numbering_info)
                text_list += t_list
                order_list += o_list

            # Table-of-contents block.
            elif node_name == "w:sdt":
                for sdt_child in line.childNodes:
                    if sdt_child.nodeName != "w:sdtContent":
                        continue
                    for sdt_content_child in sdt_child.childNodes:
                        if sdt_content_child.nodeName == 'w:p':
                            t_list, o_list, num_pr_dict, last_node_level = read_p_text(unique_type_dir,
                                                                                       sdt_content_child,
                                                                                       last_node_level,
                                                                                       num_pr_dict,
                                                                                       numbering_xml,
                                                                                       document_xml_rels,
                                                                                       is_sdt=True,
                                                                                       numbering_info=numbering_info)
                            text_list += t_list
                            order_list += o_list

            # Table: only its position is recorded here.
            elif node_name == "w:tbl":
                order_list.append("w:tbl")
        return [order_list, text_list]
    except Exception as e:
        log("read_xml_order error!")
        traceback.print_exc()
        return [-1]


@timeout(50, timeout_exception=TimeoutError)
def read_xml_table(unique_type_dir, document_xml, numbering_xml, document_xml_rels):
    """Render every top-level table of ``w:body`` as an HTML string.

    :return: list with one HTML string per table, or ``[-1]`` on any error
    """
    def recursion_read_table(table, numbering_info):
        # NOTE(review): all HTML tag literals in this function were stripped
        # from the reviewed source and are reconstructed here ('<table
        # border="1">', '<tr>', '<td colspan=N>' and their closing tags) -
        # confirm against version control.
        table_text = '<table border="1">'
        tr_index = 0
        tr_text_list = []
        last_node_level = 0
        num_pr_dict = {}
        # Only direct children are walked; nested tables are handled by the
        # recursive call below.
        for table_child in table.childNodes:
            if table_child.nodeName != 'w:tr':
                continue
            table_text += "<tr>"
            tc_index = 0
            tc_text_list = []
            for tr_child in table_child.childNodes:
                if tr_child.nodeName != 'w:tc':
                    continue
                tc_text = ""
                tc = tr_child

                # Number of grid columns the cell spans (colspan).
                col_span = tc.getElementsByTagName("w:gridSpan")
                if col_span:
                    col_span = int(col_span[0].getAttribute("w:val"))
                else:
                    col_span = 1

                # Continuation cell of a vertical merge (rowspan): start with
                # the text of the cell above.  A w:vMerge without w:val means
                # "continue".
                is_merge = tc.getElementsByTagName("w:vMerge")
                if is_merge:
                    is_merge = is_merge[0].getAttribute("w:val")
                    if is_merge in ("", "continue"):
                        col_span_index = 0
                        real_tc_index = 0
                        if 0 <= tr_index - 1 < len(tr_text_list):
                            for tc_colspan in tr_text_list[tr_index - 1]:
                                if col_span_index < tc_index:
                                    col_span_index += tc_colspan[1]
                                    real_tc_index += 1
                            if real_tc_index < len(tr_text_list[tr_index - 1]):
                                tc_text = tr_text_list[tr_index - 1][real_tc_index][0]

                # Open the cell with its colspan.
                table_text = table_text + "<td colspan=" + str(col_span) + ">"

                # Cell content.
                for tc_child in tc.childNodes:
                    # Table nested inside the cell.
                    if tc_child.nodeName == 'w:tbl':
                        tc_text += recursion_read_table(tc_child, numbering_info)
                    if tc_child.nodeName == 'w:p':
                        _t_list, _, num_pr_dict, last_node_level = read_p_text(unique_type_dir,
                                                                               tc_child,
                                                                               last_node_level,
                                                                               num_pr_dict,
                                                                               numbering_xml,
                                                                               document_xml_rels,
                                                                               numbering_info=numbering_info)
                        tc_text += ''.join(_t_list)

                # Close the cell.
                table_text = table_text + tc_text + "</td>"
                tc_index += 1
                tc_text_list.append([tc_text, col_span])
            # Close the row.
            table_text += "</tr>"
            tr_index += 1
            tr_text_list.append(tc_text_list)
        # Close the table.
        table_text += "</table>"
        return table_text

    log("into read_xml_table")
    try:
        body = document_xml.getElementsByTagName("w:body")[0]
        # Read the numbering part once for all cells.
        numbering_info = read_no_start(numbering_xml)
        table_text_list = []
        for node in body.childNodes:
            if node.nodeName == 'w:tbl':
                table_text_list.append(recursion_read_table(node, numbering_info))
        return table_text_list
    except Exception as e:
        log("read_xml_table error")
        # print_exc() writes the traceback itself and returns None, so it
        # must not be wrapped in print().
        traceback.print_exc()
        return [-1]


@timeout(25, timeout_exception=TimeoutError)
def parse_xml(path):
    """Parse an XML file with minidom and return its document element."""
    DOMTree = xml.dom.minidom.parse(path)
    collection = DOMTree.documentElement
    return collection


@timeout(25, timeout_exception=TimeoutError)
def parse_xml2(path):
    """Parse an XML file with ElementTree and return its root element."""
    tree = xml.etree.ElementTree.parse(path)
    root = tree.getroot()
    return root


class DocxConvert:
    """Convert one .docx file into the project's document tree / HTML."""

    def __init__(self, path, unique_type_dir):
        """Unzip the ``word/`` parts of *path* into *unique_type_dir* and parse them.

        Failures are recorded in ``self._doc.error_code`` instead of raised.
        """
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

        # Defaults, so the attributes exist even when parsing fails below.
        self.document_xml = None
        self.numbering_xml = []
        self.document_xml_rels = []

        # Unzip the docx; the context manager closes the archive on error too.
        try:
            with zipfile.ZipFile(path) as f:
                for file in f.namelist():
                    if "word/" in str(file):
                        f.extract(file, self.unique_type_dir)
        except Exception as e:
            log("docx format error!")
            self._doc.error_code = [-3]

        # Read the extracted parts.
        try:
            self.document_xml = parse_xml(self.unique_type_dir + "word/document.xml")

            if os.path.exists(self.unique_type_dir + "word/numbering.xml"):
                self.numbering_xml = parse_xml(self.unique_type_dir + "word/numbering.xml")
            else:
                self.numbering_xml = []

            if os.path.exists(self.unique_type_dir + "word/_rels/document.xml.rels"):
                self.document_xml_rels = parse_xml2(self.unique_type_dir + "word/_rels/document.xml.rels")
            else:
                self.document_xml_rels = []
        except FileNotFoundError:
            # Extracted file not found: clear the error so convert() can try
            # to read the file as HTML text instead.
            log('FileNotFoundError')
            self._doc.error_code = None
        except TimeoutError:
            log("parse_xml timeout")
            self._doc.error_code = [-4]

    @memory_decorator
    def init_package(self):
        """Open the file with python-docx and zipfile; set error [-3] on failure."""
        try:
            self.docx = docx.Document(self.path)
            self.zip = zipfile.ZipFile(self.path)
        except Exception:
            log("cannot open docx!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Build the page of the document tree from the parsed docx parts."""
        self._page = _Page(None, 0)

        # NOTE(review): everything from the regex literal below down to the
        # "> 10" garble check was missing from the reviewed source (stripped
        # as markup).  It is reconstructed from the names the surviving code
        # uses (is_html_doc, BeautifulSoup, init_package, get_orders,
        # order_list, text_list, get_garble_code) - confirm against version
        # control before merging.

        # Special case first: a .doc/.docx file that is really HTML text.
        is_html_doc = False
        try:
            with open(self.path, 'r') as f:
                html_str = f.read()
            if re.search('<div|<html|<body|<head|<tr|<br|<table|<td', html_str):
                soup = BeautifulSoup(html_str, 'lxml')
                text = soup.text
                is_html_doc = True
        except Exception:
            # Best effort only: a real (binary) docx fails to decode as text.
            pass

        if is_html_doc:
            _sen = _Sentence(text, (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
            return

        self.init_package()
        if self._doc.error_code is not None:
            return

        order_and_text_list = self.get_orders()
        if judge_error_code(order_and_text_list):
            self._doc.error_code = order_and_text_list
            return
        order_list, text_list = order_and_text_list

        # Garbled text is reported as a file-format error.
        match1 = re.findall(get_garble_code(), ''.join(text_list))
        if len(match1) > 10:
            log("doc/docx garbled code!")
            self._doc.error_code = [-3]
            self._doc.add_child(self._page)
            return

        table_list = self.get_tables()
        if judge_error_code(table_list):
            self._doc.error_code = table_list
            return

        image_list = self.get_images()

        # Emit the children in document order; order_y stands in for the
        # vertical position.
        order_y = 0
        doc_pr_cnt = 0
        for tag in order_list:
            bbox = (0, order_y, 0, 0)
            if tag == "w:t html":
                if len(text_list) > 0:
                    _para = text_list.pop(0)
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    _sen.is_html = True
                    self._page.add_child(_sen)
            if tag == "w:t":
                if len(text_list) > 0:
                    _para = text_list.pop(0)
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    self._page.add_child(_sen)
            if tag == "wp:docPr":
                if len(image_list) > 0:
                    temp_image_path = self.unique_type_dir + "docpr" + str(doc_pr_cnt) + ".png"
                    _image = image_list.pop(0)
                    with open(temp_image_path, "wb") as f:
                        f.write(_image)
                    _img = _Image(_image, temp_image_path, bbox)
                    _img.is_from_docx = True
                    self._page.add_child(_img)
                    doc_pr_cnt += 1
            if tag == "w:tbl":
                if len(table_list) > 0:
                    _table = table_list.pop(0)
                    _table = _Table(_table, bbox)
                    _table.is_html = True
                    self._page.add_child(_table)
            order_y += 1

        if self._doc.error_code is None and self._page.error_code is not None:
            self._doc.error_code = self._page.error_code

        self._doc.add_child(self._page)

    @memory_decorator
    def get_tables(self):
        """Return the HTML of every top-level table, or an error code list."""
        table_list = read_xml_table(self.unique_type_dir, self.document_xml,
                                    self.numbering_xml, self.document_xml_rels)
        return table_list

    def get_images(self):
        """Return the raw bytes of the images referenced by empty runs, in order."""
        image_list = []
        pattern = re.compile(r'rId\d+')
        for graph in self.docx.paragraphs:
            for run in graph.runs:
                if run.text != '':
                    continue
                try:
                    match = pattern.search(run.element.xml)
                    if not match:
                        continue
                    content_id = match.group(0)
                    content_type = self.docx.part.related_parts[content_id].content_type
                except Exception as e:
                    print("docx no image!", e)
                    continue
                if not content_type.startswith('image'):
                    continue
                img_data = self.docx.part.related_parts[content_id].blob
                if img_data is not None:
                    image_list.append(img_data)
        return image_list

    @memory_decorator
    def get_orders(self):
        """Parse document.xml and return ``[order_list, text_list]`` or an error code list."""
        order_and_text_list = read_xml_order(self.unique_type_dir, self.document_xml,
                                             self.numbering_xml, self.document_xml_rels)
        return order_and_text_list

    def get_doc_object(self):
        """Return the underlying ``_Document``."""
        return self._doc

    def get_html(self):
        """Convert the file and return its HTML, or an error code list.

        If the native conversion records an error, the tika interface is
        tried as a fallback.
        """
        if self._doc.error_code is not None:
            return self._doc.error_code
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            # Fall back to extraction through tika.
            html = from_tika_interface(self.path)
            if judge_error_code(html):
                self._doc.error_code = html
                return self._doc.error_code
            else:
                return [html]
        return self._doc.get_html()


if __name__ == '__main__':
    c = DocxConvert("C:/Users/Administrator/Downloads/1631944542835.docx", "C:/Users/Administrator/Downloads/1/")
    print(c.get_html())