xtrade_bot / archived /ingest_data.py
Josh-Ola's picture
Upload folder using huggingface_hub
65976bc verified
raw
history blame contribute delete
No virus
7.28 kB
import os
import glob
from typing import List
from dotenv import load_dotenv
from multiprocessing import Pool
from tqdm import tqdm
from flask import Flask, jsonify, Blueprint
from langchain.document_loaders import (
CSVLoader,
EverNoteLoader,
PyMuPDFLoader,
TextLoader,
UnstructuredEmailLoader,
UnstructuredEPubLoader,
UnstructuredHTMLLoader,
UnstructuredMarkdownLoader,
UnstructuredODTLoader,
UnstructuredPowerPointLoader,
UnstructuredWordDocumentLoader,
JSONLoader
)
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.embeddings import OllamaEmbeddings
from langchain.docstore.document import Document
#if not load_dotenv():
if not load_dotenv(".env"):
print("Could not load .env file. It either does not exists or it is empty.")
exit(1)
from constants import CHROMA_SETTINGS
import chromadb
data_ingestion = Blueprint('ingest', __name__)
# Load environment variables
persist_directory = os.environ.get("PERSIST_DIRECTORY", "db")
source_directory = os.environ.get('SOURCE_DIRECTORY', "data")
embeddings_model_name = os.environ.get('EMBEDDINGS_MODEL_NAME')
chunk_size = 200
chunk_overlap = 30
# Custom document loaders
class MyElmLoader(UnstructuredEmailLoader):
"""Wrapper to fallback to text/plain when default does not work"""
def load(self) -> List[Document]:
"""Wrapper adding fallback for elm without html"""
try:
try:
doc = UnstructuredEmailLoader.load(self)
except ValueError as e:
if 'text/html content not found in email' in str(e):
# Try plain text
self.unstructured_kwargs["content_source"]="text/plain"
doc = UnstructuredEmailLoader.load(self)
else:
raise
except Exception as e:
# Add file_path to exception message
raise type(e)(f"{self.file_path}: {e}") from e
return doc
# Map file extensions to document loaders and their arguments
LOADER_MAPPING = {
".csv": (CSVLoader, {}),
# ".docx": (Docx2txtLoader, {}),
".doc": (UnstructuredWordDocumentLoader, {}),
".docx": (UnstructuredWordDocumentLoader, {}),
".enex": (EverNoteLoader, {}),
".eml": (MyElmLoader, {}),
".epub": (UnstructuredEPubLoader, {}),
".html": (UnstructuredHTMLLoader, {}),
".md": (UnstructuredMarkdownLoader, {}),
".odt": (UnstructuredODTLoader, {}),
".pdf": (PyMuPDFLoader, {}),
".ppt": (UnstructuredPowerPointLoader, {}),
".pptx": (UnstructuredPowerPointLoader, {}),
".txt": (TextLoader, {"encoding": "utf8"}),
".json": (JSONLoader, {"jq_schema": ".", "text_content": False})
# Add more mappings for other file extensions and loaders as needed
}
def load_single_document(file_path: str) -> List[Document]:
ext = "." + file_path.rsplit(".", 1)[-1].lower()
print(file_path)
if ext in LOADER_MAPPING:
loader_class, loader_args = LOADER_MAPPING[ext]
loader = loader_class(file_path, **loader_args)
return loader.load()
raise ValueError(f"Unsupported file extension '{ext}'")
def load_documents(source_dir: str, ignored_files: List[str] = []) -> List[Document]:
"""
Loads all documents from the source documents directory, ignoring specified files
"""
all_files = []
for ext in LOADER_MAPPING:
all_files.extend(
glob.glob(os.path.join(source_dir, f"**/*{ext.lower()}"), recursive=True)
)
all_files.extend(
glob.glob(os.path.join(source_dir, f"**/*{ext.upper()}"), recursive=True)
)
filtered_files = [file_path for file_path in all_files if file_path not in ignored_files]
with Pool(processes=os.cpu_count()) as pool:
results = []
with tqdm(total=len(filtered_files), desc='Loading new documents', ncols=80) as pbar:
for i, docs in enumerate(pool.imap_unordered(load_single_document, filtered_files)):
results.extend(docs)
pbar.update()
return results
def process_documents(ignored_files: List[str] = []) -> List[Document]:
"""
Load documents and split in chunks
"""
print(f"Loading documents from {source_directory}")
documents = load_documents(source_directory, ignored_files)
if not documents:
print("No new documents to load")
exit(0)
print(f"Loaded {len(documents)} new documents from {source_directory}")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
texts = text_splitter.split_documents(documents)
print(f"Split into {len(texts)} chunks of text (max. {chunk_size} tokens each)")
return texts
def does_vectorstore_exist(persist_directory: str, embeddings: OllamaEmbeddings) -> bool:
"""
Checks if vectorstore exists
"""
db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS)
if not db.get()['documents']:
return False
return True
@data_ingestion.route("/home", methods=["GET"])
@data_ingestion.route("/")
def base():
return jsonify(
{
"msgcode": 1,
"status": "success",
"message": "Welcome to the data ingestion system"
}
), 200
@data_ingestion.route("/ingest-data", methods=["POST"])
def main():
try:
# Create embeddings
embeddings = OllamaEmbeddings(model=embeddings_model_name)
# Chroma client
print("Here ")
chroma_client = chromadb.PersistentClient(settings=CHROMA_SETTINGS , path=persist_directory)
if does_vectorstore_exist(persist_directory, embeddings):
# Update and store locally vectorstore
print(f"Appending to existing vectorstore at {persist_directory}")
db = Chroma(persist_directory=persist_directory, embedding_function=embeddings, client_settings=CHROMA_SETTINGS, client=chroma_client)
collection = db.get()
texts = process_documents([metadata['source'] for metadata in collection['metadatas']])
print(f"Creating embeddings. May take some minutes...")
db.add_documents(texts)
else:
# Create and store locally vectorstore
print("Creating new vectorstore")
texts = process_documents()
print(f"Creating embeddings. May take some minutes...")
db = Chroma.from_documents(texts, embeddings, persist_directory=persist_directory, client_settings=CHROMA_SETTINGS, client=chroma_client)
db.persist()
db = None
print(f"Ingestion complete!")
return jsonify(
{
'Status': 'Ingestion complete!',
"responseCode": 200
}
)
# If an error occurs
except Exception:
return jsonify(
{
"Status": "An error occurred",
"responseCode": 201
}
)
# Flask App setup
app = Flask(__name__)
app.register_blueprint(data_ingestion)
if __name__ == "__main__":
app.run(debug=True, host='0.0.0.0', port=8000)
# main()