import inspect
import os
import sys

# The parent directory is appended before the ``format_convert`` imports below,
# presumably so the package resolves when this file is run as a script.
sys.path.append(os.path.dirname(__file__) + "/../")
from format_convert.convert_tree import _Document, _Sentence, _Page, _Image, _Table
import logging
import re
import traceback
import xml
import xml.dom.minidom  # ``import xml`` alone does not load the minidom submodule
import zipfile
import docx
from format_convert.convert_image import picture2text
from format_convert.utils import judge_error_code, add_div, get_logger, log, memory_decorator, get_garble_code
from format_convert.wrapt_timeout_decorator import timeout

# Archive member that holds the main document body of a .docx file.
_DOCUMENT_XML = "word/document.xml"

# Relationship ids ("rId12") as they appear in a run's XML.
_RELATION_ID_PATTERN = re.compile(r'rId\d+')


def _extract_document_xml(path, save_path):
    """Extract ``word/document.xml`` from the archive at *path* into *save_path*.

    Nothing is extracted when the archive has no such member. The archive is
    always closed, even if extraction raises.
    """
    with zipfile.ZipFile(path) as archive:
        for name in archive.namelist():
            if name == _DOCUMENT_XML:
                archive.extract(name, save_path)


def _iter_image_blobs(document):
    """Yield the bytes of every image referenced by an empty-text run.

    Paragraphs and runs are visited in the order python-docx exposes them.
    Runs whose relationship id cannot be resolved are reported on stdout and
    skipped; related parts whose content type does not start with ``image``
    and parts with a ``None`` blob are skipped silently.
    """
    for paragraph in document.paragraphs:
        for run in paragraph.runs:
            if run.text != '':
                continue
            try:
                match = _RELATION_ID_PATTERN.search(run.element.xml)
                if not match:
                    continue
                part = document.part.related_parts[match.group(0)]
                content_type = part.content_type
            except Exception as e:
                print("docx no image!", e)
                continue
            if not content_type.startswith('image'):
                continue
            blob = part.blob
            if blob is not None:
                yield blob


def _numbering_prefix(num_pr_dict, group_id, node_level, last_node_level):
    """Advance the counter of a numbered paragraph and return its prefix.

    *num_pr_dict* maps a numbering group id to ``{level: count}`` and is
    updated in place. The returned prefix joins the counts of levels
    ``0..node_level``, each followed by a dot (e.g. ``"2.1."``).

    Levels that have not been seen yet count as 0 instead of raising
    ``KeyError``, so a list that starts on a nested level no longer aborts the
    whole document.
    """
    counters = num_pr_dict.get(group_id)
    if counters is None:
        counters = num_pr_dict[group_id] = {node_level: 1}
    else:
        if last_node_level != 0 and node_level < last_node_level:
            # Moving back up the outline: restart every deeper level.
            for stale_level in range(node_level + 1, last_node_level + 1):
                counters[stale_level] = 0
        counters[node_level] = counters.get(node_level, 0) + 1
    return "".join(
        str(counters.get(level, 0)) + '.' for level in range(node_level + 1)
    )


def _own_cell_text(tc):
    """Return the concatenated text of the ``w:t`` nodes under a ``w:tc`` cell."""
    pieces = []
    for p in tc.getElementsByTagName("w:p"):
        for tt in p.getElementsByTagName("w:t"):
            if len(tt.childNodes) > 0:
                pieces.append(tt.childNodes[0].nodeValue)
    return "".join(pieces)


