- import inspect
- import os
- import re
- import sys
- import chardet
- from bs4 import BeautifulSoup
- sys.path.append(os.path.dirname(__file__) + "/../")
- from format_convert.convert_tree import _Document, _Sentence, _Page
- import logging
- import traceback
- from format_convert import get_memory_info
- from format_convert.convert_docx import docx2text, DocxConvert
- from format_convert.convert_need_interface import from_office_interface, from_tika_interface
- from format_convert.utils import judge_error_code, get_logger, log
@get_memory_info.memory_decorator
def doc2text(path, unique_type_dir):
    """Extract text from a .doc file.

    The .doc is first converted to .docx through the office interface,
    then handed to docx2text.

    :param path: path of the source .doc file
    :param unique_type_dir: working directory unique to this conversion
    :return: extracted text on success, or an error-code list (e.g. [-1])
        on failure
    """
    log("into doc2text")
    try:
        # Convert doc -> docx through the office interface first.
        file_path = from_office_interface(path, unique_type_dir, 'docx')
        if judge_error_code(file_path):
            # Propagate the interface's error code unchanged.
            return file_path
        text = docx2text(file_path, unique_type_dir)
        return text
    except Exception:
        log("doc2text error!")
        # BUGFIX: the old code did print("doc2text", traceback.print_exc());
        # print_exc() returns None, so it logged "doc2text None".  Log the
        # formatted traceback instead.
        log(traceback.format_exc())
        return [-1]
class DocConvert:
    """Convert a .doc file to html.

    Strategy: some ".doc" files are actually html text saved with a .doc
    extension — those are parsed directly with BeautifulSoup.  Real .doc
    files are converted to .docx via the office interface and handled by
    DocxConvert; when either step fails, tika extraction is used as a
    fallback.
    """

    def __init__(self, path, unique_type_dir):
        # Parsed-document tree; _doc.error_code stays None on success.
        self._doc = _Document(path)
        self.path = path
        self.unique_type_dir = unique_type_dir
        # Html produced by the tika fallback, if it was used.
        self.tika_html = None

    def convert(self):
        # First detect the special case: a doc file that is really html text.
        is_html_doc = False
        try:
            try:
                with open(self.path, 'r') as f:
                    html_str = f.read()
            except UnicodeDecodeError:
                # Not fully decodable as text: re-read dropping bad bytes so
                # the tag check below can still run.
                with open(self.path, 'r', errors='ignore') as f:
                    html_str = f.read()
            # Require at least 10 html tag occurrences before treating the
            # file as html, to avoid false positives on binary .doc content.
            if len(re.findall('<div|<html|<body|<head|<tr|<br|<table|<td|<p>|<span', html_str)) >= 10:
                log('doc as html!')
                soup = BeautifulSoup(html_str, 'lxml')
                text = soup.text
                is_html_doc = True
        except:
            # Best-effort detection only; fall through to office conversion.
            pass

        if is_html_doc:
            # Wrap the extracted text as a single sentence on a single page.
            self._page = _Page(None, 0)
            _sen = _Sentence(text, (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
        else:
            # Convert doc -> docx through the office interface.
            file_path = from_office_interface(self.path, self.unique_type_dir, 'docx')
            if judge_error_code(file_path):
                # Office conversion failed: fall back to tika extraction.
                html = from_tika_interface(self.path)
                if judge_error_code(html):
                    self._doc.error_code = html
                    return
                self.tika_html = html
                return
            _docx = DocxConvert(file_path, self.unique_type_dir)
            _docx.convert()
            self._doc = _docx._doc
            if self._doc.error_code is not None:
                # Docx conversion failed: fall back to tika extraction.
                html = from_tika_interface(self.path)
                if judge_error_code(html):
                    # BUGFIX: keep the error code when tika also fails.
                    # Previously error_code was set and then unconditionally
                    # cleared to None, so get_html() returned the error list
                    # wrapped as html instead of the error code.
                    self._doc.error_code = html
                    return
                # Tika succeeded: use its html and clear the docx error.
                self.tika_html = html
                self._doc.error_code = None
                return

    def get_html(self):
        """Run the conversion and return [html] on success, else an error-code list."""
        try:
            self.convert()
        except:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        if self.tika_html is not None:
            return [self.tika_html]
        return self._doc.get_html()
def parse_summary_info(data):
    # Debug helper: parse the OLE property-set (SummaryInformation) format.
    # NOTE(review): per the olefile docs, OleMetadata.parse_properties()
    # expects an open OleFileIO and populates attributes on the metadata
    # object rather than returning an iterable, and OleMetadata has no
    # `properties` mapping — so this loop looks broken.  It is only reached
    # from commented-out code in __main__; confirm against olefile before
    # relying on it.
    import olefile
    from olefile import OleFileIO, OleMetadata
    from io import BytesIO
    ole_metadata = OleMetadata()
    for prop in ole_metadata.parse_properties(data):
        print(f"{prop}: {ole_metadata.properties[prop]}")
if __name__ == '__main__':
    # Manual debugging entry point.  The olefile/chardet stream-dumping
    # experiments that used to live here were removed as dead commented-out
    # code; only the sample path assignment was live.
    _p = "C:/Users/Administrator/Downloads/1716253106319.doc"
    # Example usage:
    #   c = DocConvert(_p, "C:/Users/Administrator/Downloads/1")
    #   print(c.get_html())
|