""" Module for testing agent """ from time import time import os # from langchain_core.pydantic_v1 import ( # BaseModel, # ) from langchain.globals import set_llm_cache from langchain_core.callbacks.manager import ( AsyncCallbackManager, ) from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_core.runnables import RunnablePassthrough from langchain_core.callbacks.streaming_stdout import ( StreamingStdOutCallbackHandler, ) from langchain_openai.chat_models import ChatOpenAI from langchain_community.chat_message_histories import ChatMessageHistory from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser from langchain.agents.format_scratchpad import format_to_openai_functions from langchain.agents import AgentExecutor from dotenv import ( load_dotenv, find_dotenv, ) from fastapi import ( APIRouter, Path, Query ) from pydantic import BaseModel from openai_functions_and_agents import ( # create_consumable_functions, consumable_functions, # consumable_tools, ) from prompt_verified import create_agent_prompt if not load_dotenv(find_dotenv()): print("Could not load `.env` file or it is empty. Please check that it exists \ and is readable by the current user") # for caching LLM calls set_llm_cache(True) OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") CHATBOT_NAME = os.environ.get("CHATBOT_NAME") chat_router = APIRouter() class query(BaseModel): query: str = "Hello there" model = ChatOpenAI( model="gpt-3.5-turbo-1106", # model="gpt-4-0125-preview", temperature=0.7, streaming=True, callback_manager=AsyncCallbackManager( [StreamingStdOutCallbackHandler()] ), verbose=True, ).bind( functions=consumable_functions() ) base_chain = create_agent_prompt() | model | OpenAIFunctionsAgentOutputParser() agent_scratchpad_runnable = RunnablePassthrough.assign( agent_scratchpad = lambda x: format_to_openai_functions(x["intermediate_steps"]) ) agent_chain = agent_scratchpad_runnable | base_chain # Check: https://python.langchain.com/docs/modules/agents/quick_start#adding-in-memory for docs message_history = ChatMessageHistory() agent_executor = AgentExecutor( agent=agent_chain, tools=consumable_functions(return_tool=True), verbose=True, handle_parsing_errors=True, ) final_agent = RunnableWithMessageHistory( agent_executor, get_session_history=lambda session_id: message_history, input_messages_key="input", history_messages_key="chat_history", output_messages_key="output", ) @chat_router.post("/chat", summary="For querying the chatbot") async def run_final_agent( query_: query, phone_num: int = Query(None, title="The user's phone number if \ authenticated; otherwise null"), customer_name: str = Query(None, title="The name of the customer if \ authenticated, otherwise null"), session_id: str = Query(..., title="The current session ID"), ): start_time = time() response = await final_agent.ainvoke( input={ "input": query_.query, "customer_name": customer_name, "CHATBOT_NAME": CHATBOT_NAME, }, config={ "configurable": { "session_id": "session_id" } }, ) print(response) total_time = round(time()-start_time, 2) return { "status": 200, "response": response["output"], "time_taken": f"{total_time}s", } test_app = APIRouter() @test_app.get("/your_endpoint/") def your_endpoint( phone_num: str = Query(None, title="The phone number"), session_id: str = Query(None, title="The session ID"), customer_name: str = Query(None, title="The customer name") ): return { "phone_num": phone_num, "session_id": session_id, "customer_name": customer_name }