@memory_decorator
def docx2text(path, unique_type_dir):
    """Convert the docx at *path* into a single HTML-ish text string.

    Paragraphs, tables and OCR'd images are collected separately and then
    interleaved following the tag order reported by ``read_xml_order``.

    Returns ``[text]`` on success, otherwise an error-code list: ``[-3]`` when
    python-docx cannot open the file, ``[-4]`` when table extraction times
    out, ``[-1]`` on any unexpected exception, or whatever error code a helper
    returned.

    NOTE(review): the paragraph wrapper markup was missing from the source
    under review and has been restored as ``<div>...</div>`` -- confirm against
    the upstream file.
    """
    log("into docx2text")
    try:
        try:
            doc = docx.Document(path)
        except Exception as e:
            print("docx format error!", e)
            traceback.print_exc()
            log("docx format error!")
            return [-3]

        # Paragraphs.
        paragraph_text_list = [
            "<div>" + paragraph.text + "</div>"
            for paragraph in doc.paragraphs
            if paragraph.text != ""
        ]

        # Tables.
        try:
            table_text_list = read_xml_table(path, unique_type_dir)
        except TimeoutError:
            return [-4]
        if judge_error_code(table_text_list):
            return table_text_list

        # Images, in order; each one goes through a temp file for OCR.
        image_text_list = []
        temp_image_path = unique_type_dir + "temp_image.png"
        for img_data in _iter_image_blobs(doc):
            with open(temp_image_path, 'wb') as f:
                f.write(img_data)

            image_text = picture2text(temp_image_path)
            if image_text == [-2]:
                return [-2]
            if image_text == [-1]:
                return [-1]
            if image_text == [-3]:
                continue
            image_text_list.append(add_div(image_text[0]))

        # Parse document.xml to learn the order of text, images and tables.
        order_and_text_list = read_xml_order(path, unique_type_dir)
        if judge_error_code(order_and_text_list):
            return order_and_text_list
        # read_xml_order returns [order_list, text_list]; only the tag order
        # is needed here because paragraph text comes from python-docx.
        order_list = order_and_text_list[0]

        sources = {
            "w:t": iter(paragraph_text_list),
            "wp:docPr": iter(image_text_list),
            "w:tbl": iter(table_text_list),
        }
        parts = []
        for tag in order_list:
            source = sources.get(tag)
            if source is not None:
                # An exhausted source contributes nothing.
                parts.append(next(source, ""))
        return ["".join(parts)]
    except Exception as e:
        log("docx2text error!")
        traceback.print_exc()
        return [-1]


@timeout(50, timeout_exception=TimeoutError)
def read_xml_order(path, save_path):
    """Read ``word/document.xml`` and report the order of body content.

    Returns ``[order_list, text_list]``. ``order_list`` holds one tag per
    item found among the direct children of ``w:body``: ``"w:t"`` for a
    paragraph with text, ``"wp:docPr"`` for a picture (also emitted for each
    run containing ``w:pict``) and ``"w:tbl"`` for a table. ``text_list``
    holds the text of each ``"w:t"`` entry, prefixed with its outline number
    when the paragraph is numbered.

    Error returns: ``[-3]`` when the archive cannot be read, ``[-4]`` when XML
    parsing times out, ``[-1]`` on any other exception.
    """
    log("into read_xml_order")
    try:
        try:
            _extract_document_xml(path, save_path)
        except Exception as e:
            log("docx format error!")
            return [-3]

        try:
            collection = xml_analyze(save_path + "word/document.xml")
        except TimeoutError:
            log("xml_analyze timeout")
            return [-4]

        body = collection.getElementsByTagName("w:body")[0]
        order_list = []
        text_list = []
        # Numbering state: group id -> {level: count}.
        num_pr_dict = {}
        last_node_level = 0
        for line in body.childNodes:
            if "w:p" in str(line):
                # Outline number of this paragraph, if it has one.
                text_no = ''
                num_pr = line.getElementsByTagName("w:numPr")
                if num_pr:
                    num_pr = num_pr[0]
                    group_id = int(num_pr.getElementsByTagName("w:numId")[0].getAttribute("w:val"))
                    if group_id >= 1:
                        node_level = num_pr.getElementsByTagName("w:ilvl")
                        if node_level:
                            node_level = int(node_level[0].getAttribute("w:val"))
                            text_no = _numbering_prefix(
                                num_pr_dict, group_id, node_level, last_node_level
                            )
                            last_node_level = node_level

                text = line.getElementsByTagName("w:t")
                picture = line.getElementsByTagName("wp:docPr")
                if text:
                    order_list.append("w:t")
                    temp_text = "".join(
                        t.childNodes[0].nodeValue
                        for t in text
                        if len(t.childNodes) > 0
                    )
                    if text_no:
                        temp_text = text_no + ' ' + temp_text
                    text_list.append(temp_text)
                if picture:
                    order_list.append("wp:docPr")

                for line1 in line.childNodes:
                    if "w:r" in str(line1):
                        picture1 = line1.getElementsByTagName("w:pict")
                        if picture1:
                            order_list.append("wp:docPr")

            if "w:tbl" in str(line):
                # The table content is not needed here, so the XML is not
                # re-parsed through read_xml_table.
                order_list.append("w:tbl")
        return [order_list, text_list]
    except Exception as e:
        log("read_xml_order error!")
        traceback.print_exc()
        return [-1]


