{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datasets import load_dataset\n", "from IPython.display import clear_output\n", "import pandas as pd\n", "import re\n", "from dotenv import load_dotenv\n", "import os\n", "from ibm_watson_machine_learning.foundation_models.utils.enums import ModelTypes\n", "from ibm_watson_machine_learning.metanames import GenTextParamsMetaNames as GenParams\n", "from ibm_watson_machine_learning.foundation_models.utils.enums import DecodingMethods\n", "from langchain.llms import WatsonxLLM\n", "from langchain.embeddings import SentenceTransformerEmbeddings\n", "from langchain.embeddings.base import Embeddings\n", "from langchain.vectorstores.milvus import Milvus\n", "from langchain.embeddings import HuggingFaceEmbeddings # Not used in this example\n", "from dotenv import load_dotenv\n", "import os\n", "from pymilvus import Collection, utility\n", "from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility\n", "from towhee import pipe, ops\n", "import numpy as np\n", "#import langchain.chains as lc\n", "from langchain_core.retrievers import BaseRetriever\n", "from langchain_core.callbacks import CallbackManagerForRetrieverRun\n", "from langchain_core.documents import Document\n", "from pymilvus import Collection, utility\n", "from towhee import pipe, ops\n", "import numpy as np\n", "from towhee.datacollection import DataCollection\n", "from typing import List\n", "from langchain.chains import RetrievalQA\n", "from langchain.prompts import PromptTemplate\n", "from langchain.schema.runnable import RunnablePassthrough\n", "from langchain_core.retrievers import BaseRetriever\n", "from langchain_core.callbacks import CallbackManagerForRetrieverRun\n", "\n", "print_full_prompt=False\n", "\n", "## Step 1 Dataset Retrieving\n", "\n", "dataset = load_dataset(\"ruslanmv/ai-medical-chatbot\")\n", "clear_output()\n", "train_data = dataset[\"train\"]\n", "#For this demo let us choose the first 1000 dialogues\n", "\n", "df = pd.DataFrame(train_data[:1000])\n", "#df = df[[\"Patient\", \"Doctor\"]].rename(columns={\"Patient\": \"question\", \"Doctor\": \"answer\"})\n", "df = df[[\"Description\", \"Doctor\"]].rename(columns={\"Description\": \"question\", \"Doctor\": \"answer\"})\n", "# Add the 'ID' column as the first column\n", "df.insert(0, 'id', df.index)\n", "# Reset the index and drop the previous index column\n", "df = df.reset_index(drop=True)\n", "\n", "# Clean the 'question' and 'answer' columns\n", "df['question'] = df['question'].apply(lambda x: re.sub(r'\\s+', ' ', x.strip()))\n", "df['answer'] = df['answer'].apply(lambda x: re.sub(r'\\s+', ' ', x.strip()))\n", "df['question'] = df['question'].str.replace('^Q.', '', regex=True)\n", "# Assuming your DataFrame is named df\n", "max_length = 500 # Due to our enbeeding model does not allow long strings\n", "df['question'] = df['question'].str.slice(0, max_length)\n", "#To use the dataset to get answers, let's first define the dictionary:\n", "#- `id_answer`: a dictionary of id and corresponding answer\n", "id_answer = df.set_index('id')['answer'].to_dict()\n", "\n", "## Step 2 WatsonX connection\n", "\n", "load_dotenv()\n", "try:\n", " API_KEY = os.environ.get(\"API_KEY\")\n", " project_id =os.environ.get(\"PROJECT_ID\")\n", "except KeyError:\n", " API_KEY: input(\"Please enter your WML api key (hit enter): \")\n", " project_id = input(\"Please project_id (hit enter): \")\n", "\n", "credentials = {\n", " \"url\": \"https://us-south.ml.cloud.ibm.com\",\n", " \"apikey\": API_KEY \n", "} \n", "\n", "model_id = ModelTypes.GRANITE_13B_CHAT_V2\n", "\n", "\n", "parameters = {\n", " GenParams.DECODING_METHOD: DecodingMethods.GREEDY,\n", " GenParams.MIN_NEW_TOKENS: 1,\n", " GenParams.MAX_NEW_TOKENS: 500,\n", " GenParams.STOP_SEQUENCES: [\"<|endoftext|>\"]\n", "}\n", "\n", "\n", "watsonx_granite = WatsonxLLM(\n", " model_id=model_id.value,\n", " url=credentials.get(\"url\"),\n", " apikey=credentials.get(\"apikey\"),\n", " project_id=project_id,\n", " params=parameters\n", ")\n", "\n", "\n", "## Step 3 Milvus connection\n", "\n", "COLLECTION_NAME='qa_medical'\n", "load_dotenv()\n", "host_milvus = os.environ.get(\"REMOTE_SERVER\", '127.0.0.1')\n", "connections.connect(host=host_milvus, port='19530')\n", "\n", "\n", "collection = Collection(COLLECTION_NAME) \n", "collection.load(replica_number=1)\n", "utility.load_state(COLLECTION_NAME)\n", "utility.loading_progress(COLLECTION_NAME)\n", "\n", "\n", "max_input_length = 500 # Maximum length allowed by the model\n", "\n", "\n", "\n", "# Create the combined pipe for question encoding and answer retrieval\n", "combined_pipe = (\n", " pipe.input('question')\n", " .map('question', 'vec', lambda x: x[:max_input_length]) # Truncate the question if longer than 512 tokens\n", " .map('vec', 'vec', ops.text_embedding.dpr(model_name='facebook/dpr-ctx_encoder-single-nq-base'))\n", " .map('vec', 'vec', lambda x: x / np.linalg.norm(x, axis=0))\n", " .map('vec', 'res', ops.ann_search.milvus_client(host=host_milvus, port='19530', collection_name=COLLECTION_NAME, limit=1))\n", " .map('res', 'answer', lambda x: [id_answer[int(i[0])] for i in x])\n", " .output('question', 'answer')\n", ")\n", " \n", "# Step 4 Langchain Definitions\n", "\n", "class CustomRetrieverLang(BaseRetriever): \n", " def get_relevant_documents(\n", " self, query: str, *, run_manager: CallbackManagerForRetrieverRun\n", " ) -> List[Document]:\n", " # Perform the encoding and retrieval for a specific question\n", " ans = combined_pipe(query)\n", " ans = DataCollection(ans)\n", " answer=ans[0]['answer']\n", " answer_string = ' '.join(answer)\n", " return [Document(page_content=answer_string)] \n", "# Ensure correct VectorStoreRetriever usage\n", "retriever = CustomRetrieverLang()\n", "\n", "# Define the prompt template\n", "template = \"\"\"Use the following pieces of context to answer the question at the end. \n", "If you don't know the answer, just say that you don't know, don't try to make up an answer. \n", "Use three sentences maximum and keep the answer as concise as possible. \n", "Always say \"thanks for asking!\" at the end of the answer. \n", "{context}\n", "Question: {question}\n", "Helpful Answer:\"\"\"\n", "rag_prompt = PromptTemplate.from_template(template)\n", "rag_chain = (\n", " {\"context\": retriever, \"question\": RunnablePassthrough()}\n", " | rag_prompt\n", " | watsonx_granite\n", ")\n", "\n", "prompt = \"I have started to get lots of acne on my face, particularly on my forehead what can I do\"\n", "\n", "if print_full_prompt:\n", " # Get the retrieved context\n", " context = retriever.get_relevant_documents(prompt)\n", " print(\"Retrieved context:\")\n", " for doc in context:\n", " print(doc)\n", " # Construct the full prompt\n", " full_prompt = rag_prompt.format(context=context, question=prompt)\n", " print(\"Full prompt:\", full_prompt)\n", "\n", "print(rag_chain.invoke(prompt)) \n", "\n", "import towhee\n", "def chat(message, history):\n", " history = history or []\n", " response = rag_chain.invoke(message)\n", " history.append((message, response))\n", " return history, history\n", "\n", "import gradio\n", "collection.load()\n", "chatbot = gradio.Chatbot()\n", "interface = gradio.Interface(\n", " chat,\n", " [\"text\", \"state\"],\n", " [chatbot, \"state\"],\n", " allow_flagging=\"never\",\n", ")\n", "#interface.launch(inline=True, share=False) #For the notebook\n", "interface.launch(server_name=\"0.0.0.0\",server_port=7860)" ] } ], "metadata": { "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 2 }