Use AI the right way

Simple Agentic RAG for Multi Vector stores with LangChain and LangGraph

When you start working with RAG and multiple vector stores, one question comes up quickly: how can you choose the correct vector store for each user question in a simple way? If you are asking yourself this question, you are in the right place, because we are going to present a simple solution: agentic RAG.

Introduction

In this post, you will learn:

As usual, we will be as straightforward and simple as possible so let’s get to work!

Pre-requisites

What is an Agent?

Here’s a definition of an agent:

An agent refers to a software entity that performs actions autonomously on behalf of a user or another program with some degree of intelligence and independence. 

We use the ‘intelligence’ of LLMs to plan tasks that achieve larger goals or to decide whether to perform an action. Agents are usually expressed as a decision graph, which shows the whole panel of possible actions together with the conditions that link them:

For example, the graph above shows a panel of actions and the different decisions (or conditions) that have to be made.


In this case, after the retrieve action that gets documents from the vector store, grade_documents is a decision action where the LLM evaluates whether the retrieved chunks are relevant to the question. If they are, the flow goes directly to generate_answer to give the user its answer. If not, it uses a web search to enrich the answer.

Another ability of agents is the capacity to integrate tools (APIs like web search, local functions like reading a file or doing a RAG search, …) and to decide when to use which one in the graph.

All of this allows an agent, if the LLM is capable enough, to perform nearly autonomous actions.
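To make the decision-graph idea concrete, here is a toy sketch in plain Python of the flow described above (retrieve → grade_documents → generate_answer or web_search). The node names mirror the figure, but all the logic is fake: a keyword check stands in for the LLM's relevance grading.

```python
# Toy decision graph: each node updates the state and returns the next node's name.
def retrieve(state):
    state["documents"] = ["a chunk about clean energy"]
    return "grade_documents"

def grade_documents(state):
    # An LLM would grade relevance here; a keyword check stands in for it.
    relevant = any("clean energy" in d for d in state["documents"])
    return "generate_answer" if relevant else "web_search"

def web_search(state):
    state["documents"].append("a web search result")
    return "generate_answer"

def generate_answer(state):
    state["answer"] = f"Answer based on {len(state['documents'])} document(s)."
    return None  # end of the graph

# Walk the graph from the entry node until a node returns None.
nodes = {f.__name__: f for f in (retrieve, grade_documents, web_search, generate_answer)}
state, current = {"question": "Why move to clean energy?"}, "retrieve"
while current:
    current = nodes[current](state)
print(state["answer"])  # Answer based on 1 document(s).
```

Frameworks like LangGraph do essentially this, plus state management, streaming, and persistence.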

What is Agentic RAG?

Here’s a definition of Agentic RAG:

Agentic RAG (Retrieval-Augmented Generation) is a sophisticated approach in AI that incorporates agent-based systems to enhance the retrieval and generation processes used in responding to user queries. This method integrates intelligent agents that act autonomously to fetch and utilize information from various sources to answer questions more effectively. 

Basically, the goal is to use agents to enhance the retrieval of information done by the RAG.
There are many types of agentic RAG, but let’s discuss some of them:

There are a lot of different types and papers are published every day, but these are the main ones.

What is LangGraph?

LangGraph is a library for building stateful, multi-actor applications with LLMs, used to create agent and multi-agent workflows, from the same developers as LangChain. It is one of the best libraries (if not the best) for creating agents.

Here are its features:

Now that the presentation has been completed, we can begin the implementation.

What we want to implement

In this post, we want a very simple agent that automatically chooses which vector store to use depending on the question and the description of each vector store.

Here’s how it will work:

  1. A router (not displayed in the graph) decides which vector store to use depending on the vector store descriptions and the question
  2. It will either route to a vector store or to a not_answerable case
  3. In the vector store case, chunks of data are retrieved from the vector store and passed to the generate step
  4. In the generate step, a response is created using these chunks of data to give an answer to the user
  5. In the not_answerable case, a response is generated explaining why the agent could not find an answer and giving advice on how to refine the question.

This is a very simple system with clear limits, but it can be very powerful precisely because of its simplicity. There are of course other, more complex solutions, but this is a good introduction to agents.
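The routing idea can be sketched without any LLM at all: match the question against each store's description and pick the best match. This toy keyword matcher is only a stand-in for the LLM router (the descriptions below are shortened, hypothetical summaries), but it shows the decision the agent has to make.

```python
# Hypothetical shortened descriptions of the two vector stores.
STORE_DESCRIPTIONS = {
    "clean_energy": "solar wind geothermal energy-efficient sustainable",
    "state_of_the_union": "american economy ukraine union address",
}

def route(question: str) -> str:
    """Pick the store whose description words best match the question."""
    q = question.lower()
    scores = {name: sum(word in q for word in desc.split())
              for name, desc in STORE_DESCRIPTIONS.items()}
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else "not_answerable"

print(route("What does the speech say about solar power?"))  # clean_energy
print(route("What is the capital of France?"))               # not_answerable
```

The LLM router we build below does the same job, but it understands topics rather than just matching keywords.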
Let’s implement it!

Initialize the work environment

We will use the same setup and code as the post on RAG with multiple vector stores (link), so we will use LangChain, a FAISS vector store, and Pipenv for managing the virtual environment. For better readability, we will create a new folder called RAG-agent-multi-vector-store and copy into it all the files from the post on RAG with multiple vector stores.

mkdir RAG-agent-multi-vector-store
cp -R RAG-pipeline-multi-vector-store-langchain-app/. RAG-agent-multi-vector-store
cd RAG-agent-multi-vector-store

Now we need to install the dependencies with Pipenv and add the LangGraph library:

pipenv install langgraph

You can now check that the web app launches with this command:

pipenv run streamlit run app.py

Setup of the chains

Let’s setup all the LangChain chains that we need for our agentic RAG.

rag_template = """Answer the question based only on the following contexts:
{context}

Question: {question}
"""

router_template = """You are an expert at routing a user question to different vector stores.
There are 2 vector stores:
- clean_energy: a speech to advocates for a unified commitment to transitioning to clean energy through solar, 
wind, geothermal, and energy-efficient technologies, emphasizing the importance of community action, 
education, and innovation in creating a sustainable future.
- state_of_the_union: the State of the Union address emphasizes the resilience of the American people, 
highlights strong economic recovery efforts, pledges support for Ukraine, and calls for unity in facing domestic and global challenges.
Return the corresponding vector store depending on the topic of the question, or not_answerable if the question doesn't match any of the vector stores.

Question: {question}
"""

not_answerable_template = """The following question cannot be answered using the following vector stores:
- clean_energy: a speech to advocates for a unified commitment to transitioning to clean energy through solar, 
wind, geothermal, and energy-efficient technologies, emphasizing the importance of community action, 
education, and innovation in creating a sustainable future.
- state_of_the_union: the State of the Union address emphasizes the resilience of the American people, 
highlights strong economic recovery efforts, pledges support for Ukraine, and calls for unity in facing domestic and global challenges.

Explain to the question writer why it is not possible to answer this question using the vector stores 
and, if possible, give some advice on how to make the question answerable.

Question: {question}
"""

from langchain_core.prompts import ChatPromptTemplate

rag_prompt = ChatPromptTemplate.from_template(rag_template)
route_prompt = ChatPromptTemplate.from_template(router_template)
not_answerable_prompt = ChatPromptTemplate.from_template(not_answerable_template)

These are the templates that we are going to use: one for the RAG answer, one for routing, and one for the not-answerable case.
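To see what the model actually receives, you can render a template by hand. Plain `str.format` is used here as a stand-in for what `ChatPromptTemplate` does internally before the messages reach the model (the context and question values are made up for illustration):

```python
rag_template = """Answer the question based only on the following contexts:
{context}

Question: {question}
"""

# Fill the placeholders the same way the prompt template does.
filled = rag_template.format(
    context="Solar power is a renewable energy source.",
    question="Is solar power renewable?",
)
print(filled)
```

The model only ever sees this final string (wrapped in a chat message), which is why clear template wording matters so much.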

Now let’s take care of the chains:

import os
from typing import Literal

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from pydantic import BaseModel, Field


class RouteQuery(BaseModel):
    """Route a user query to the most relevant vector store."""

    datasource: Literal["state_of_the_union", "clean_energy", "not_answerable"] = Field(
        ...,
        description="Given a user question choose to route it to the relevant vector store or say it is not answerable.",
    )

model = ChatOpenAI(
    temperature=0,
    model_name="gpt-4o-2024-08-06",
    openai_api_key=os.environ["OPENAI_KEY"],
)
embedding = OpenAIEmbeddings(openai_api_key=os.environ["OPENAI_KEY"])

state_of_the_union = open("state_of_the_union.txt", "r").read()
state_of_the_union_vectorstore = FAISS.from_texts(
    state_of_the_union.split("\n\n"), embedding=embedding
)
state_of_the_union_retriever = state_of_the_union_vectorstore.as_retriever()

clean_energy = open("generated_clean_energy_discourse.txt", "r").read()
clean_energy_vectorstore = FAISS.from_texts(
    clean_energy.split("\n\n"), embedding=embedding
)
clean_energy_retriever = clean_energy_vectorstore.as_retriever()

rag_chain = rag_prompt | model | StrOutputParser()

structured_model_router = model.with_structured_output(RouteQuery)
question_router = route_prompt | structured_model_router

not_answerable_chain = not_answerable_prompt | model | StrOutputParser()

Let’s see what is happening here:
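One thing worth unpacking is the `|` in `rag_prompt | model | StrOutputParser()`: it is essentially function composition, where each step's output feeds the next step's input. Here is a stdlib-only sketch of the idea (these `Step` objects are a made-up illustration, not the real LangChain Runnable classes):

```python
class Step:
    """A minimal stand-in for a LangChain Runnable."""

    def __init__(self, fn):
        self.fn = fn

    def invoke(self, value):
        return self.fn(value)

    def __or__(self, other):
        # Piping two steps yields a step that runs them in sequence.
        return Step(lambda value: other.invoke(self.invoke(value)))

prompt_step = Step(lambda d: f"Question: {d['question']}")
model_step = Step(lambda p: f"model-output({p})")
parser_step = Step(lambda text: text.strip())

chain = prompt_step | model_step | parser_step
print(chain.invoke({"question": "Why clean energy?"}))
# model-output(Question: Why clean energy?)
```

In the real chains, the prompt step produces chat messages, the model step calls the LLM, and `StrOutputParser` extracts the plain string from the model's reply.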

Setup of the graph flow

Now let’s set up the agentic part and the graph flow.

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        question: question
        generation: LLM generation
        documents: list of documents
    """

    question: str
    generation: str
    documents: List[str]

This is the definition of the GraphState that we are going to use. A graph state is an object that is passed to each task of the graph and should contain all the information that is generated or modified by the tasks. It needs to be tailored to each graph.
In our case, it contains the question, the generation (the answer produced by the generation tasks) and the documents (chunks retrieved from the vector stores).
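Conceptually, after each node runs, LangGraph merges the dict the node returns into the shared state. A minimal stdlib illustration of that merge behavior (the `retrieve_node` and its chunk values are hypothetical, and this is not LangGraph itself):

```python
from typing import List, TypedDict

class GraphState(TypedDict, total=False):
    question: str
    generation: str
    documents: List[str]

state: GraphState = {"question": "Why clean energy?"}

def retrieve_node(state: GraphState) -> GraphState:
    # A node returns only the keys it produced or touched.
    return {"question": state["question"], "documents": ["chunk 1", "chunk 2"]}

# What the framework effectively does after a node runs:
state.update(retrieve_node(state))
print(sorted(state))  # ['documents', 'question']
```

This is why the task functions below return dicts like `{"documents": documents, "question": question}` rather than mutating a global object.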

Now let’s create each task of the graph.

def state_of_the_union_retrieve(state):
    """
    Retrieve documents from the state of the union vector store

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE STATE OF THE UNION---")
    question = state["question"]

    # Retrieval
    documents = state_of_the_union_retriever.invoke(question)
    return {"documents": documents, "question": question}


def clean_energy_retrieve(state):
    """
    Retrieve documents from the clean energy vector store

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, documents, that contains retrieved documents
    """
    print("---RETRIEVE CLEAN ENERGY---")
    question = state["question"]

    # Retrieval
    documents = clean_energy_retriever.invoke(question)
    return {"documents": documents, "question": question}


def generate(state):
    """
    Generate answer

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---GENERATE---")
    question = state["question"]
    documents = state["documents"]

    # RAG generation
    generation = rag_chain.invoke({"context": documents, "question": question})
    return {"documents": documents, "question": question, "generation": generation}


def not_answerable_generate(state):
    """
    Generate answer in case of not answerable decision

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation, that contains LLM generation
    """
    print("---NOT ANSWERABLE GENERATE---")
    question = state["question"]

    # Not answerable generation
    generation = not_answerable_chain.invoke({"question": question})
    return {"documents": None, "question": question, "generation": generation}


def route_question(state):
    """
    Route question to corresponding RAG.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---ROUTE QUESTION---")
    question = state["question"]
    source = question_router.invoke({"question": question})
    if source.datasource == "state_of_the_union":
        print("---ROUTE QUESTION TO STATE OF THE UNION---")
        return "state_of_the_union"
    elif source.datasource == "clean_energy":
        print("---ROUTE QUESTION TO CLEAN ENERGY---")
        return "clean_energy"
    elif source.datasource == "not_answerable":
        print("---ROUTE QUESTION TO NOT ANSWERABLE---")
        return "not_answerable"

This code chunk may seem long, but it is actually simple: we just create a function for each task.
Each task takes the graph state as input and returns a state update, so they all follow the interface expected by a graph flow task.

All the tasks are pretty straightforward: they take the required inputs from the given state object and call the corresponding chains. The routing task is a little different, as it uses a simple if … else to get the routing decision and return the corresponding value. This task will be used as a condition (or decision) to choose which branch to take in the graph flow.
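How a conditional edge resolves can be shown in isolation: the routing function returns a label, and a mapping (like the one passed to `add_conditional_edges` below) turns that label into the name of the next node. Here the LLM router is replaced by a hypothetical keyword check:

```python
def route_question_stub(state):
    # Stand-in for the LLM router: pick a branch label from the question text.
    return "clean_energy" if "energy" in state["question"].lower() else "not_answerable"

# Same mapping shape as the one passed to add_conditional_edges.
branches = {
    "state_of_the_union": "state_of_the_union_retrieve",
    "clean_energy": "clean_energy_retrieve",
    "not_answerable": "not_answerable_generate",
}

next_node = branches[route_question_stub({"question": "Why clean energy?"})]
print(next_node)  # clean_energy_retrieve
```

The keys of the mapping must match the values the routing function can return, otherwise the graph has no edge to follow.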

Now we can add the tasks and create the graph flow itself:

# Define the nodes
workflow = StateGraph(GraphState)

workflow.add_node("state_of_the_union_retrieve", state_of_the_union_retrieve)
workflow.add_node("clean_energy_retrieve", clean_energy_retrieve)
workflow.add_node("generate", generate)
workflow.add_node("not_answerable_generate", not_answerable_generate)

# Build graph
workflow.add_conditional_edges(
    START,
    route_question,
    {
        "state_of_the_union": "state_of_the_union_retrieve",
        "clean_energy": "clean_energy_retrieve",
        "not_answerable": "not_answerable_generate",
    },
)

workflow.add_edge("state_of_the_union_retrieve", "generate")
workflow.add_edge("clean_energy_retrieve", "generate")
workflow.add_edge("generate", END)
workflow.add_edge("not_answerable_generate", END)

# Compile
graph = workflow.compile()

Let’s explain what is happening here:

Here’s what our graph looks like if everything works:

Integration of the graph flow in the app

Let’s integrate the graph in our streamlit app.

st.image(graph.get_graph(xray=True).draw_mermaid_png())

question = st.text_input("Input your question for the uploaded document")
inputs = {"question": question}

if question:
    for output in graph.stream(inputs):
        st.write(output)

Let’s explain this code:
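The key call is `graph.stream(inputs)`: it yields one `{node_name: state_update}` dict per executed node, which is exactly what the `st.write` loop displays. A hypothetical stand-in generator makes the shape of those outputs explicit:

```python
def stream_stub(inputs):
    """Mimics graph.stream: one {node_name: partial_state} dict per executed node."""
    state = dict(inputs)
    state["documents"] = ["a retrieved chunk"]
    yield {"clean_energy_retrieve": {"documents": state["documents"]}}
    state["generation"] = "the final answer"
    yield {"generate": {"generation": state["generation"]}}

node_names = [next(iter(output)) for output in stream_stub({"question": "q"})]
print(node_names)  # ['clean_energy_retrieve', 'generate']
```

So writing each streamed output to the page shows the user which nodes the agent went through and what each one produced.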

First look of our app

Now let’s finally launch our application with the following command:

pipenv run streamlit run app.py

You should have the following visual:

Pretty nice right ?! Now let’s ask a question:

The application printed the output of the 2 tasks (or nodes) it went through: “clean_energy_retrieve” and “generate”. This means the agent decided that our question could be answered by the clean_energy vector store.

Now let’s try another question that cannot be answered:

Here, we see that the application only went through “not_answerable_generate”, because it decided that our vector stores could not answer the question, and it tried to give advice on how to better formulate it.

Pretty powerful right ?!

Conclusion

You just saw a simple but powerful example on how to use agents in RAG. But there are limits and also possible improvements:

So there are limits and possible improvements, but this example is very simple, which makes it a good start in the world of agentic RAG. Welcome!

Afterwards

I hope this tutorial helped you and taught you many things. I will update this post with more nuggets from time to time. Don’t forget to check my other posts, as I write a lot of cool posts on practical stuff in AI.

Cheers !
