import os
import re
import sys
from bs4 import BeautifulSoup
sys.path.append(os.path.dirname(__file__) + "/../")
from format_convert.convert_tree import _Document, _Sentence, _Page, _Image, _Table
import traceback
from format_convert import get_memory_info
from format_convert.convert_docx import docx2text, DocxConvert
from format_convert.convert_need_interface import from_office_interface, from_tika_interface
from format_convert.utils import judge_error_code, log


@get_memory_info.memory_decorator
def doc2text(path, unique_type_dir):
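    # Convert a .doc file to text: first convert to .docx via the Office
    # interface, then extract the text with docx2text. Returns the text,
    # or an error-code list (e.g. [-1]) on failure.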
- log("into doc2text")
- try:
- # 调用office格式转换
- file_path = from_office_interface(path, unique_type_dir, 'docx')
- if judge_error_code(file_path):
- return file_path
- text = docx2text(file_path, unique_type_dir)
- return text
- except Exception as e:
- log("doc2text error!")
- print("doc2text", traceback.print_exc())
- return [-1]


class DocConvert:
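    # Converts a .doc file into the _Document tree. Tries three strategies in
    # order: detect .doc files that are really HTML text, convert via the
    # Office interface (.doc -> .docx) and reuse DocxConvert, and finally
    # fall back to tika extraction.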
    def __init__(self, path, unique_type_dir):
        self._doc = _Document(path)
        self._page = _Page(None, 0)
        self.path = path
        self.unique_type_dir = unique_type_dir
        self.tika_html = None
        print('into DocConvert __init__')

    def convert(self):
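        # Build the document tree: HTML special case first, then Office
        # conversion, with a tika fallback if the Office conversion fails.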
        print('into DocConvert convert')
        # Special case first: some .doc files are actually HTML text
        is_html_doc = self.maybe_html()
        if not is_html_doc:
            # Convert .doc to .docx via the Office conversion interface
            file_path = from_office_interface(self.path, self.unique_type_dir, 'docx')
            if judge_error_code(file_path):
                # Office conversion failed: fall back to tika and extract
                # each object type directly
                try:
                    self.use_tika(self.path)
                except Exception:
                    traceback.print_exc()
                    self._doc.error_code = [-17]
                    log('doc tika failed too')
                return
            _docx = DocxConvert(file_path, self.unique_type_dir)
            _docx.convert()
            self._doc = _docx._doc

    def maybe_html(self):
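        # Detect .doc files that are really HTML text. If the file decodes as
        # text and contains enough HTML tags, parse it with BeautifulSoup and
        # add the extracted text to the document tree as a single sentence.
        # Returns True if the file was handled as HTML.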
        is_html_doc = False
        text = ''
        try:
            try:
                with open(self.path, 'r') as f:
                    html_str = f.read()
            except UnicodeDecodeError:
                with open(self.path, 'r', errors='ignore') as f:
                    html_str = f.read()
            # Require at least 10 tag matches before treating the file as HTML
            if len(re.findall('<div|<html|<body|<head|<tr|<br|<table|<td|<p>|<span', html_str)) >= 10:
                log('doc as html!')
                soup = BeautifulSoup(html_str, 'lxml')
                text = soup.text
                is_html_doc = True
        except Exception:
            pass

        if is_html_doc:
            self._page = _Page(None, 0)
            _sen = _Sentence(text, (0, 0, 0, 0))
            self._page.add_child(_sen)
            self._doc.add_child(self._page)
        return is_html_doc

    def use_tika(self, _path):
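        # Extract text, image and table objects via the tika interface and
        # add them to the page, synthesizing simple vertical bboxes so the
        # objects keep their original reading order.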
        data = from_tika_interface(_path)
        if judge_error_code(data):
            self._doc.error_code = data
            return
        current_y = 5
        for di, d in enumerate(data):
            data_type, value = d
            bbox = [0, current_y, 20, current_y + 10]
            current_y += 20
            if data_type == 'text':
                _sen = _Sentence(value, bbox)
                _sen.combine = False
                self._page.add_child(_sen)
            elif data_type == 'img':
                with open(value, "rb") as f:
                    img = f.read()
                _img = _Image(img, value, bbox)
                _img.is_from_docx = True
                self._page.add_child(_img)
            elif data_type == 'table':
                _table = _Table(value, bbox)
                _table.is_html = True
                self._page.add_child(_table)
        self._doc.add_child(self._page)

    def get_html(self):
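        # Run the conversion and return the document as HTML, or an
        # error-code list if conversion failed.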
        try:
            self.convert()
        except Exception:
            traceback.print_exc()
            self._doc.error_code = [-1]
        if self._doc.error_code is not None:
            return self._doc.error_code
        if self.tika_html is not None:
            return [self.tika_html]
        return self._doc.get_html()


def parse_summary_info(ole):
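    # Parse and print the OLE property-set streams (SummaryInformation /
    # DocumentSummaryInformation). Expects an open olefile.OleFileIO.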
    from olefile import OleMetadata
    ole_metadata = OleMetadata()
    ole_metadata.parse_properties(ole)
    ole_metadata.dump()


if __name__ == '__main__':
    # c = DocConvert("C:/Users/Administrator/Downloads/-4274446916340743056.doc", "C:/Users/Administrator/Downloads/1")
    # print(c.get_html())
    _p = "C:/Users/Administrator/Downloads/1716253106319.doc"
    # with open(_p, 'rb') as f:
    #     _str = f.read()
    # print(_str.decode("utf-16le"))
    # import olefile
    # import chardet
    # # Open the CFBF (OLE compound) file
    # ole = olefile.OleFileIO(_p)
    #
    # ole_meta = ole.get_metadata()
    #
    # for attr in dir(ole_meta):
    #     if '__' in attr:
    #         continue
    #     print(attr, getattr(ole_meta, attr))
    #
    # # Get the root storage entry
    # root_stream = ole.root
    #
    # parse_summary_info(ole)
    #
    # # Iterate over the directory entries in the root storage
    # for files in ole.listdir():
    #     for entry in files:
    #         print(entry)
    #         _stream = ole.openstream(entry).read()
    #         encoding = chardet.detect(_stream).get('encoding')
    #         print(chardet.detect(_stream))
    #         print(len(_stream) / 4)
    #         if not encoding:
    #             encoding = "utf-16-le"
    #         elif encoding in ['X-ISO-10646-UCS-4-3412']:
    #             encoding = 'ISO-10646'
    #         print(_stream.decode(encoding))
    #         # Print each directory entry's name and size
    #         # print(f"Name: {entry.name}, size: {entry.stg_size} bytes")
    #         # If the entry is a stream, read its contents
    #         # if entry.is_stream():
    #         #     data = root_stream.openstream(entry.name).read()
|