import os
import re

try:
    from loguru import logger
except ImportError:  # loguru is optional; fall back to the standard library.
    import logging

    logger = logging.getLogger(__name__)


class Document:
    """Minimal text container: a content string plus a metadata dict."""

    def __init__(self, page_content="", metadata=None):
        self.page_content = page_content
        # A fresh dict per instance so documents never share metadata.
        self.metadata = metadata if metadata is not None else {}

    def __repr__(self):
        return "{}(page_content={!r}, metadata={!r})".format(
            type(self).__name__, self.page_content, self.metadata
        )


class DocumentProcessor:
    """Load files (HTML, PDF, DOCX, TXT, Excel) and split them into chunks.

    Third-party libraries (LangChain, BeautifulSoup) are imported lazily
    inside the methods that need them.
    """

    # File extension -> loader class name looked up on
    # ``langchain_community.document_loaders``.
    _LOADER_NAMES = {
        ".pdf": "PyPDFLoader",
        ".docx": "Docx2txtLoader",
        ".txt": "TextLoader",
        ".xlsx": "UnstructuredExcelLoader",
        ".xls": "UnstructuredExcelLoader",
    }

    def __init__(
        self,
        chunk_size=512,
        chunk_overlap=50,
        separators=None
    ):
        """Store the splitter configuration.

        Args:
            chunk_size: Value passed as ``chunk_size`` to the text splitter.
            chunk_overlap: Value passed as ``chunk_overlap`` to the splitter.
            separators: Separator list for the splitter; a falsy value
                selects ``["\\n\\n", "\\n", " ", ""]``.
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or ["\n\n", "\n", " ", ""]
        # Built on first use by ``_get_text_splitter``.
        self.text_splitter = None

    def _get_text_splitter(self):
        """Return the cached splitter, creating it on the first call."""
        if self.text_splitter is None:
            from langchain_text_splitters import RecursiveCharacterTextSplitter

            self.text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=self.chunk_size,
                chunk_overlap=self.chunk_overlap,
                separators=self.separators,
                length_function=len,
            )
        return self.text_splitter

    def _load_html_document(self, file_path):
        """Extract the readable text of an HTML file as a single Document.

        The file is decoded as UTF-8 with undecodable bytes dropped.
        ``script``/``style``/``noscript`` tags are removed; headings,
        paragraphs, list items and table rows each contribute one line, with
        table cells joined by ``" | "``. If none of those yield text, the
        whole body text is used instead.

        Args:
            file_path: Path of the HTML file.

        Returns:
            A one-element list holding a ``Document`` whose metadata carries
            ``source`` and ``file_type="html"``.
        """
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            html = f.read()

        from bs4 import BeautifulSoup

        soup = BeautifulSoup(html, "lxml")
        for tag in soup(["script", "style", "noscript"]):
            tag.decompose()

        body = soup.find("body") or soup
        lines = []
        # NOTE: nested matches (e.g. an ``li`` inside an ``li``) are visited
        # on their own as well as through their ancestor's text; only
        # immediately repeated lines are collapsed below.
        for element in body.find_all(
            ["h1", "h2", "h3", "h4", "h5", "p", "li", "tr"], recursive=True
        ):
            if element.name == "tr":
                cells = [
                    re.sub(r"\s+", " ", cell.get_text(" ", strip=True))
                    for cell in element.find_all(["th", "td"], recursive=False)
                ]
                text = " | ".join(cell for cell in cells if cell)
            else:
                # Content inside a table row is already covered by the row.
                if element.find_parent("tr") is not None:
                    continue
                text = element.get_text(" ", strip=True)
                text = re.sub(r"\s+", " ", text)

            if text and (not lines or lines[-1] != text):
                lines.append(text)

        if not lines:
            text = body.get_text("\n", strip=True)
            lines = [line.strip() for line in text.splitlines() if line.strip()]

        return [
            Document(
                page_content="\n".join(lines),
                metadata={"source": file_path, "file_type": "html"},
            )
        ]

    def load_document(self, file_path):
        """Load one file into a list of ``Document`` objects.

        Args:
            file_path: Path of the file; the loader is chosen from its
                lower-cased extension.

        Returns:
            A list of ``Document`` objects whose metadata ``source`` is set
            to ``file_path``.

        Raises:
            ValueError: If the extension is not supported.
        """
        ext = os.path.splitext(file_path)[1].lower()
        if ext in (".html", ".htm"):
            return self._load_html_document(file_path)

        # Validate before importing so an unsupported file reports the real
        # problem even when the optional loader package is not installed.
        loader_name = self._LOADER_NAMES.get(ext)
        if loader_name is None:
            raise ValueError("Unsupported file format: {}".format(ext))

        from langchain_community import document_loaders

        loader_cls = getattr(document_loaders, loader_name)
        raw_docs = loader_cls(file_path).load()
        return [
            Document(
                page_content=doc.page_content,
                metadata=dict(doc.metadata, source=file_path),
            )
            for doc in raw_docs
        ]

    def load_directory(self, dir_path):
        """Load every file under ``dir_path`` recursively.

        Files that fail to load (including unsupported formats) are logged
        as warnings and skipped.

        Args:
            dir_path: Root directory to walk.

        Returns:
            A flat list of the ``Document`` objects from all loaded files.
        """
        all_docs = []
        for root, _, files in os.walk(dir_path):
            for file_name in files:
                file_path = os.path.join(root, file_name)
                try:
                    docs = self.load_document(file_path)
                except Exception as e:
                    # Best effort: one bad file must not abort the walk.
                    logger.warning("Failed to load {}: {}".format(file_path, e))
                else:
                    all_docs.extend(docs)
                    logger.info(
                        "Loaded {} chunks from {}".format(len(docs), file_path)
                    )
        return all_docs

    def split_documents(self, documents):
        """Split documents into chunks with the configured text splitter.

        Args:
            documents: Objects exposing ``page_content`` and ``metadata``.

        Returns:
            A list of ``Document`` chunks; ``[]`` when ``documents`` is empty.
        """
        # Nothing to split: skip the LangChain import and splitter set-up.
        if not documents:
            return []

        from langchain_core.documents import Document as LCDocument

        text_splitter = self._get_text_splitter()
        lc_docs = [
            LCDocument(page_content=doc.page_content, metadata=doc.metadata)
            for doc in documents
        ]
        split_docs = text_splitter.split_documents(lc_docs)
        return [
            Document(page_content=doc.page_content, metadata=doc.metadata)
            for doc in split_docs
        ]

    def process(self, source_path):
        """Load a file or a directory tree and return its chunks.

        Args:
            source_path: Path of a file or a directory.

        Returns:
            The list of ``Document`` chunks from ``split_documents``.

        Raises:
            ValueError: If ``source_path`` is neither a file nor a directory,
                or if a single file has an unsupported extension.
        """
        if os.path.isfile(source_path):
            docs = self.load_document(source_path)
        elif os.path.isdir(source_path):
            docs = self.load_directory(source_path)
        else:
            raise ValueError("Source path does not exist: {}".format(source_path))
        return self.split_documents(docs)