document_processor.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. import os
  2. import re
  3. from loguru import logger
  4. class Document(object):
  5. def __init__(self, page_content="", metadata=None):
  6. self.page_content = page_content
  7. self.metadata = metadata if metadata is not None else {}
  8. class DocumentProcessor:
  9. def __init__(
  10. self,
  11. chunk_size=512,
  12. chunk_overlap=50,
  13. separators=None
  14. ):
  15. self.chunk_size = chunk_size
  16. self.chunk_overlap = chunk_overlap
  17. self.separators = separators or ["\n\n", "\n", " ", ""]
  18. self.text_splitter = None
  19. def _get_text_splitter(self):
  20. if self.text_splitter is None:
  21. from langchain_text_splitters import RecursiveCharacterTextSplitter
  22. self.text_splitter = RecursiveCharacterTextSplitter(
  23. chunk_size=self.chunk_size,
  24. chunk_overlap=self.chunk_overlap,
  25. separators=self.separators,
  26. length_function=len,
  27. )
  28. return self.text_splitter
  29. def _load_html_document(self, file_path):
  30. with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
  31. html = f.read()
  32. from bs4 import BeautifulSoup
  33. soup = BeautifulSoup(html, "lxml")
  34. for tag in soup(["script", "style", "noscript"]):
  35. tag.decompose()
  36. body = soup.find("body") or soup
  37. lines = []
  38. for element in body.find_all(["h1", "h2", "h3", "h4", "h5", "p", "li", "tr"], recursive=True):
  39. if element.name == "tr":
  40. cells = [
  41. re.sub(r"\s+", " ", cell.get_text(" ", strip=True))
  42. for cell in element.find_all(["th", "td"], recursive=False)
  43. ]
  44. text = " | ".join(cell for cell in cells if cell)
  45. else:
  46. if element.find_parent("tr") is not None:
  47. continue
  48. text = element.get_text(" ", strip=True)
  49. text = re.sub(r"\s+", " ", text)
  50. if text and (not lines or lines[-1] != text):
  51. lines.append(text)
  52. if not lines:
  53. text = body.get_text("\n", strip=True)
  54. lines = [line.strip() for line in text.splitlines() if line.strip()]
  55. return [
  56. Document(
  57. page_content="\n".join(lines),
  58. metadata={"source": file_path, "file_type": "html"},
  59. )
  60. ]
  61. def load_document(self, file_path):
  62. ext = os.path.splitext(file_path)[1].lower()
  63. if ext in (".html", ".htm"):
  64. return self._load_html_document(file_path)
  65. from langchain_community.document_loaders import (
  66. PyPDFLoader,
  67. Docx2txtLoader,
  68. TextLoader,
  69. UnstructuredExcelLoader,
  70. )
  71. loader_map = {
  72. ".pdf": PyPDFLoader,
  73. ".docx": Docx2txtLoader,
  74. ".txt": TextLoader,
  75. ".xlsx": UnstructuredExcelLoader,
  76. ".xls": UnstructuredExcelLoader,
  77. }
  78. loader_cls = loader_map.get(ext)
  79. if not loader_cls:
  80. raise ValueError("Unsupported file format: {}".format(ext))
  81. loader = loader_cls(file_path)
  82. raw_docs = loader.load()
  83. return [
  84. Document(
  85. page_content=doc.page_content,
  86. metadata=dict(doc.metadata, source=file_path)
  87. )
  88. for doc in raw_docs
  89. ]
  90. def load_directory(self, dir_path):
  91. all_docs = []
  92. for root, _, files in os.walk(dir_path):
  93. for file in files:
  94. file_path = os.path.join(root, file)
  95. try:
  96. docs = self.load_document(file_path)
  97. all_docs.extend(docs)
  98. logger.info("Loaded {} chunks from {}".format(len(docs), file_path))
  99. except Exception as e:
  100. logger.warning("Failed to load {}: {}".format(file_path, e))
  101. return all_docs
  102. def split_documents(self, documents):
  103. from langchain_core.documents import Document as LCDocument
  104. text_splitter = self._get_text_splitter()
  105. lc_docs = [
  106. LCDocument(page_content=doc.page_content, metadata=doc.metadata)
  107. for doc in documents
  108. ]
  109. split_docs = text_splitter.split_documents(lc_docs)
  110. return [
  111. Document(page_content=doc.page_content, metadata=doc.metadata)
  112. for doc in split_docs
  113. ]
  114. def process(self, source_path):
  115. if os.path.isfile(source_path):
  116. docs = self.load_document(source_path)
  117. elif os.path.isdir(source_path):
  118. docs = self.load_directory(source_path)
  119. else:
  120. raise ValueError("Source path does not exist: {}".format(source_path))
  121. return self.split_documents(docs)