# Spaces: Runtime error  (page-status banner captured together with this file; not program code)
import glob
import os
import traceback
from multiprocessing import Pool
from typing import List, Optional

from dotenv import load_dotenv
from flask import Flask, jsonify, Blueprint
from langchain.docstore.document import Document
from langchain.document_loaders import (
    CSVLoader,
    EverNoteLoader,
    PyMuPDFLoader,
    TextLoader,
    UnstructuredEmailLoader,
    UnstructuredEPubLoader,
    UnstructuredHTMLLoader,
    UnstructuredMarkdownLoader,
    UnstructuredODTLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    JSONLoader,
)
from langchain.embeddings import OllamaEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from tqdm import tqdm
# Load settings from the local ".env" file before anything below reads the
# environment. A falsy return from load_dotenv() is treated as fatal.
if not load_dotenv(".env"):
    print("Could not load .env file. It either does not exists or it is empty.")
    # Raise SystemExit directly: the `exit` helper is installed by the `site`
    # module for interactive sessions and should not be relied on in programs.
    raise SystemExit(1)

# These imports deliberately stay below the load_dotenv() call, as in the
# original ordering.
# NOTE(review): presumably `constants` reads environment variables at import
# time -- confirm before hoisting these to the top of the file.
from constants import CHROMA_SETTINGS
import chromadb

# Blueprint that is registered on the Flask app at the bottom of the file.
data_ingestion = Blueprint('ingest', __name__)

# Load environment variables
persist_directory = os.environ.get("PERSIST_DIRECTORY", "db")
source_directory = os.environ.get('SOURCE_DIRECTORY', "data")
# No fallback here: the value is None when EMBEDDINGS_MODEL_NAME is unset.
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME')

# Chunking parameters passed to RecursiveCharacterTextSplitter.
chunk_size = 200
chunk_overlap = 30
# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
    """Email loader that retries as plain text when the message has no HTML part."""

    def load(self) -> List[Document]:
        """Load the email, retrying with ``text/plain`` content if HTML is missing.

        Returns:
            The documents produced by ``UnstructuredEmailLoader.load``.

        Raises:
            Exception: Any failure is re-raised as the same exception type,
                with ``self.file_path`` prefixed to the message and the
                original error attached as the cause.
        """
        missing_html_marker = 'text/html content not found in email'
        try:
            try:
                documents = UnstructuredEmailLoader.load(self)
            except ValueError as html_error:
                if missing_html_marker not in str(html_error):
                    raise
                # No HTML body: switch the content source and try once more.
                self.unstructured_kwargs["content_source"] = "text/plain"
                documents = UnstructuredEmailLoader.load(self)
        except Exception as error:
            # Prefix the offending path so the failure can be traced to a file.
            raise type(error)(f"{self.file_path}: {error}") from error
        return documents
# Registry of supported file types. Each lower-case extension maps to a
# (loader class, constructor keyword arguments) pair. Iteration order of this
# dict is the order in which load_documents() globs for files.
LOADER_MAPPING = {
    ".csv": (CSVLoader, {}),
    # Disabled alternative for .docx: (Docx2txtLoader, {})
    ".doc": (UnstructuredWordDocumentLoader, {}),
    ".docx": (UnstructuredWordDocumentLoader, {}),
    ".enex": (EverNoteLoader, {}),
    # Emails go through the wrapper that falls back to text/plain.
    ".eml": (MyElmLoader, {}),
    ".epub": (UnstructuredEPubLoader, {}),
    ".html": (UnstructuredHTMLLoader, {}),
    ".md": (UnstructuredMarkdownLoader, {}),
    ".odt": (UnstructuredODTLoader, {}),
    ".pdf": (PyMuPDFLoader, {}),
    ".ppt": (UnstructuredPowerPointLoader, {}),
    ".pptx": (UnstructuredPowerPointLoader, {}),
    ".txt": (TextLoader, {"encoding": "utf8"}),
    ".json": (JSONLoader, {"jq_schema": ".", "text_content": False}),
    # Add more mappings for other file extensions and loaders as needed
}
def load_single_document(file_path: str) -> List[Document]:
    """Load one file with the loader registered for its extension.

    Args:
        file_path: Path of the file to load; it is printed before loading.

    Returns:
        The documents returned by the selected loader's ``load()``.

    Raises:
        ValueError: If the extension has no entry in ``LOADER_MAPPING``.
    """
    # The lookup key is "." plus the lower-cased text after the last ".".
    # A path with no "." at all yields "." + the whole path, which is simply
    # not a key of the mapping.
    suffix = file_path.rsplit(".", 1)[-1].lower()
    ext = "." + suffix
    print(file_path)
    if ext not in LOADER_MAPPING:
        raise ValueError(f"Unsupported file extension '{ext}'")
    loader_class, loader_args = LOADER_MAPPING[ext]
    return loader_class(file_path, **loader_args).load()
