123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627 |
- import os
- import sys
- sys.path.append(os.path.dirname(__file__) + "/../")
- from format_convert.convert_tree import _Document, _Sentence, _Page, _Image, _Table
- import re
- import traceback
- import xml
- import zipfile
- import docx
- from bs4 import BeautifulSoup
- from format_convert.utils import judge_error_code, add_div, get_logger, log, memory_decorator, get_garble_code
- from format_convert.wrapt_timeout_decorator import timeout
- from format_convert.convert_image import ImageConvert
- from format_convert.convert_need_interface import from_tika_interface
def docx2text():
    """Placeholder kept for API compatibility; conversion happens in DocxConvert."""
    return None
def read_rel_image(document_xml_rels):
    """
    Map relationship ids to image targets from document.xml.rels.

    :param document_xml_rels: parsed rels root element (ElementTree), or falsy when absent
    :return: {relationship Id: Target path} containing only image relationships
    """
    if not document_xml_rels:
        return {}
    # Collect the Id-Target relations declared in the mapping file
    image_rel_dict = {}
    for rel in document_xml_rels:
        if 'Relationship' in str(rel):
            _id = rel.get("Id")
            _target = rel.get("Target")
            _type = rel.get("Type")
            # Fix: Type may be missing, in which case `'image' in None` raised TypeError
            if _type and 'image' in _type:
                image_rel_dict[_id] = _target
    return image_rel_dict
def read_no_start(numbering_xml):
    """
    Read the start value and display format of every numbering group.

    :param numbering_xml: parsed word/numbering.xml document element (falsy when absent)
    :return: ({real_num_id: {level: start_no}}, {real_num_id: {level: lvl_text}})
    """
    if not numbering_xml:
        return {}, {}
    # Map abstract numbering ids to the real w:numId values used by paragraphs
    abstract_real_id_dict = {}
    for w_num in numbering_xml.getElementsByTagName("w:num"):
        w_num_id = w_num.getAttribute("w:numId")
        abstract_nodes = w_num.getElementsByTagName('w:abstractNumId')
        # Fix: a w:num without a w:abstractNumId child used to raise IndexError
        if not abstract_nodes:
            continue
        abstract_real_id_dict[abstract_nodes[0].getAttribute("w:val")] = w_num_id
    # Per abstract id, collect each level's start number and display text
    abstract_id_level_dict = {}
    abstract_id_level_text_dict = {}
    for w_abstract_num in numbering_xml.getElementsByTagName("w:abstractNum"):
        w_abstract_num_id = w_abstract_num.getAttribute("w:abstractNumId")
        level_start_dict = {}
        level_text_dict = {}
        for w_lvl in w_abstract_num.getElementsByTagName("w:lvl"):
            w_ilvl_value = w_lvl.getAttribute('w:ilvl')
            if w_lvl.getElementsByTagName("w:start"):
                w_ilvl_start_num = w_lvl.getElementsByTagName("w:start")[0].getAttribute("w:val")
                level_start_dict[int(w_ilvl_value)] = int(w_ilvl_start_num)
            if w_lvl.getElementsByTagName("w:lvlText") and w_lvl.getElementsByTagName("w:numFmt"):
                w_lvl_text = w_lvl.getElementsByTagName("w:lvlText")[0].getAttribute("w:val")
                w_lvl_format = w_lvl.getElementsByTagName("w:numFmt")[0].getAttribute("w:val")
                # Letter formats are rendered later through the %a / %A placeholders.
                # Fix: raw strings — '%\d' was an invalid escape sequence.
                if w_lvl_format == 'upperLetter':
                    w_lvl_text = re.sub(r'%\d', '%A', w_lvl_text)
                elif w_lvl_format == 'lowerLetter':
                    w_lvl_text = re.sub(r'%\d', '%a', w_lvl_text)
                level_text_dict[int(w_ilvl_value)] = w_lvl_text
        abstract_id_level_dict[w_abstract_num_id] = level_start_dict
        abstract_id_level_text_dict[w_abstract_num_id] = level_text_dict
    # Re-key both tables by the real numbering id (two original loops merged)
    real_id_level_start_dict = {}
    real_id_level_text_dict = {}
    for abstract_id, real_id in abstract_real_id_dict.items():
        level_start_dict = abstract_id_level_dict.get(abstract_id)
        if level_start_dict:
            real_id_level_start_dict[int(real_id)] = level_start_dict
        level_text_dict = abstract_id_level_text_dict.get(abstract_id)
        if level_text_dict:
            real_id_level_text_dict[int(real_id)] = level_text_dict
    return real_id_level_start_dict, real_id_level_text_dict
