"""Multi-model LLM service.

Loads a fixed set of GGUF models through ``llama_cpp`` at import time and
exposes them two ways: a FastAPI endpoint (``POST /generate_multimodel``) and
a Gradio text interface. Every incoming message is sent to all loaded models
and the per-model answers are returned as one Markdown string.
"""

import asyncio
import os
import random
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import gradio as gr
import spaces
import urllib3
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from pydantic import BaseModel

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

app = FastAPI()
load_dotenv()

HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

# Shared module state: the loaded models keyed by display name, plus a table
# of special-token names (the table is not referenced by the code visible in
# this file).
global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token'
    }
}

# Hugging Face repo / file / display name for every model loaded at startup.
model_configs = [
    {"repo_id": "Ffftdtd5dtft/gpt2-xl-Q2_K-GGUF", "filename": "gpt2-xl-q2_k.gguf", "name": "GPT-2 XL"},
    {"repo_id": "Ffftdtd5dtft/gemma-2-27b-Q2_K-GGUF", "filename": "gemma-2-27b-q2_k.gguf", "name": "Gemma 2-27B"},
    {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-Q2_K-GGUF", "filename": "phi-3-mini-128k-instruct-q2_k.gguf", "name": "Phi-3 Mini 128K Instruct"},
    {"repo_id": "Ffftdtd5dtft/starcoder2-3b-Q2_K-GGUF", "filename": "starcoder2-3b-q2_k.gguf", "name": "Starcoder2 3B"},
    {"repo_id": "Ffftdtd5dtft/Qwen2-1.5B-Instruct-Q2_K-GGUF", "filename": "qwen2-1.5b-instruct-q2_k.gguf", "name": "Qwen2 1.5B Instruct"},
    {"repo_id": "Ffftdtd5dtft/Mistral-Nemo-Instruct-2407-Q2_K-GGUF", "filename": "mistral-nemo-instruct-2407-q2_k.gguf", "name": "Mistral Nemo Instruct 2407"},
    {"repo_id": "Ffftdtd5dtft/Phi-3-mini-128k-instruct-IQ2_XXS-GGUF", "filename": "phi-3-mini-128k-instruct-iq2_xxs-imat.gguf", "name": "Phi 3 Mini 128K Instruct XXS"},
    {"repo_id": "Ffftdtd5dtft/TinyLlama-1.1B-Chat-v1.0-IQ1_S-GGUF", "filename": "tinyllama-1.1b-chat-v1.0-iq1_s-imat.gguf", "name": "TinyLlama 1.1B Chat"},
    {"repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-8B-Q2_K-GGUF", "filename": "meta-llama-3.1-8b-q2_k.gguf", "name": "Meta Llama 3.1-8B"},
    {"repo_id": "Ffftdtd5dtft/codegemma-2b-IQ1_S-GGUF", "filename": "codegemma-2b-iq1_s-imat.gguf", "name": "Codegemma 2B"},
]


class ModelManager:
    """Load and hold the ``Llama`` instances described by ``model_configs``."""

    def __init__(self):
        # Display name -> loaded Llama instance.
        self.models = {}

    def load_model(self, model_config):
        """Load one model into ``self.models`` unless it is already present.

        Args:
            model_config: Mapping with ``repo_id``, ``filename`` and ``name``.

        A failure is printed and otherwise ignored so that one unavailable
        model does not prevent the remaining ones from loading.
        """
        name = model_config['name']
        if name in self.models:
            return
        try:
            self.models[name] = Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
                use_auth_token=HUGGINGFACE_TOKEN,
            )
        except Exception as e:  # deliberate best-effort: keep loading the rest
            print(f"Error loading model {name}: {e}")

    def load_all_models(self):
        """Load every entry of ``model_configs`` on a thread pool.

        Returns:
            The ``self.models`` dict, holding the models that loaded
            successfully.
        """
        # Leaving the ``with`` block waits for all submitted loads to finish.
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models


model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()


class ChatRequest(BaseModel):
    """Request schema carrying a single chat message."""

    message: str


def normalize_input(input_text: str) -> str:
    """Return *input_text* with leading and trailing whitespace removed."""
    return input_text.strip()


# Compiled once: collapse back-to-back repeats of two known prompt echoes.
_REPEATED_HELLO_RE = re.compile(r'(Hello there, how are you\? \[/INST\]){2,}')
_REPEATED_HOW_RE = re.compile(r'(How are you\? \[/INST\]){2,}')


