"""
Module for ingesting data to be used by the RAG tool.

Documents found under ``SOURCE_DIRECTORY`` are loaded with a loader chosen by
file extension, split into overlapping chunks, embedded with OpenAI embeddings
and stored in a persistent Chroma vectorstore under ``PERSIST_DIRECTORY``.
The ingestion is exposed as a FastAPI route on ``ingestion_router``.
"""
import glob
import os
from typing import List, Optional
from multiprocessing import Pool

from tqdm import tqdm
from langchain_community.document_loaders import (
    CSVLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredWordDocumentLoader,
    UnstructuredPowerPointLoader,
    UnstructuredMarkdownLoader,
    UnstructuredEPubLoader,
)
from langchain_community.vectorstores.chroma import Chroma
from langchain_openai.embeddings import OpenAIEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
import chromadb
from dotenv import (
    load_dotenv,
    find_dotenv,
)
from fastapi import APIRouter

from constants import CHROMA_SETTINGS

ingestion_router = APIRouter()

if not load_dotenv(find_dotenv()):
    print(
        "Could not load `.env` file or it is empty. Please check that it exists "
        "and is readable by the current user"
    )

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
embeddings_model = OpenAIEmbeddings()

# Load environment variables
persist_directory = os.environ.get("PERSIST_DIRECTORY", "chroma_vectorstore")
source_directory = os.environ.get('SOURCE_DIRECTORY', "data")

# Chunking parameters handed to RecursiveCharacterTextSplitter.
CHUNK_SIZE = 1000
CHUNK_OVERLAP = 200

# Maps a lower-case file extension to (loader class, loader keyword arguments).
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    # ".json": (JSONLoader, {"jq_schema": ".", "text_content": False})
}


def load_single_document(file_path: str) -> List[Document]:
    """
    Load one file with the loader registered for its extension.

    The extension is the text after the last "." in ``file_path``, compared
    case-insensitively against ``LOADER_MAPPING``.

    Raises:
        ValueError: if the extension has no entry in ``LOADER_MAPPING``.
    """
    ext = "." + file_path.rsplit(".", 1)[-1].lower()
    print(file_path)
    if ext not in LOADER_MAPPING:
        raise ValueError(f"Unsupported file extension '{ext}'")
    loader_class, loader_args = LOADER_MAPPING[ext]
    return loader_class(file_path, **loader_args).load()


def load_documents(
    source_dir: str,
    ignored_files: Optional[List[str]] = None
) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files

    Args:
        source_dir: directory searched recursively for supported files.
        ignored_files: paths to skip; ``None`` (the default) skips nothing.

    Returns:
        The documents produced by every loaded file, in completion order
        (files are loaded in parallel, so the order is not deterministic).
    """
    # A set gives O(1) membership tests; None/empty means "ignore nothing".
    ignored = set(ignored_files) if ignored_files else set()

    all_files = []
    for ext in LOADER_MAPPING:
        for ext_variant in (ext.lower(), ext.upper()):
            all_files.extend(
                glob.glob(
                    os.path.join(source_dir, f"**/*{ext_variant}"),
                    recursive=True,
                )
            )

    # Where glob matching is case-insensitive, the lower- and upper-case
    # patterns return the same path twice; de-duplicate (keeping first-seen
    # order) so no file is loaded and embedded more than once.
    filtered_files = [
        file_path
        for file_path in dict.fromkeys(all_files)
        if file_path not in ignored
    ]

    # Nothing to do: avoid spawning a pool of worker processes for no work.
    if not filtered_files:
        return []

    results = []
    with Pool(processes=os.cpu_count()) as pool:
        with tqdm(
            total=len(filtered_files), desc='Loading new documents', ncols=80
        ) as pbar:
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                results.extend(docs)
                pbar.update()

    return results


def process_documents(
    ignored_files: Optional[List[str]] = None
) -> Optional[List[Document]]:
    """
    Load documents and split in chunks

    Args:
        ignored_files: paths to skip while loading; ``None`` skips nothing.

    Returns:
        The list of chunked documents, or ``None`` when no documents were
        loaded.
    """
    print(f"Loading documents from {source_directory}")
    documents = load_documents(source_directory, ignored_files)
    if not documents:
        print("No new documents to load")
        return None
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP
    )
    texts = text_splitter.split_documents(documents)
    # NOTE(review): the splitter is built without a custom length function, so
    # CHUNK_SIZE is presumably measured in characters, not tokens - confirm
    # before rewording the message below.
    print(f"Split into {len(texts)} chunks of text (max. {CHUNK_SIZE} tokens each)")
    return texts


def does_vectorstore_exist(
    persist_dir: str,
    embeddings: OpenAIEmbeddings
) -> bool:
    """
    Checks if vectorstore exists

    Opens the Chroma store at ``persist_dir`` and returns True only when it
    already holds at least one document.
    """
    db = Chroma(
        persist_directory=persist_dir,
        embedding_function=embeddings,
        client_settings=CHROMA_SETTINGS,
    )
    return bool(db.get()['documents'])


# Route handler: ingests every not-yet-indexed file under `source_directory`
# into the Chroma vectorstore at `persist_directory`.
# Returns the plain string "No new document to load" when there is nothing new,
# otherwise a status dict. Documentation is kept in comments (no docstring or
# return annotation) so the generated OpenAPI schema stays unchanged.
@ingestion_router.post("/ingest-data", summary="For ingesting data for RAG")
def main():
    try:
        # Create embeddings
        embeddings = OpenAIEmbeddings(api_key=OPENAI_API_KEY)
        # Chroma client
        chroma_client = chromadb.PersistentClient(
            settings=CHROMA_SETTINGS,
            path=persist_directory
        )

        if does_vectorstore_exist(persist_directory, embeddings):
            # Update and store locally vectorstore
            print(f"Appending to existing vectorstore at {persist_directory}")
            db = Chroma(
                persist_directory=persist_directory,
                embedding_function=embeddings,
                client_settings=CHROMA_SETTINGS,
                client=chroma_client
            )
            collection = db.get()
            # Files already present in the store are skipped on this run.
            already_ingested = [
                metadata['source'] for metadata in collection['metadatas']
            ]
            texts = process_documents(already_ingested)
            if not texts:
                return "No new document to load"
            print("Creating embeddings. May take some minutes...")
            db.add_documents(texts)
        else:
            # Create and store locally vectorstore
            print("Creating new vectorstore")
            texts = process_documents()
            if not texts:
                return "No new document to load"
            print("Creating embeddings. May take some minutes...")
            db = Chroma.from_documents(
                texts,
                embeddings,
                persist_directory=persist_directory,
                client_settings=CHROMA_SETTINGS,
                client=chroma_client
            )
        db.persist()
        db = None  # drop the reference to the store once it has been persisted

        print("Ingestion complete!")
        return {
            'Status': 'Ingestion complete!',
            "responseCode": 200
        }

    # If an error occurs
    except Exception as e:
        # Top-level request boundary: report the failure to the caller instead
        # of propagating it. The payload (including code 201) is kept as-is
        # because existing clients may rely on it.
        print(e)
        return {
            "Status": "An error occurred",
            "responseCode": 201
        }