def read_p_text(unique_type_dir, p_node, _last_node_level, _num_pr_dict, numbering_xml, document_xml_rels,
                is_sdt=False):
    """
    Read the text under a w:p paragraph node, including its list numbering.

    :param unique_type_dir: extraction directory of the unpacked docx (used to locate inline images)
    :param p_node: the w:p minidom node to read
    :param _last_node_level: numbering level of the previous paragraph (drives counter resets)
    :param _num_pr_dict: running counters {group_id: {level: count}} carried across calls
    :param numbering_xml: parsed word/numbering.xml (falsy when absent)
    :param document_xml_rels: parsed word/_rels/document.xml.rels (falsy when absent)
    :param is_sdt: True when the paragraph sits inside a w:sdt (table-of-contents) block
    :return: (text_list, order_list, updated _num_pr_dict, updated _last_node_level)
    """
    _text_list = []
    _order_list = []
    # Numbering prefix of the paragraph text (if the paragraph is numbered)
    text_no = ''
    # Start values and display formats of each numbering group
    id_level_start_dict, id_level_text_dict = read_no_start(numbering_xml)
    # print('_num_pr_dict', _num_pr_dict)
    # Extract numbering: group -> level -> counter
    num_pr = p_node.getElementsByTagName("w:numPr")
    if num_pr:
        num_pr = num_pr[0]
        if num_pr.getElementsByTagName("w:numId"):
            group_id = int(num_pr.getElementsByTagName("w:numId")[0].getAttribute("w:val"))
            if group_id >= 1:
                node_level = num_pr.getElementsByTagName("w:ilvl")
                if node_level:
                    node_level = int(node_level[0].getAttribute("w:val"))
                    # print('group_id', group_id, 'node_level', node_level, 'last_node_level', _last_node_level)
                    if group_id in _num_pr_dict.keys():
                        # First paragraph seen at level 0 of a known group: start its counter
                        # if node_level == 0 and node_level not in _num_pr_dict[group_id].keys():
                        if node_level == 0 and _num_pr_dict.get(group_id) and node_level not in _num_pr_dict.get(group_id).keys():
                            _num_pr_dict[group_id][node_level] = 1
                        if _last_node_level != 0 and node_level < _last_node_level:
                            # print('reset', 'group_id', group_id, 'last_node_level', last_node_level)
                            # Came back to a shallower level: reset every level between
                            # node_level and the previous level, then bump the current one
                            for l in range(node_level+1, _last_node_level+1):
                                _num_pr_dict[group_id][l] = 0
                            if _num_pr_dict[group_id].get(node_level):
                                _num_pr_dict[group_id][node_level] += 1
                            else:
                                pass
                            # print('group_id, node_level', group_id, node_level)
                        # Same level as before: just increment its counter
                        # elif node_level in _num_pr_dict[group_id].keys():
                        elif node_level in _num_pr_dict.get(group_id).keys():
                            _num_pr_dict[group_id][node_level] += 1
                        else:
                            _num_pr_dict[group_id][node_level] = 1
                    else:
                        # First paragraph of this numbering group
                        _num_pr_dict[group_id] = {node_level: 1}
                    # print(num_pr_dict[group_id])
                    # Build the visible number by walking every level down to node_level
                    for level in range(node_level+1):
                        # How many nodes so far at this level
                        # if level not in _num_pr_dict[group_id]:
                        if level not in _num_pr_dict.get(group_id):
                            # if level not in id_level_start_dict[group_id]:
                            if not id_level_start_dict.get(group_id) or level not in id_level_start_dict.get(group_id):
                                continue
                            else:
                                level_node_cnt = id_level_start_dict[group_id][level]
                        else:
                            level_node_cnt = _num_pr_dict[group_id][level]
                        # Shift the counter by the group's configured start value
                        if id_level_start_dict.get(group_id) and _num_pr_dict.get(group_id) and id_level_start_dict.get(group_id).get(level) and _num_pr_dict.get(group_id).get(level):
                            start_no = id_level_start_dict.get(group_id).get(level)
                            level_node_cnt += start_no - 1
                        level_text = None
                        if id_level_text_dict.get(group_id) and id_level_text_dict.get(group_id).get(level) and _num_pr_dict.get(group_id).get(level):
                            level_text = id_level_text_dict.get(group_id).get(level)
                        # print('level_node_cnt', level_node_cnt)
                        if level_text:
                            # %a / %A placeholders (set by read_no_start) mean letter numbering
                            if re.search('a', level_text):
                                level_node_cnt = chr(ord('a') + level_node_cnt - 1)
                                text_no += re.sub('%a', str(level_node_cnt), level_text)
                            elif re.search('A', level_text):
                                level_node_cnt = chr(ord('A') + level_node_cnt - 1)
                                text_no += re.sub('%A', str(level_node_cnt), level_text)
                            else:
                                text_no += re.sub('%\d', str(level_node_cnt), level_text)
                        else:
                            # No display format configured: plain "1." style
                            text_no += str(level_node_cnt) + '.'
                    # print('text_no', text_no)
                    _last_node_level = node_level
    # text = p_node.getElementsByTagName("w:t")
    # picture = p_node.getElementsByTagName("wp:docPr")
    # if text:
    #     _order_list.append("w:t")
    #     temp_text = ""
    #     if is_sdt and len(text) == 2:
    #         if len(text[0].childNodes) > 0 and len(text[1].childNodes) > 0:
    #             temp_text += text[0].childNodes[0].nodeValue + '.'*20 + text[1].childNodes[0].nodeValue
    #     else:
    #         for t in text:
    #             if len(t.childNodes) > 0:
    #                 temp_text += t.childNodes[0].nodeValue
    #             else:
    #                 continue
    #     if text_no:
    #         temp_text = text_no + ' ' + temp_text
    #     _text_list.append(temp_text)
    #     # numbering only
    #     elif len(text_no) >= 2:
    #         _text_list.append(text_no[:-1])
    #
    # if picture:
    #     _order_list.append("wp:docPr")
    #
    # for line1 in p_node.childNodes:
    #     if "w:r" in str(line1):
    #         picture1 = line1.getElementsByTagName("w:pict")
    #         if picture1:
    #             _order_list.append("wp:docPr")
    p_node_text = ''
    has_html = False
    # Prepend the numbering, if any
    if text_no:
        p_node_text += text_no
    text = p_node.getElementsByTagName("w:t")
    # Table-of-contents entry: "title....page" special formatting
    if is_sdt and len(text) == 2:
        p_node_text += text[0].childNodes[0].nodeValue + '.'*20 + text[1].childNodes[0].nodeValue
    # Normal paragraph
    else:
        image_rel_dict = read_rel_image(document_xml_rels)
        p_node_all = p_node.getElementsByTagName("*")
        for node in p_node_all:
            # Text run
            if "w:t" in str(node).split(' '):
                if node.childNodes:
                    p_node_text += node.childNodes[0].nodeValue
            # Inline image: recognize it right here instead of making an Image object for the Page
            elif "a:blip" in str(node).split(' '):
                _id = node.getAttribute("r:embed")
                image_path = image_rel_dict.get(_id)
                if image_path:
                    image_path = unique_type_dir + 'word/' + image_path
                    image_convert = ImageConvert(image_path, '')
                    image_html = image_convert.get_html()[0]
                    # get_html() returns an int error code on failure — skip it
                    if isinstance(image_html, int):
                        image_html = ''
                    p_node_text += image_html
                    has_html = True
    # Paragraph contains only the numbering: drop the trailing separator
    if len(p_node_text) > 0 and p_node_text == text_no:
        p_node_text = p_node_text[:-1]
    _text_list.append(p_node_text)
    if has_html:
        _order_list.append('w:t html')
    else:
        _order_list.append('w:t')
    return _text_list, _order_list, _num_pr_dict, _last_node_level
