Simple domain-specific Corrective RAG with LangChain and LangGraph

If you are using RAG in your use cases, at some point you will notice that most answers are not domain-specific but depend entirely on what is in your vector stores. In this post, we are going to see how to add domain-specific vocabulary to your RAG using agents with LangChain and LangGraph: a domain-specific Corrective RAG.

CRAG representation

Introduction

In this post, you will learn:

  • What is domain-specific vocabulary?
  • What is CRAG?
  • How to implement a domain-specific CRAG with LangChain and LangGraph

As usual, we will be as straightforward and simple as possible so let’s get to work!

Pre-requisites

  • A functional OpenAI account with credentials ready
  • A functional Python environment
  • You should have read the post on Agentic RAG (link) and the post on RAG with multiple vectors (link). The first article is especially important, as it contains all the explanations on agents and the code we are going to use here.
  • You can find all the relevant code here.

What is domain-specific vocabulary?

Here’s a definition:

Domain-specific vocabulary refers to the set of specialized words, phrases, and jargon used within a particular field or domain of knowledge, such as medicine, law, or technology. It includes the precise language and definitions that professionals in that domain use to communicate effectively and accurately.

If we wanted to give a simpler definition, it would be “the language used in the domain”. The deeper people go into a field, the more vocabulary they learn and the harder they become to understand for anyone without the same level of knowledge. At the same time, when they use a tool, they want answers that use the same language and vocabulary they do.

RAG and domain-specific vocabulary

Domain-specific vocabulary and RAG should go hand in hand, but in practice they often do not, because:

  • A vector store is just a database that returns the data most similar to the input you give, using embeddings.
  • The embedding model is a generalist model: it does not know all the vocabulary of a field, and it cannot deduce the similarity between certain terms or does not know them at all.
  • The LLM adds a layer of inference that uses the retrieved data to give a well-formulated answer, but it does not have the vocabulary to give a domain-specific answer.

All this means that to have a truly powerful RAG system, you need the capacity to enrich the queries and answers with domain-specific vocabulary.
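As a quick illustration of the embedding problem, you can check how a generalist embedding model scores the similarity between a piece of jargon and its plain-language equivalent. Here is a minimal sketch, assuming OpenAI embeddings and two made-up example phrases:

from langchain_openai import OpenAIEmbeddings
import numpy as np

embedding = OpenAIEmbeddings()

# Hypothetical phrases: a domain term and its plain-language equivalent
v1, v2 = embedding.embed_documents(
    ["PV feed-in tariff", "getting paid for the solar power you produce"]
)

# Cosine similarity: a mediocre score here is exactly the gap that a
# domain-specific CRAG tries to close
print(np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2)))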

Domain-specific Corrective RAG

Corrective RAG (or CRAG) is a type of Agentic RAG where we use agents (autonomous entities powered by LLMs) to make vector stores more intelligent (check out this post for more information).
The idea behind CRAG is to correct the user query using other data so that it finds more relevant information in the vector stores and gives a better answer.

A domain-specific CRAG, in particular, is a way to correct the query and the answer with domain-specific vocabulary. It plays the role of a translator between the vocabulary used in the vector store data and the vocabulary used by the user, and is thus able to link concepts that would otherwise have been lost.

What we want to implement

Graph representation of the tutorial

Here’s the graph flow of what we want to implement:

  1. We retrieve the chunks of data from the vector store that are most similar to the question.
  2. We use a condition that checks whether the retrieved data is relevant to the question.
  3. If the retrieved data is relevant, the condition routes the flow to the generation of the answer.
  4. If not, it routes the flow to the task that gets the domain-specific vocabulary.
  5. Then, the rewrite_question task rewrites the question with this vocabulary and returns to the start of the graph.
  6. The retrieve step gets the data most similar to this new question, normally with better results, and an answer is generated.

This particular method, domain-specific CRAG, makes the user’s question more relevant to the vector store it should use, and therefore more relevant to the domain.
We could say that this is a RAG tailored to people with less knowledge of the domain, giving them the ability to still learn a great deal.

Pretty cool right !? Now let’s implement it!

Initialize the work environment

We will use the same setup and code as the post on RAG with multiple vectors (link), so we will use LangChain, a FAISS vector store and Pipenv for managing the virtual environment. For better readability, we will create a new folder called domain-specific-CRAG and copy into it all the files from the post on RAG with multiple vectors.

mkdir domain-specific-CRAG
cp -R RAG-agent-multi-vector-store/. domain-specific-CRAG
cd domain-specific-CRAG

Now we need to set up the Pipenv environment and add the LangGraph library:

pipenv install langgraph

You can now check that the web app is launching with this:

pipenv run streamlit run app.py

Setup of the chains

Let’s set up all the LangChain chains that we need for our agentic RAG.

grade_template = """You are a grader assessing relevance of a retrieved document to a user question. 
    If the document contains keyword(s) or semantic meaning related to the question, grade it as relevant.
    Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question.

User question: {question}

Retrieved documents: {documents}
"""

rewrite_template = """You are a question re-writer that converts an input question to a better version that is upgraded
     by using the given domain specific vocabulary. Look at the input and try to reason about the underlying semantic intent / meaning.

Question: {question}

Domain specific definitions: {vocabulary}
"""

...

clean_energy_vocabulary = open("domain_specific_vocabulary.txt", "r").read()
clean_energy = open("clean_energy_domain_specific.txt", "r").read()
clean_energy_vectorstore = FAISS.from_texts(
    clean_energy.split("\n\n"), embedding=embedding
)
clean_energy_retriever = clean_energy_vectorstore.as_retriever()

...

structured_model_grader = model.with_structured_output(GradeDocuments)
grader = grade_prompt | structured_model_grader

rewriter = rewrite_prompt | model | StrOutputParser()

Here we set up the different chains we are going to use. The most interesting parts are the grade and rewrite chains, the others being just the classical RAG and vector store loading.

  • “grade”: this is the chain that determines whether a question is relevant to the data retrieved from the vector store, and more precisely to its vocabulary. Here, we have specified that it needs to check the vocabulary and whether there is any need to add vocabulary definitions.
  • “rewrite”: this is the chain that rewrites the question by adding vocabulary and definitions to get a more relevant vector search.
  • “clean_energy_vocabulary”: this is the vocabulary that we load from a text file and use to rewrite the question. This is a very simple way to do it for this blog post; normally, it should come from some kind of data store.
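If you are rebuilding this from scratch, the parts elided with “...” above are essentially the prompt and model setup. Here is a minimal sketch of what they might contain (the model name is an assumption; the exact code is in the linked repository):

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# Assumed model and embeddings: swap in whatever you used in the previous posts
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
embedding = OpenAIEmbeddings()

# Wrap the raw template strings into prompts usable in chains
grade_prompt = ChatPromptTemplate.from_template(grade_template)
rewrite_prompt = ChatPromptTemplate.from_template(rewrite_template)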

Setup of the graph flow

Now let’s set up the domain-specific CRAG part and the graph flow.

class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        original_question: original user question, kept when the question is rewritten
        question: question (possibly rewritten)
        generation: LLM generation
        documents: list of documents
        vocabulary: domain specific vocabulary definition
    """

    original_question: str
    question: str
    generation: str
    documents: List[str]
    vocabulary: str

This is the definition of the GraphState that will keep the state of the graph flow. It contains information on the question (and the original question before any rewriting), the documents retrieved from the vector store, the generated answer and, finally, the domain-specific vocabulary.
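One detail worth knowing: each task returns only the keys it wants to update, and LangGraph merges this partial dict into the current state. A trivial sketch of the pattern (the task name is made up):

def normalize_question(state):
    # Only "question" is updated: all the other state keys are kept as they are
    return {"question": state["question"].strip()}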

Now let’s create each task of the graph.

class GradeDocuments(BaseModel):
    """Binary score for relevance check on retrieved documents."""

    binary_score: str = Field(
        description="Documents are relevant to the question, 'yes' or 'no'"
    )

...

def get_vocabulary(state):
    """
    Get domain specific vocabulary

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, vocabulary, that contains the domain specific vocabulary
    """
    print("---GET_VOCABULARY---")

    # Plain Python is fine inside a LangGraph task: here we simply return
    # the vocabulary loaded earlier from the text file
    return {"vocabulary": clean_energy_vocabulary}

def rewrite_question(state):
    """
    Rewrite the question with domain specific information

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): Updated question key, plus original_question to keep track of the initial user question
    """
    print("---REWRITE QUESTION---")
    question = state["question"]
    original_question = state.get("original_question")
    vocabulary = state["vocabulary"]

    # Rewrite the question using the domain specific vocabulary
    generation = rewriter.invoke({"question": question, "vocabulary": vocabulary})

    # Keep the original user question the first time we rewrite
    if original_question is None:
        original_question = question

    question = generation
    return {"question": question, "original_question": original_question}

def grade_question(state):
    """
    Grade question to see if there is a need to add domain specific vocabulary.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """

    print("---GRADE QUESTION---")
    question = state["question"]
    documents = state["documents"]
    grade = grader.invoke({"question": question, "documents": documents})
    if grade.binary_score == "yes":
        print("---GRADE QUESTION TO GENERATE---")
        return "yes"
    elif grade.binary_score == "no":
        print("---GRADE QUESTION TO ---")
        return "no"

The tasks specific to our case are the following:

  • “class GradeDocuments”: this is the format of the output the grader chain should return. In this case, it is very simple as it is binary.
  • “get_vocabulary”: this is the task that gets the vocabulary. What is interesting is that it is neither a chain nor a retriever, just the return of a file read. You can put any type of Python code inside a LangGraph task (or node).
  • “rewrite_question”: this is the task that rewrites the question with the vocabulary and replaces the previous question with the new one.
  • “grade_question”: this is the condition used to check whether the question is relevant to the retrieved documents and whether there is any further need to rewrite it.
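Two tasks used in the graph below, clean_energy_retrieve and generate, come from the previous posts and are not repeated here. A minimal sketch of what they might look like (rag_chain is assumed to be the classic RAG chain from the post on Agentic RAG):

def clean_energy_retrieve(state):
    """Retrieve the documents most similar to the (possibly rewritten) question."""
    print("---RETRIEVE---")
    documents = clean_energy_retriever.invoke(state["question"])
    return {"documents": documents}

def generate(state):
    """Generate the final answer from the retrieved documents."""
    print("---GENERATE---")
    generation = rag_chain.invoke(
        {"context": state["documents"], "question": state["question"]}
    )
    return {"generation": generation}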

Now we can add the tasks and create the graph flow itself:

from langgraph.graph import START, END, StateGraph

# Define the nodes
workflow = StateGraph(GraphState)

workflow.add_node("clean_energy_retrieve", clean_energy_retrieve)
workflow.add_node("generate", generate)
workflow.add_node("get_vocabulary", get_vocabulary)
workflow.add_node("rewrite_question", rewrite_question)

# Build graph
workflow.add_edge(START, "clean_energy_retrieve")
workflow.add_conditional_edges(
    "clean_energy_retrieve",
    grade_question,
    {
        "yes": "generate",
        "no": "get_vocabulary",
    },
)

workflow.add_edge("get_vocabulary", "rewrite_question")
workflow.add_edge("rewrite_question", "clean_energy_retrieve")
workflow.add_edge("clean_energy_retrieve", "generate")
workflow.add_edge("generate", END)

# Compile
graph = workflow.compile()

And here’s the graph that you should have with this:

Graph representation of the tutorial
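If you want to render this diagram yourself, the compiled graph can emit a Mermaid representation that you can paste into any Mermaid viewer:

print(graph.get_graph().draw_mermaid())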

What is interesting is that we have a loop over the retriever, the vocabulary fetcher and the question rewriter: until the question is good enough, we keep improving it. In a more sophisticated system, the vocabulary fetcher would be a dynamic component that gathers relevant information from a variety of sources.

First look at our app

Now let’s finally launch our application with the following command:

pipenv run streamlit run app.py

You should have the following visual:

Now let’s ask a question:

Here’s what happened in our flow:

  1. We asked the question “how to use clean energy ?”
  2. The app retrieved the chunks of data from the vector store most similar to the question.
  3. Then it checked whether the chunks were relevant to the question.
  4. In this case, they were not relevant enough.
  5. So it moved to the task that gets the vocabulary.
  6. With this vocabulary, it rewrote the question.
  7. With this new question, it went back to the beginning of the graph.
  8. Documents were retrieved once more from the vector store.
  9. A new relevance check was done between the new question and the documents.
  10. In this case, the check was positive.
  11. The app finally moved on to generating the answer and finished.
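Under the hood, the Streamlit app is essentially running the compiled graph like this (a sketch: only the question key is needed in the initial state):

result = graph.invoke({"question": "how to use clean energy ?"})
print(result["generation"])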

Pretty impressive, right?! This can pretty much be the base for a very sophisticated Agentic RAG system. Well done!

Conclusion

In this blog post, we saw how to create a sophisticated RAG system that uses agents to add domain-specific information to answers. But there are limits and possible improvements:

  • The way the domain-specific information is stored and fetched is very basic here and would need to be far more sophisticated for an enterprise-level system, but this is a good beginning.
  • This app caters to people who have less domain knowledge than what is available in the vector stores. It is not made for users who are specialists of a domain and need to use either other domain data or more basic data.
  • As always, the vector store creation is kept super simple for the blog post; in real life, it will be far more complex.
  • The loop inside the graph needs a safeguard to limit the number of iterations, as it could potentially run without stopping (see the sketch below).
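For the last point, a simple safeguard is to count the rewrites in the state and force the generation once a limit is reached. Here is a sketch, assuming a rewrites key is added to GraphState:

MAX_REWRITES = 3  # hypothetical limit, tune it to your use case

def grade_question(state):
    """Same condition as before, but stop rewriting after MAX_REWRITES iterations."""
    if state.get("rewrites", 0) >= MAX_REWRITES:
        print("---MAX REWRITES REACHED, FORCING GENERATE---")
        return "yes"
    grade = grader.invoke(
        {"question": state["question"], "documents": state["documents"]}
    )
    return grade.binary_score

# rewrite_question would then also increment the counter in its return:
#     return {"question": question, "rewrites": state.get("rewrites", 0) + 1}

Alternatively, LangGraph lets you cap the total number of steps at invocation time with graph.invoke(inputs, config={"recursion_limit": 10}), which stops the run with an error instead of looping forever.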

So there are lots of limits and possible improvements, but this post is a very good introduction to using agents to add domain-specific information to our RAGs and make them smarter and more useful.

Afterwards

I hope this tutorial helped you and taught you many things. I will update this post with more nuggets from time to time. Don’t forget to check my other posts, as I write a lot of cool posts on practical stuff in AI.

Cheers !