def load_documents(source_dir: str, ignored_files: Optional[List[str]] = None) -> List[Document]:
    """
    Loads all documents from the source documents directory, ignoring specified files

    Args:
        source_dir: Directory searched recursively for files whose extension
            is a key of ``LOADER_MAPPING`` (lower- and upper-case spellings).
        ignored_files: Paths to skip. ``None`` (the default) skips nothing.

    Returns:
        The documents of every loaded file, concatenated in the order the
        worker pool finishes them (``imap_unordered``), so the order is not
        stable between runs.
    """
    # A set gives O(1) membership tests; None and an empty list both mean
    # "ignore nothing".
    ignored = set(ignored_files or ())

    # Dict keys keep first-seen order while dropping duplicates. The lower-
    # and upper-case patterns can match the same path where glob matching is
    # case-insensitive, which would otherwise load that file twice.
    candidates = {}
    for ext in LOADER_MAPPING:
        for pattern in (f"**/*{ext.lower()}", f"**/*{ext.upper()}"):
            for path in glob.glob(os.path.join(source_dir, pattern), recursive=True):
                candidates[path] = None
    filtered_files = [path for path in candidates if path not in ignored]

    results = []
    with Pool(processes=os.cpu_count()) as pool:
        with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
            # Each file is loaded in a worker process; the bar advances once
            # per finished file.
            for docs in pool.imap_unordered(load_single_document, filtered_files):
                results.extend(docs)
                pbar.update()
    return results
def process_documents(ignored_files: Optional[List[str]] = None) -> List[Document]:
    """
    Load documents and split in chunks

    Args:
        ignored_files: Paths that must not be loaded again. ``None`` (the
            default) ignores nothing.

    Returns:
        The chunks produced by splitting every newly loaded document with
        ``RecursiveCharacterTextSplitter`` using the module-level
        ``chunk_size`` and ``chunk_overlap``.

    Raises:
        SystemExit: With status 0 when no document was loaded.
    """
    print(f"Loading documents from {source_directory}")
    # Always hand a real list to load_documents so that it never has to deal
    # with None itself.
    documents = load_documents(source_directory, ignored_files or [])
    if not documents:
        print("No new documents to load")
        # NOTE(review): this ends the whole process; confirm that is intended
        # when this runs inside the Flask application rather than as a script.
        raise SystemExit(0)
    print(f"Loaded {len(documents)} new documents from {source_directory}")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    texts = text_splitter.split_documents(documents)
    # No length_function is passed above, so chunk_size is measured with the
    # splitter's default (character count), not in tokens.
    print(f"Split into {len(texts)} chunks of text (max. {chunk_size} characters each)")
    return texts
def does_vectorstore_exist(persist_directory: str, embeddings: OllamaEmbeddings) -> bool:
    """
    Checks if vectorstore exists

    Opens a Chroma store on ``persist_directory`` and reports whether its
    ``get()`` result contains any documents.
    """
    store = Chroma(
        persist_directory=persist_directory,
        embedding_function=embeddings,
        client_settings=CHROMA_SETTINGS,
    )
    stored_documents = store.get()['documents']
    # An empty or missing document list means there is nothing to append to.
    return bool(stored_documents)
def base():
    """Return the welcome message as a JSON response with HTTP status 200."""
    # NOTE(review): no route decorator is visible on this function in this
    # file -- confirm how it is registered with the blueprint or app.
    welcome = {
        "msgcode": 1,
        "status": "success",
        "message": "Welcome to the data ingestion system",
    }
    return jsonify(welcome), 200
def main():
    """Ingest new source documents into the persisted Chroma vector store.

    When the store already holds documents, only files whose path is not yet
    recorded as a ``source`` in the stored metadata are processed and
    appended. Otherwise a new store is created from all documents.

    Returns:
        A JSON response with ``responseCode`` 200 on success, or
        ``responseCode`` 201 when an exception was raised. No explicit HTTP
        status is set in either case.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # file -- confirm how it is registered with the blueprint or app.
    try:
        # Create embeddings
        embeddings = OllamaEmbeddings(model=embeddings_model_name)
        # Chroma client
        chroma_client = chromadb.PersistentClient(settings=CHROMA_SETTINGS, path=persist_directory)
        if does_vectorstore_exist(persist_directory, embeddings):
            # Update and store locally vectorstore
            print(f"Appending to existing vectorstore at {persist_directory}")
            db = Chroma(
                persist_directory=persist_directory,
                embedding_function=embeddings,
                client_settings=CHROMA_SETTINGS,
                client=chroma_client,
            )
            collection = db.get()
            # Paths already in the store are skipped so they are not embedded twice.
            already_ingested = [metadata['source'] for metadata in collection['metadatas']]
            texts = process_documents(already_ingested)
            print("Creating embeddings. May take some minutes...")
            db.add_documents(texts)
        else:
            # Create and store locally vectorstore
            print("Creating new vectorstore")
            texts = process_documents()
            print("Creating embeddings. May take some minutes...")
            db = Chroma.from_documents(
                texts,
                embeddings,
                persist_directory=persist_directory,
                client_settings=CHROMA_SETTINGS,
                client=chroma_client,
            )
        db.persist()
        # Drop the reference to the store once it has been persisted.
        db = None
        print("Ingestion complete!")
        return jsonify(
            {
                'Status': 'Ingestion complete!',
                "responseCode": 200
            }
        )
    # If an error occurs
    except Exception:
        # Print the traceback so the failure is not lost: the response below
        # carries no detail about what went wrong.
        traceback.print_exc()
        return jsonify(
            {
                "Status": "An error occurred",
                "responseCode": 201
            }
        )
# Flask App setup
app = Flask(__name__)
# NOTE(review): no routes are attached to `data_ingestion` anywhere in this
# file -- confirm the view functions are registered elsewhere.
app.register_blueprint(data_ingestion)

if __name__ == "__main__":
    # The server listens on every interface (0.0.0.0). Flask's debug mode
    # enables an interactive debugger that can run arbitrary code from the
    # browser, so it is off unless FLASK_DEBUG is explicitly set to a truthy
    # value.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled, host='0.0.0.0', port=8000)