@timeout(50, timeout_exception=TimeoutError)
def read_xml_table(path, save_path):
    """Render every top-level table of the docx at *path* as an HTML string.

    Returns a list with one HTML table per ``w:tbl`` found among the direct
    children of ``w:body``. A vertically merged continuation cell starts
    with the text recorded for the matching cell of the previous row.

    Error returns: ``[-3]`` when the archive cannot be read, ``[-4]`` when XML
    parsing times out, ``[-1]`` on any other exception.

    NOTE(review): the markup literals were missing from the source under
    review and the cell-building statements were out of order. They are
    restored here as ``<table border="1">``, ``<tr>``, ``<td colspan=N>``; the
    exact attributes should be confirmed against the upstream file.
    """
    log("into read_xml_table")
    try:
        try:
            _extract_document_xml(path, save_path)
        except Exception as e:
            log("docx format error!")
            return [-3]

        log("xml_analyze%s" % (save_path))
        try:
            collection = xml_analyze(save_path + "word/document.xml")
        except TimeoutError:
            log("xml_analyze timeout")
            return [-4]

        log("xml_analyze done")

        body = collection.getElementsByTagName("w:body")[0]
        table_text_list = []
        for line in body.childNodes:
            if "w:tbl" not in str(line):
                continue

            table_text = '<table border="1">'
            tr_index = 0
            # One entry per finished row: a list of [cell_text, col_span].
            tr_text_list = []
            for tr in line.getElementsByTagName("w:tr"):
                table_text = table_text + "<tr>"
                tc_index = 0
                tc_text_list = []
                for tc in tr.getElementsByTagName("w:tc"):
                    tc_text = ""

                    # Number of grid columns this cell spans.
                    col_span = tc.getElementsByTagName("w:gridSpan")
                    if col_span:
                        col_span = int(col_span[0].getAttribute("w:val"))
                    else:
                        col_span = 1

                    # Is this the empty continuation of a vertically merged cell?
                    is_merge = tc.getElementsByTagName("w:vMerge")
                    if is_merge:
                        is_merge = is_merge[0].getAttribute("w:val")
                        # minidom returns "" for a missing attribute; in OOXML
                        # a w:vMerge without w:val means "continue".
                        if is_merge in ("", "continue"):
                            col_span_index = 0
                            real_tc_index = 0
                            if 0 <= tr_index - 1 < len(tr_text_list):
                                previous_row = tr_text_list[tr_index - 1]
                                for tc_colspan in previous_row:
                                    if col_span_index < tc_index:
                                        col_span_index += tc_colspan[1]
                                        real_tc_index += 1
                                if real_tc_index < len(previous_row):
                                    tc_text = previous_row[real_tc_index][0]

                    table_text = table_text + "<td colspan=" + str(col_span) + ">"
                    tc_text += _own_cell_text(tc)
                    table_text = table_text + tc_text + "</td>"
                    tc_index += 1
                    tc_text_list.append([tc_text, col_span])
                table_text += "</tr>"
                tr_index += 1
                tr_text_list.append(tc_text_list)
            table_text += "</table>"
            table_text_list.append(table_text)
        return table_text_list

    except Exception as e:
        log("read_xml_table error")
        traceback.print_exc()
        return [-1]


@timeout(25, timeout_exception=TimeoutError)
def xml_analyze(path):
    """Parse the XML file at *path* and return its document element."""
    DOMTree = xml.dom.minidom.parse(path)
    collection = DOMTree.documentElement
    return collection


def read_docx_table(document):
    """Render every table of a python-docx *document* as an HTML string.

    All whitespace is removed from each cell's text.

    NOTE(review): the markup literals were missing from the source under
    review and are restored here as plain ``<table>``/``<tr>``/``<td>`` tags --
    confirm against the upstream file.
    """
    table_text_list = []
    for table in document.tables:
        table_text = "<table>"
        for row in table.rows:
            table_text += "<tr>"
            for cell in row.cells:
                table_text += "<td>" + re.sub(r"\s", "", str(cell.text)) + "</td>"
            table_text += "</tr>"
        table_text += "</table>"
        table_text_list.append(table_text)
    return table_text_list