@timeout(50, timeout_exception=TimeoutError)
def read_xml_order(unique_type_dir, document_xml, numbering_xml, document_xml_rels):
    """Walk the document.xml body, collecting element order and paragraph texts.

    Returns [order_list, text_list] on success or [-1] on any failure.
    """
    log("into read_xml_order")
    try:
        body = document_xml.getElementsByTagName("w:body")[0]
        order_list = []
        text_list = []
        # Numbering counters carried across paragraphs
        num_pr_dict = {}
        last_node_level = 0

        def _collect(p_node, in_sdt):
            # Read one paragraph and fold its results into the running state.
            nonlocal num_pr_dict, last_node_level
            t_list, o_list, num_pr_dict, last_node_level = read_p_text(
                unique_type_dir, p_node, last_node_level, num_pr_dict,
                numbering_xml, document_xml_rels, is_sdt=in_sdt)
            text_list.extend(t_list)
            order_list.extend(o_list)

        for child in body.childNodes:
            node_repr = str(child)
            # Plain paragraph
            if "w:p" in node_repr:
                _collect(child, False)
            # Table-of-contents block
            elif "w:sdt" in node_repr:
                for sdt_child in child.childNodes:
                    if "w:sdtContent" not in str(sdt_child):
                        continue
                    for content_child in sdt_child.childNodes:
                        if 'w:p' in str(content_child):
                            _collect(content_child, True)
            elif "w:tbl" in node_repr:
                order_list.append("w:tbl")
        return [order_list, text_list]
    except Exception:
        log("read_xml_order error!")
        traceback.print_exc()
        return [-1]
