Source code for manolo_bot.ai.document_loaders

import io
import logging
from collections.abc import Iterator
from typing import BinaryIO

from langchain_community.document_loaders.base import BaseBlobParser
from langchain_community.document_loaders.parsers import PyPDFParser
from langchain_community.document_loaders.parsers.txt import TextParser
from langchain_core.document_loaders import Blob
from langchain_core.documents import Document


class UnsupportedFileError(ValueError):
    """Signal that a file's format is not one this module can handle.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
class DocxParser(BaseBlobParser):
    """
    Blob parser for DOCX files built on python-docx.

    Implements the LangChain ``BaseBlobParser`` interface. Used in place of
    MsWordParser to avoid the heavy 'unstructured' dependency.
    """

    def lazy_parse(self, blob: Blob) -> Iterator[Document]:
        """
        Yield one Document holding the text of every paragraph in the blob.

        :param blob: The blob whose bytes contain the DOCX file.
        :return: An iterator producing a single Document; its page content is
            the paragraph texts joined by newlines and its metadata carries
            the blob's ``source``.
        """
        # Imported lazily so python-docx is only required when DOCX is parsed.
        import docx

        with blob.as_bytes_io() as stream:
            paragraphs = docx.Document(stream).paragraphs
            body = "\n".join([para.text for para in paragraphs])
            yield Document(page_content=body, metadata={"source": blob.source})
class DocumentLoader:
    """
    Utility class for extracting text from different document formats
    using LangChain native parsers.

    Supported formats are grouped by parser family in ``_extensions``;
    ``SUPPORTED_EXTENSIONS`` is the flattened list of every accepted
    extension.
    """

    # Parser family -> file extensions (lower-case, without the dot) it handles.
    _extensions = {"pdf": ["pdf"], "docx": ["docx"], "txt": ["txt", "md", "csv"]}
    # Flat list of all accepted extensions, in declaration order.
    SUPPORTED_EXTENSIONS = [ext for ext_list in _extensions.values() for ext in ext_list]

    def __init__(self):
        # mode="single" returns the whole document as one Document object
        self.pdf_parser = PyPDFParser(mode="single", pages_delimiter="\n")
        self.docx_parser = DocxParser()
        self.txt_parser = TextParser()

    @staticmethod
    def _extension_of(filename: str) -> str:
        """Return the lower-cased text after the last dot in ``filename``."""
        return filename.rsplit(".", 1)[-1].lower()

    @classmethod
    def validate_filename(cls, filename: str) -> None:
        """
        Validates if a filename has a supported extension.

        :param filename: The filename to validate.
        :raises UnsupportedFileError: If the filename is empty, contains no
            dot, or its extension is not in ``SUPPORTED_EXTENSIONS``.
        """
        if not filename or "." not in filename:
            raise UnsupportedFileError("File has no extension")

        extension = cls._extension_of(filename)
        if extension not in cls.SUPPORTED_EXTENSIONS:
            raise UnsupportedFileError(f"Unsupported file extension: {extension}")

    def _extract_with(self, parser, file: BinaryIO, mime_type: str, label: str) -> str:
        """
        Run ``parser`` over the bytes of ``file`` and return the joined text.

        Shared implementation behind the ``extract_text_from_*`` methods.

        :param parser: Object exposing ``lazy_parse(blob)``.
        :param file: Binary file-like object; it is read to the end.
        :param mime_type: MIME type attached to the blob handed to the parser.
        :param label: Format name used in the error log message.
        :return: Page contents of all parsed documents joined by newlines,
            with surrounding whitespace stripped.
        :raises Exception: Any error from reading or parsing is logged and
            re-raised unchanged.
        """
        try:
            blob = Blob.from_data(file.read(), mime_type=mime_type)
            parsed = parser.lazy_parse(blob)
            return "\n".join(doc.page_content for doc in parsed).strip()
        except Exception as e:
            # Log for diagnostics, then let the caller decide how to react.
            logging.error("Error extracting text from %s: %s", label, e)
            raise

    def extract_text_from_pdf(self, file: BinaryIO) -> str:
        """
        Extracts text from a PDF file using PyPDFParser.

        :param file: Binary file-like object containing the PDF.
        :return: The extracted text, stripped of surrounding whitespace.
        """
        return self._extract_with(self.pdf_parser, file, "application/pdf", "PDF")

    def extract_text_from_docx(self, file: BinaryIO) -> str:
        """
        Extracts text from a DOCX file using DocxParser.

        :param file: Binary file-like object containing the DOCX.
        :return: The extracted text, stripped of surrounding whitespace.
        """
        return self._extract_with(
            self.docx_parser,
            file,
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "DOCX",
        )

    def extract_text_from_txt(self, file: BinaryIO) -> str:
        """
        Extracts text from a TXT/MD/CSV file using TextParser.

        :param file: Binary file-like object containing the text.
        :return: The extracted text, stripped of surrounding whitespace.
        """
        return self._extract_with(self.txt_parser, file, "text/plain", "TXT")

    def extract_text(self, file_content: bytes, filename: str) -> str:
        """
        Dispatcher method to extract text based on file extension.

        :param file_content: Raw bytes of the file.
        :param filename: Name of the file; only its extension is inspected.
        :return: The extracted text.
        :raises UnsupportedFileError: If the filename fails validation.
        """
        # Ensure validation is performed (it will raise UnsupportedFileError if invalid)
        self.validate_filename(filename)

        file_like = io.BytesIO(file_content)
        extension = self._extension_of(filename)

        if extension in self._extensions["pdf"]:
            return self.extract_text_from_pdf(file_like)
        if extension in self._extensions["docx"]:
            return self.extract_text_from_docx(file_like)
        # Must be txt, md, or csv based on validate_filename check
        return self.extract_text_from_txt(file_like)
def clean_text(text: str) -> str:
    """
    Apply basic whitespace cleanup to reduce token usage.

    Runs of space characters collapse to one space, runs of three or more
    newlines collapse to a blank line (two newlines), and leading/trailing
    whitespace is stripped. Tabs and other whitespace inside the text are
    left untouched.

    :param text: The text to clean.
    :return: The cleaned text.
    """
    import re

    # Collapse runs of spaces into a single space
    single_spaced = re.sub(r" +", " ", text)
    # Cap consecutive newlines at two (one blank line)
    compacted = re.sub(r"\n{3,}", "\n\n", single_spaced)
    return compacted.strip()