def remove_duplicates(text: str) -> str:
    """Clean repeated prompt echoes and duplicate lines out of model output.

    Consecutive repeats of the two known echo phrases are collapsed to one,
    every ``[/INST]`` marker is stripped, and each distinct line is kept only
    at its first occurrence (blank lines included), preserving order.
    """
    text = _REPEATED_HELLO_RE.sub('Hello there, how are you? [/INST]', text)
    text = _REPEATED_HOW_RE.sub('How are you? [/INST]', text)
    text = text.replace('[/INST]', '')
    # dict.fromkeys keeps first occurrences in insertion order.
    return '\n'.join(dict.fromkeys(text.split('\n')))


def _complete(model, inputs: str) -> str:
    """Run *model* on *inputs* and return the cleaned text of the first choice."""
    response = model(inputs)
    return remove_duplicates(response['choices'][0]['text'])


@spaces.GPU(queue=False, idle_timeout=0, timeout=0)
def generate_model_response(model, inputs):
    """Generate a cleaned completion from *model*, retrying once on GPU quota.

    Args:
        model: Callable model returning a dict with ``choices[0]['text']``.
        inputs: Prompt text passed to the model.

    Returns:
        The cleaned completion, or ``""`` when generation fails. A failure
        whose message contains "You have exceeded your GPU quota" is retried
        once after a random 1-3 second pause.
    """
    try:
        return _complete(model, inputs)
    except Exception as e:  # best-effort: one failing model must not abort the batch
        if "You have exceeded your GPU quota" not in str(e):
            print(f"Error generating model response: {e}")
            return ""

    # Quota error: back off briefly, then try exactly once more.
    time.sleep(random.uniform(1, 3))
    try:
        return _complete(model, inputs)
    except Exception as e2:
        print(f"Error generating model response (after retry): {e2}")
        return ""


def remove_repetitive_responses(responses):
    """Keep the first response seen for each model.

    Args:
        responses: Iterable of dicts with ``'model'`` and ``'response'`` keys.

    Returns:
        Dict mapping model name to its first response, in first-seen order.
    """
    unique_responses = {}
    for entry in responses:
        unique_responses.setdefault(entry['model'], entry['response'])
    return unique_responses


async def process_message(message):
    """Send *message* to every loaded model and format the answers as Markdown.

    Args:
        message: Raw user message; it is stripped before being sent.

    Returns:
        One string with a ``**<model name>:**`` section per model.
    """
    inputs = normalize_input(message)
    # Snapshot names and models together so each answer stays paired with the
    # model that produced it.
    named_models = list(global_data['models'].items())
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        pending = [
            loop.run_in_executor(executor, generate_model_response, model, inputs)
            for _, model in named_models
        ]
        # gather() returns results in submission order (not completion order)
        # and yields to the event loop while the worker threads run.
        texts = await asyncio.gather(*pending)

    responses = [
        {'model': name, 'response': text}
        for (name, _), text in zip(named_models, texts)
    ]
    unique_responses = remove_repetitive_responses(responses)
    return "".join(
        f"**{model}:**\n{response}\n\n"
        for model, response in unique_responses.items()
    )


@app.post("/generate_multimodel")
async def api_generate_multimodel(request: Request):
    """Handle ``POST /generate_multimodel``.

    Expects a JSON object with a string ``"message"`` field.

    Returns:
        ``{"response": <markdown>}`` on success; ``{"error": ...}`` with
        status 400 for a malformed request body, or status 500 when
        generation fails.
    """
    try:
        data = await request.json()
        message = data["message"]
    except (ValueError, KeyError, TypeError) as e:
        # Invalid JSON, a non-object body, or a missing "message" key can
        # never succeed on retry, so answer immediately.
        print(f"Error in API request handling: {e}")
        return JSONResponse(
            {"error": "Request body must be a JSON object with a 'message' field."},
            status_code=400,
        )
    if not isinstance(message, str):
        return JSONResponse({"error": "'message' must be a string."}, status_code=400)

    try:
        formatted_response = await process_message(message)
    except Exception as e:  # top-level boundary: report instead of hanging the request
        print(f"Error in API request handling: {e}")
        return JSONResponse({"error": "Failed to generate a response."}, status_code=500)
    return JSONResponse({"response": formatted_response})


iface = gr.Interface(
    fn=process_message,
    inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."),
    outputs=gr.Markdown(),
    title="Multi-Model LLM API",
    description="Enter a message and get responses from multiple LLMs.",
)

if __name__ == "__main__":
    # NOTE(review): only the Gradio interface is launched here; the FastAPI
    # ``app`` is not started by this entry point -- confirm it is served
    # elsewhere if the REST endpoint is needed.
    port = int(os.environ.get("PORT", 7860))
    iface.launch(server_port=port)