@timeout(50, timeout_exception=TimeoutError)
def read_xml_table(unique_type_dir, document_xml, numbering_xml, document_xml_rels):
    """
    Extract every top-level w:tbl in document.xml as an html <table> string.

    :return: list of html table strings, or [-1] on failure
    """
    def recursion_read_table(table):
        # Render one w:tbl node (recursing into nested tables) as html.
        table_text = '<table border="1">'
        tr_index = 0
        tr_text_list = []
        last_node_level = 0
        num_pr_dict = {}
        # Direct children are reached via childNodes; descendants via getElementsByTagName
        for table_child in table.childNodes:
            if 'w:tr' in str(table_child):
                table_text += "<tr>"
                tr = table_child
                tr_child_nodes = tr.childNodes
                tc_index = 0
                tc_text_list = []
                for tr_child in tr_child_nodes:
                    if 'w:tc' in str(tr_child).split(' '):
                        tc_text = ""
                        tc = tr_child
                        # Number of grid columns this cell spans (html colspan)
                        col_span = tc.getElementsByTagName("w:gridSpan")
                        if col_span:
                            col_span = int(col_span[0].getAttribute("w:val"))
                        else:
                            col_span = 1
                        # Vertically merged continuation cell (rowspan equivalent):
                        # copy the text of the matching cell in the previous row
                        is_merge = tc.getElementsByTagName("w:vMerge")
                        if is_merge:
                            is_merge = is_merge[0].getAttribute("w:val")
                            if is_merge == "continue":
                                col_span_index = 0
                                real_tc_index = 0
                                if 0 <= tr_index - 1 < len(tr_text_list):
                                    # Walk the previous row's colspans to locate the cell above
                                    for tc_colspan in tr_text_list[tr_index - 1]:
                                        if col_span_index < tc_index:
                                            col_span_index += tc_colspan[1]
                                            real_tc_index += 1
                                    if real_tc_index < len(tr_text_list[tr_index - 1]):
                                        tc_text = tr_text_list[tr_index - 1][real_tc_index][0]
                        # Set colspan
                        table_text = table_text + "<td colspan=" + str(col_span) + ">"
                        # Cell content: nested tables and paragraph text
                        for tc_child in tc.childNodes:
                            if 'w:tbl' in str(tc_child).split(' '):
                                # Table nested inside this cell
                                tc_text += recursion_read_table(tc_child)
                            if 'w:p' in str(tc_child).split(' '):
                                _t_list, _, num_pr_dict, last_node_level = read_p_text(
                                    unique_type_dir, tc_child, last_node_level,
                                    num_pr_dict, numbering_xml, document_xml_rels)
                                tc_text += ''.join(_t_list)
                        # Close this tc
                        table_text = table_text + tc_text + "</td>"
                        tc_index += 1
                        tc_text_list.append([tc_text, col_span])
                # Close this tr
                table_text += "</tr>"
                tr_index += 1
                tr_text_list.append(tc_text_list)
        # Close this table
        table_text += "</table>"
        return table_text

    log("into read_xml_table")
    try:
        body = document_xml.getElementsByTagName("w:body")[0]
        table_text_list = []
        for node in body.childNodes:
            if 'w:tbl' in str(node).split(' '):
                table_text_list.append(recursion_read_table(node))
        return table_text_list
    except Exception:
        log("read_xml_table error")
        # Fix: the original printed print_exc()'s return value, i.e. the word "None"
        traceback.print_exc()
        return [-1]
