| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import os
- import re
- from loguru import logger
class Document:
    """Minimal container pairing a piece of text with its metadata.

    Attributes:
        page_content: The text carried by this document.
        metadata: Dictionary of metadata describing the text.
    """

    def __init__(self, page_content="", metadata=None):
        """Store the text and metadata.

        Args:
            page_content: Text of the document; defaults to an empty string.
            metadata: Metadata dictionary. When ``None`` a new empty dict is
                created for this instance; any other value (including an
                empty dict supplied by the caller) is stored as-is.
        """
        # Only ``None`` triggers the default, so a caller-supplied dict is
        # kept by reference and instances never share a default dict.
        if metadata is None:
            metadata = {}
        self.metadata = metadata
        self.page_content = page_content
class DocumentProcessor:
    """Load files into ``Document`` objects and split them into chunks.

    HTML files are parsed with BeautifulSoup; PDF, DOCX, TXT and Excel files
    are read through ``langchain_community`` loaders.  Third-party packages
    are imported inside the methods that use them, so constructing a
    processor does not import any of them.
    """

    # Lower-cased file extension -> name of the loader class exported by
    # ``langchain_community.document_loaders``.  Names rather than classes are
    # stored so an unsupported extension can be rejected before that package
    # is imported.
    _LOADER_NAMES = {
        ".pdf": "PyPDFLoader",
        ".docx": "Docx2txtLoader",
        ".txt": "TextLoader",
        ".xlsx": "UnstructuredExcelLoader",
        ".xls": "UnstructuredExcelLoader",
    }

    # HTML tags whose text is emitted as one output line each.
    _HTML_TEXT_TAGS = ["h1", "h2", "h3", "h4", "h5", "p", "li", "tr"]

    def __init__(
        self,
        chunk_size=512,
        chunk_overlap=50,
        separators=None
    ):
        """Store the splitting configuration.

        Args:
            chunk_size: Passed to the text splitter as ``chunk_size``.
            chunk_overlap: Passed to the text splitter as ``chunk_overlap``.
            separators: List of separator strings for the splitter.  A falsy
                value (``None`` or an empty list) selects
                ``["\\n\\n", "\\n", " ", ""]``.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", " ", ""]
        # Created on first use by _get_text_splitter().
        self.text_splitter = None

    def _get_text_splitter(self):
        """Return the cached text splitter, creating it on first call."""
        if self.text_splitter is None:
            from langchain_text_splitters import RecursiveCharacterTextSplitter

            self.text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                separators=self.separators,
                length_function=len,
            )
        return self.text_splitter

    @staticmethod
    def _collapse_whitespace(text):
        """Replace every run of whitespace in *text* with a single space."""
        return re.sub(r"\s+", " ", text)

    def _load_html_document(self, file_path):
        """Extract readable text from an HTML file.

        The file is read as UTF-8 with undecodable bytes ignored.  Script,
        style and noscript elements are removed, then headings, paragraphs,
        list items and table rows are turned into one line of text each.
        Table rows are rendered as their non-empty cells joined by ``" | "``.

        Args:
            file_path: Path of the HTML file.

        Returns:
            A single-element list holding one ``Document`` whose
            ``page_content`` is the extracted lines joined by newlines and
            whose metadata has ``source`` and ``file_type`` keys.
        """
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            html = f.read()

        from bs4 import BeautifulSoup

        soup = BeautifulSoup(html, "lxml")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()

        # Fall back to the whole parse tree when there is no <body>.
        body = soup.find("body") or soup

        lines = []
        # NOTE: nested matches (e.g. a <p> inside an <li>) are each visited,
        # so their text can appear in the parent's line and again on its own
        # line; only an immediate repeat of the previous line is dropped.
        for element in body.find_all(self._HTML_TEXT_TAGS, recursive=True):
            if element.name == "tr":
                # Only direct cells, so each row is rendered from its own
                # <th>/<td> children.
                cells = [
                    self._collapse_whitespace(cell.get_text(" ", strip=True))
                    for cell in element.find_all(["th", "td"], recursive=False)
                ]
                text = " | ".join(cell for cell in cells if cell)
            else:
                # Content inside a table row is already covered by that row.
                if element.find_parent("tr") is not None:
                    continue
                text = element.get_text(" ", strip=True)

            text = self._collapse_whitespace(text)
            if text and (not lines or lines[-1] != text):
                lines.append(text)

        if not lines:
            # None of the structural tags yielded text: use the raw text.
            text = body.get_text("\n", strip=True)
            lines = [line.strip() for line in text.splitlines() if line.strip()]

        return [
            Document(
                page_content="\n".join(lines),
                metadata={"source": file_path, "file_type": "html"},
            )
        ]

    def load_document(self, file_path):
        """Load one file and return its content as ``Document`` objects.

        The format is chosen from the lower-cased file extension.

        Args:
            file_path: Path of the file to load.

        Returns:
            A list of ``Document`` objects.  For non-HTML formats there is
            one per document returned by the loader, each with ``source``
            set to *file_path* in its metadata.

        Raises:
            ValueError: If the extension is not supported.  This is raised
                before any loader package is imported.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext in (".html", ".htm"):
            return self._load_html_document(file_path)

        loader_name = self._LOADER_NAMES.get(ext)
        if loader_name is None:
            raise ValueError("Unsupported file format: {}".format(ext))

        # Import after validation and resolve only the loader that is needed,
        # so a problem importing one loader cannot affect other formats.
        from langchain_community import document_loaders

        loader_cls = getattr(document_loaders, loader_name)
        raw_docs = loader_cls(file_path).load()
        return [
            Document(
                page_content=doc.page_content,
                metadata=dict(doc.metadata, source=file_path),
            )
            for doc in raw_docs
        ]

    def load_directory(self, dir_path):
        """Load every file found under *dir_path*, recursively.

        Files that fail to load (including unsupported formats) are logged
        as warnings and skipped.  Directories and files are visited in
        sorted name order so the result order is reproducible.

        Args:
            dir_path: Root directory to walk.

        Returns:
            A list with the ``Document`` objects of all files that loaded.
        """
        all_docs = []
        for root, dirs, files in os.walk(dir_path):
            # Sorting in place makes os.walk descend in a stable order.
            dirs.sort()
            for file_name in sorted(files):
                file_path = os.path.join(root, file_name)
                try:
                    docs = self.load_document(file_path)
                    all_docs.extend(docs)
                    # The count is of loaded, not yet split, documents.
                    logger.info("Loaded {} chunks from {}".format(len(docs), file_path))
                except Exception as e:
                    # Best effort: one bad file must not abort the whole walk.
                    logger.warning("Failed to load {}: {}".format(file_path, e))
        return all_docs

    def split_documents(self, documents):
        """Split documents into chunks using the configured text splitter.

        Args:
            documents: Iterable of objects with ``page_content`` and
                ``metadata`` attributes.

        Returns:
            A list of ``Document`` chunks; an empty list when *documents*
            is empty or ``None``.
        """
        # Nothing to split: skip importing and building the splitter.
        if not documents:
            return []

        from langchain_core.documents import Document as LCDocument

        text_splitter = self._get_text_splitter()
        lc_docs = [
            LCDocument(page_content=doc.page_content, metadata=doc.metadata)
            for doc in documents
        ]
        split_docs = text_splitter.split_documents(lc_docs)
        return [
            Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in split_docs
        ]

    def process(self, source_path):
        """Load a file or a directory tree and return the split chunks.

        Args:
            source_path: Path of a file or a directory.

        Returns:
            A list of ``Document`` chunks.

        Raises:
            ValueError: If *source_path* is neither an existing file nor an
                existing directory.
        """
        if os.path.isfile(source_path):
            docs = self.load_document(source_path)
        elif os.path.isdir(source_path):
            docs = self.load_directory(source_path)
        else:
            raise ValueError("Source path does not exist: {}".format(source_path))
        return self.split_documents(docs)
|