class DocxConvert:
    """Convert one docx file into a ``_Document`` tree and its HTML."""

    def __init__(self, path, unique_type_dir):
        """Remember the input *path* and the working directory prefix.

        *unique_type_dir* is concatenated directly with file names, so it is
        expected to end with a path separator.
        """
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir

    @memory_decorator
    def init_package(self):
        """Open the file with python-docx and zipfile.

        On failure the document error code is set to ``[-3]``.
        """
        try:
            self.docx = docx.Document(self.path)
            # NOTE(review): this handle is never closed or used in this file.
            self.zip = zipfile.ZipFile(self.path)
        except Exception:
            log("cannot open docx!")
            traceback.print_exc()
            self._doc.error_code = [-3]

    def convert(self):
        """Build a single page holding the sentences, images and tables.

        Children are added in the order reported by ``get_orders``; each gets
        a bbox whose second component is its running position. On any helper
        error the error code is stored on the document and conversion stops.
        """
        self.init_package()
        if self._doc.error_code is not None:
            return

        order_and_text_list = self.get_orders()
        if judge_error_code(order_and_text_list):
            self._doc.error_code = order_and_text_list
            return
        order_list, text_list = order_and_text_list

        self._page = _Page(None, 0)

        # Garbled text: emit a single placeholder sentence instead of content.
        match1 = re.findall(get_garble_code(), ''.join(text_list))
        if len(match1) > 10:
            log("doc/docx garbled code!")
            _sen = _Sentence('文件乱码!', (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
            return

        table_list = self.get_tables()
        if judge_error_code(table_list):
            self._doc.error_code = table_list
            return

        image_list = self.get_images()

        text_iter = iter(text_list)
        image_iter = iter(image_list)
        table_iter = iter(table_list)

        order_y = 0
        doc_pr_cnt = 0
        for tag in order_list:
            bbox = (0, order_y, 0, 0)
            if tag == "w:t":
                _para = next(text_iter, None)
                if _para is not None:
                    _sen = _Sentence(_para, bbox)
                    _sen.combine = False
                    self._page.add_child(_sen)

            if tag == "wp:docPr":
                _image = next(image_iter, None)
                if _image is not None:
                    temp_image_path = self.unique_type_dir + "docpr" + str(doc_pr_cnt) + ".png"
                    with open(temp_image_path, "wb") as f:
                        f.write(_image)
                    _img = _Image(_image, temp_image_path, bbox)
                    _img.is_from_docx = True
                    self._page.add_child(_img)
                    doc_pr_cnt += 1

            if tag == "w:tbl":
                _table = next(table_iter, None)
                if _table is not None:
                    _table = _Table(_table, bbox)
                    _table.is_html = True
                    self._page.add_child(_table)
            order_y += 1

        if self._doc.error_code is None and self._page.error_code is not None:
            self._doc.error_code = self._page.error_code

        self._doc.add_child(self._page)

    def get_paragraphs(self):
        """Return the text of every non-empty paragraph."""
        return [
            paragraph.text
            for paragraph in self.docx.paragraphs
            if paragraph.text != ""
        ]

    @memory_decorator
    def get_tables(self):
        """Return the HTML tables (or an error code) from ``read_xml_table``."""
        table_list = read_xml_table(self.path, self.unique_type_dir)
        return table_list

    def get_images(self):
        """Return the bytes of every embedded image, in document order."""
        return list(_iter_image_blobs(self.docx))

    @memory_decorator
    def get_orders(self):
        """Return ``[order_list, text_list]`` (or an error code) from ``read_xml_order``."""
        order_and_text_list = read_xml_order(self.path, self.unique_type_dir)
        return order_and_text_list

    def get_doc_object(self):
        """Return the underlying ``_Document``."""
        return self._doc

    def get_html(self):
        """Run the conversion and return the HTML, or an error-code list.

        Any exception raised by ``convert`` is printed and reported as
        ``[-1]``.
        """
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        return self._doc.get_html()


if __name__ == '__main__':
    c = DocxConvert("C:/Users/Administrator/Downloads/1631944542835.docx", "C:/Users/Administrator/Downloads/1/")
    print(c.get_html())