@timeout(25, timeout_exception=TimeoutError)
def parse_xml(path):
    """
    Parse an XML file with minidom and return its document element.

    :param path: path to the XML file
    :raises TimeoutError: if parsing exceeds 25 seconds
    """
    # Fix: the module only does `import xml`, which does not load the
    # dom.minidom submodule — import it explicitly here.
    import xml.dom.minidom
    DOMTree = xml.dom.minidom.parse(path)
    collection = DOMTree.documentElement
    return collection
@timeout(25, timeout_exception=TimeoutError)
def parse_xml2(path):
    """
    Parse an XML file with ElementTree and return its root element.

    :param path: path to the XML file
    :raises TimeoutError: if parsing exceeds 25 seconds
    """
    # Fix: the module only does `import xml`, which does not load the
    # etree.ElementTree submodule — import it explicitly here.
    import xml.etree.ElementTree
    tree = xml.etree.ElementTree.parse(path)
    root = tree.getroot()
    return root
class DocxConvert:
    """Convert a .docx file (or an html file disguised as .doc) into the project's _Document tree."""

    def __init__(self, path, unique_type_dir):
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir
        # Unzip the docx: only the word/ part is needed.
        # Fix: use a context manager so the zip handle is closed even when extract() fails.
        try:
            with zipfile.ZipFile(path) as f:
                for file in f.namelist():
                    if "word/" in str(file):
                        f.extract(file, self.unique_type_dir)
        except Exception:
            log("docx format error!")
            self._doc.error_code = [-3]
        # Parse the extracted xml parts
        try:
            self.document_xml = parse_xml(self.unique_type_dir + "word/document.xml")
            if os.path.exists(self.unique_type_dir + "word/numbering.xml"):
                self.numbering_xml = parse_xml(self.unique_type_dir + "word/numbering.xml")
            else:
                self.numbering_xml = []
            if os.path.exists(self.unique_type_dir + "word/_rels/document.xml.rels"):
                self.document_xml_rels = parse_xml2(self.unique_type_dir + "word/_rels/document.xml.rels")
            else:
                self.document_xml_rels = []
        except FileNotFoundError:
            # No extracted file found: fall back to reading the file as html
            log('FileNotFoundError')
            self._doc.error_code = None
        except TimeoutError:
            log("parse_xml timeout")
            self._doc.error_code = [-4]

    @memory_decorator
    def init_package(self):
        """Open the file with python-docx and zipfile; set error_code [-3] on failure."""
        try:
            self.docx = docx.Document(self.path)
            self.zip = zipfile.ZipFile(self.path)
        except Exception:
            log("cannot open docx!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Build the page tree: sentences, images and tables in document order."""
        self._page = _Page(None, 0)
        # Some special .doc files are actually html text: detect and short-circuit
        is_html_doc = False
        try:
            with open(self.path, 'r') as f:
                html_str = f.read()
            if re.search('<div|<html|<body|<head|<tr|<br|<table|<td', html_str):
                soup = BeautifulSoup(html_str, 'lxml')
                text = soup.text
                is_html_doc = True
        except Exception:
            # Binary docx content fails to decode as text — not an html doc
            pass
        if is_html_doc:
            _sen = _Sentence(text, (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
            return
        self.init_package()
        if self._doc.error_code is not None:
            return
        order_and_text_list = self.get_orders()
        if judge_error_code(order_and_text_list):
            self._doc.error_code = order_and_text_list
            return
        order_list, text_list = order_and_text_list
        # Too much mojibake: report a file format error
        match1 = re.findall(get_garble_code(), ''.join(text_list))
        if len(match1) > 10:
            log("doc/docx garbled code!")
            self._doc.error_code = [-3]
            self._doc.add_child(self._page)
            return
        table_list = self.get_tables()
        if judge_error_code(table_list):
            self._doc.error_code = table_list
            return
        image_list = self.get_images()
        order_y = 0
        doc_pr_cnt = 0
        # Replay the element order, consuming texts/images/tables as their tags appear
        for tag in order_list:
            bbox = (0, order_y, 0, 0)
            if tag == "w:t html":
                if len(text_list) > 0:
                    _para = text_list.pop(0)
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    _sen.is_html = True
                    self._page.add_child(_sen)
            if tag == "w:t":
                if len(text_list) > 0:
                    _para = text_list.pop(0)
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    self._page.add_child(_sen)
            if tag == "wp:docPr":
                if len(image_list) > 0:
                    temp_image_path = self.unique_type_dir + "docpr" + str(doc_pr_cnt) + ".png"
                    _image = image_list.pop(0)
                    with open(temp_image_path, "wb") as f:
                        f.write(_image)
                    _img = _Image(_image, temp_image_path, bbox)
                    _img.is_from_docx = True
                    self._page.add_child(_img)
                    doc_pr_cnt += 1
            if tag == "w:tbl":
                if len(table_list) > 0:
                    _table = table_list.pop(0)
                    _table = _Table(_table, bbox)
                    _table.is_html = True
                    self._page.add_child(_table)
            order_y += 1
        if self._doc.error_code is None and self._page.error_code is not None:
            self._doc.error_code = self._page.error_code
        self._doc.add_child(self._page)

    @memory_decorator
    def get_tables(self):
        """Extract all tables from document.xml as html strings."""
        table_list = read_xml_table(self.unique_type_dir, self.document_xml, self.numbering_xml, self.document_xml_rels)
        return table_list

    def get_images(self):
        """Collect image blobs in paragraph order via python-docx relationships."""
        image_list = []
        # Fix: raw string — 'rId\d+' was an invalid escape sequence
        pattern = re.compile(r'rId\d+')
        for graph in self.docx.paragraphs:
            for run in graph.runs:
                if run.text == '':
                    try:
                        # Fix: search once instead of twice per run
                        match = pattern.search(run.element.xml)
                        if not match:
                            continue
                        content_id = match.group(0)
                        content_type = self.docx.part.related_parts[content_id].content_type
                    except Exception as e:
                        print("docx no image!", e)
                        continue
                    if not content_type.startswith('image'):
                        continue
                    img_data = self.docx.part.related_parts[content_id].blob
                    if img_data is not None:
                        image_list.append(img_data)
        return image_list

    @memory_decorator
    def get_orders(self):
        """Parse document.xml to get the element order and paragraph texts."""
        order_and_text_list = read_xml_order(self.unique_type_dir, self.document_xml, self.numbering_xml, self.document_xml_rels)
        return order_and_text_list

    def get_doc_object(self):
        """Return the underlying _Document tree."""
        return self._doc

    def get_html(self):
        """Return the converted html list, or an error code list on failure (with tika fallback)."""
        if self._doc.error_code is not None:
            return self._doc.error_code
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            # Fall back to tika extraction
            html = from_tika_interface(self.path)
            if judge_error_code(html):
                self._doc.error_code = html
                return self._doc.error_code
            else:
                return [html]
        return self._doc.get_html()
if __name__ == '__main__':
    # Ad-hoc manual test on a local file
    _convert = DocxConvert(
        "C:/Users/Administrator/Downloads/1631944542835.docx",
        "C:/Users/Administrator/Downloads/1/",
    )
    print(_convert.get_html())
|