Build End-to-End RAG Pipeline with Monitoring and Evaluation using Langchain, Azure AI Search, OpenAI, Langfuse, NeMo-Guardrails, Ragas | by Anurag Mishra | Apr, 2024


Langchain: Langchain is a highly popular framework that simplifies building and designing LLM-based applications. It provides useful abstractions and built-in functionality for developing RAG applications from scratch.

Azure AI Search: Azure AI Search is the search engine from the Azure stack. It provides secure information retrieval at scale over user-owned content in traditional and GenAI search applications.

Langfuse: Langfuse is an open-source monitoring and observability tool. It helps you build and improve LLM applications across the lifecycle, with features for tracing, evals, prompt management and metrics. It can easily be deployed locally or to a cloud service.

NeMo-Guardrails: NeMo-Guardrails is an open-source toolkit for easily adding programmable guardrails to LLM-based conversational systems. The key guardrail types include input guardrails, output guardrails and dialog guardrails.

Ragas: Ragas is a framework that helps you evaluate your Retrieval Augmented Generation (RAG) pipelines. It provides key metrics for assessing a RAG pipeline.

Azure OpenAI Service: This service from the Azure stack provides a suite of OpenAI models via REST API access, including the GPT-4, GPT-4-turbo, GPT-3.5-turbo and embedding model series.

This section covered a brief summary and the core functionality of each tool/framework. To follow along with this blog, we assume you understand how each of them works and have already installed and used them.

In this section, we walk through each pipeline component in detail and see where the above frameworks/tools fit in.

from dotenv import load_dotenv
load_dotenv()
from langchain_community.document_loaders import TextLoader
from ragas.testset.generator import TestsetGenerator
from ragas.testset.evolutions import simple, reasoning, multi_context
from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings
from langchain.chains import RetrievalQA
from langchain_core.output_parsers import StrOutputParser
from langchain.vectorstores.azuresearch import AzureSearch
from langchain.text_splitter import CharacterTextSplitter
from langfuse.callback import CallbackHandler
from azure.search.documents.indexes.models import (
    FreshnessScoringFunction,
    FreshnessScoringParameters,
    ScoringProfile,
    SearchableField,
    SearchField,
    SearchFieldDataType,
    SimpleField,
    TextWeights,
)
from tqdm import tqdm
from langfuse import Langfuse
from datasets import Dataset
from nemoguardrails import LLMRails, RailsConfig
from nemoguardrails.integrations.langchain.runnable_rails import RunnableRails
import os
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision, context_recall
from ragas.metrics.critique import SUPPORTED_ASPECTS, harmfulness
import pandas as pd
import nest_asyncio
nest_asyncio.apply()
import warnings
warnings.filterwarnings("ignore")

vector_store_address = os.getenv("AZURE_SEARCH_ENDPOINT")
vector_store_password = os.getenv("AZURE_SEARCH_PASSWORD")
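
These values come from the environment (loaded by load_dotenv() at the top). As a rough sketch, the .env file behind this notebook might define the variables below; the Azure Search names simply mirror the os.getenv() calls above, while the Azure OpenAI names are the defaults read by langchain-openai. The values are placeholders.

import os

# Placeholders only -- in practice these live in the .env file loaded by load_dotenv()
os.environ.setdefault("AZURE_SEARCH_ENDPOINT", "https://<your-search-service>.search.windows.net")
os.environ.setdefault("AZURE_SEARCH_PASSWORD", "<your-search-admin-key>")
os.environ.setdefault("AZURE_OPENAI_ENDPOINT", "https://<your-aoai-resource>.openai.azure.com/")
os.environ.setdefault("AZURE_OPENAI_API_KEY", "<your-aoai-key>")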

We use the earnings call transcripts of Microsoft FY24 Q1 and FY24 Q2. The idea is to build an application that lets us run queries over these earnings call transcripts.

We add a sample release date (last_update) as metadata to each chunk.

def split_doc(filename_):
    # Load a transcript and split it into ~2500-character chunks
    print(f'Reading - {filename_}')
    loader = TextLoader(filename_, encoding="utf-8")
    documents = loader.load()
    text_splitter = CharacterTextSplitter(chunk_size=2500, chunk_overlap=0)
    docs = text_splitter.split_documents(documents)
    return docs

def add_metadata(data, time):
    # Attach a last_update timestamp to each chunk's metadata
    for chunk in data:
        chunk.metadata['last_update'] = time
    return data

msft_q1 = split_doc('MSFT_q1_2024.txt')
msft_q2 = split_doc('MSFT_q2_2024.txt')

# Adding same data with different last_update
from datetime import datetime, timedelta

q2_time = (datetime.utcnow() - timedelta(days=90)).strftime(
    "%Y-%m-%dT%H:%M:%S-00:00"
)
q1_time = (datetime.utcnow() - timedelta(days=180)).strftime(
    "%Y-%m-%dT%H:%M:%S-00:00"
)

msft_q1 = add_metadata(msft_q1, q1_time)
msft_q2 = add_metadata(msft_q2, q2_time)

documents = msft_q1 + msft_q2

To create an index in AI Search, we need to define the fields and their types. We use an OpenAI embedding model to generate the vectors. We start by defining some common fields:

  1. id: unique ID of the chunk
  2. content: text content of the chunk
  3. content_vector: generated embedding of the content (searchable vector field)
  4. metadata: metadata of the chunk

embeddings = AzureOpenAIEmbeddings(
    azure_deployment="text-embedding-ada-002",
    openai_api_version="2023-05-15",
)
embedding_function = embeddings.embed_query

fields = [
    SimpleField(
        name="id",
        type=SearchFieldDataType.String,
        key=True,
        filterable=True,
    ),
    SearchableField(
        name="content",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    SearchField(
        name="content_vector",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=len(embedding_function("Text")),
        vector_search_profile_name="myHnswProfile",
    ),
    SearchableField(
        name="metadata",
        type=SearchFieldDataType.String,
        searchable=True,
    ),
    # Additional field for filtering on document source
    SimpleField(
        name="source",
        type=SearchFieldDataType.String,
        filterable=True,
    ),
    # Additional data field for last doc update
    SimpleField(
        name="last_update",
        type=SearchFieldDataType.DateTimeOffset,
        searchable=True,
        filterable=True,
    ),
]
# Adding a custom scoring profile with a freshness function
sc_name = "scoring_profile"
sc = ScoringProfile(
    name=sc_name,
    text_weights=TextWeights(weights={"content": 5}),
    function_aggregation="sum",
    functions=[
        FreshnessScoringFunction(
            field_name="last_update",
            boost=100,
            parameters=FreshnessScoringParameters(boosting_duration="P2D"),
            interpolation="linear",
        )
    ],
)

index_name = "earning_call-scoring-profile"

vector_store: AzureSearch = AzureSearch(
    azure_search_endpoint=vector_store_address,
    azure_search_key=vector_store_password,
    index_name=index_name,
    embedding_function=embeddings.embed_query,
    fields=fields,
    scoring_profiles=[sc],
    default_scoring_profile=sc_name,
)

vector_store.add_documents(documents=documents)

azureai_retriever = vector_store.as_retriever()
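
Before wiring the retriever into a chain, it can help to sanity-check the index directly. A minimal sketch, assuming the hybrid (keyword + vector) search type exposed by LangChain's AzureSearch wrapper; the query string is just an example:

# Optional sanity check: hybrid search straight against the new index
sample_docs = vector_store.similarity_search(
    query="How did Windows OEM revenue grow?",
    k=3,
    search_type="hybrid",
)
print(sample_docs[0].page_content[:300])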

Langchain provides a wrapper to integrate easily with Langfuse. We can initialize the Langfuse callback handler as follows; note that the Langfuse public key and secret key need to be set as environment variables.

langfuse_handler = CallbackHandler()
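
If you prefer not to rely on environment variables, the handler can also be given its credentials explicitly; a sketch with placeholder values, using the keyword arguments of the Langfuse Python SDK:

# Equivalent explicit initialization (placeholders; normally read from LANGFUSE_* env vars)
langfuse_handler = CallbackHandler(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com",  # or your self-hosted Langfuse URL
)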

We use the GPT-4-turbo model in a retrieval chain to build the QA chain. To integrate it with Langfuse, we pass the callback handler to the invoke() method.

llm = AzureChatOpenAI(temperature=0, openai_api_version="2023-07-01-preview",
                      azure_deployment="gpt-4-1106-preview")

chain_type = 'stuff'
chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type=chain_type,
    retriever=azureai_retriever,
    metadata={"application_type": "question_answering"},
)

query = "How is Windows OEM revenue growth?"

chain.invoke({"query": query}, config={"callbacks": [langfuse_handler]})

So far, we have built a RAG pipeline that can answer queries over our earnings call data. Now we are going to add the remaining tools/frameworks to build the end-to-end pipeline.

As of now, our RAG chain has no mechanism to flag queries that are off-topic or inappropriate, or to keep the model's responses free of bias and stereotypes. We are going to use NeMo-Guardrails so that our solution can flag, and avoid answering, questions that are out of scope.

We need to create a ‘config’ directory and define which types of rails we want to use. For this example, we only use input rails. Langchain provides a wrapper to easily integrate an existing chain with NeMo-Guardrails; a sketch of a possible config follows the code below.

config = RailsConfig.from_path("./config")

guardrails = RunnableRails(config=config, llm=llm)

chain_with_guardrails = guardrails | chain

chain_with_guardrails.invoke({"query": query}, config={"callbacks": [langfuse_handler]})
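
The contents of ./config are not reproduced in this post. As an illustration only, a minimal input-rail configuration could look like the sketch below, loaded inline via RailsConfig.from_content instead of from_path; the flow name and the self_check_input prompt task come from NeMo-Guardrails' built-in self-check input rail, and the policy wording is just an example:

# Illustrative inline equivalent of a minimal ./config with only an input rail
yaml_content = """
rails:
  input:
    flows:
      - self check input

prompts:
  - task: self_check_input
    content: |
      Your task is to check whether the user message below complies with the policy:
      only questions about Microsoft's earnings calls are allowed.

      User message: "{{ user_input }}"

      Should the user message be blocked (Yes or No)?
"""
config = RailsConfig.from_content(yaml_content=yaml_content)
guardrails = RunnableRails(config=config, llm=llm)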

Having a well-defined dataset is the backbone of robust evaluation of a RAG pipeline. There are primarily two ways to generate such datasets:

  1. Human annotation: more robust, but slow
  2. LLM as data generator: faster, but may hallucinate

In an ideal scenario, we would use a mix of both approaches. For example, we could collect a manually curated dataset and then generate variations of those questions to augment it. In this example, we use the Ragas TestsetGenerator to create the evaluation dataset. It can generate three different types of queries (simple, reasoning and multi-context).

gpt4_llm = AzureChatOpenAI(temperature=0, openai_api_version="2024-02-01",
                           azure_deployment="gpt-4")

generator_llm = gpt4_llm
critic_llm = gpt4_llm
embeddings = embeddings

generator = TestsetGenerator.from_langchain(
    generator_llm,
    critic_llm,
    embeddings
)

%%time
testset = generator.generate_with_langchain_docs(
    documents, test_size=10,
    distributions={simple: 1},  # a mix such as {simple: 0.5, reasoning: 0.25, multi_context: 0.25} is also possible
    is_async=False,
)

testset.to_pandas().to_excel('Ground_Truth_Dataset.xlsx', index=False)

Once we have a properly defined dataset and an evaluator LLM, we use Ragas metrics to score our RAG pipeline's performance. The key metrics we use are:

  1. faithfulness
  2. answer_relevancy
  3. context_precision
  4. harmfulness

While calculating these scores, we simultaneously push the metrics and results to Langfuse for monitoring and observability.

metrics = [faithfulness, answer_relevancy, context_precision, harmfulness]

langfuse = Langfuse()
langfuse.auth_check()

test_dataset = pd.read_excel('Ground_Truth_Dataset.xlsx')

test_dataset = test_dataset[~test_dataset.ground_truth.isna()]

def evaluate_rag(question, ground_truth):
    # Create a Langfuse trace for this question and record retrieval + generation spans
    trace = langfuse.trace(name="rag")
    contexts = azureai_retriever.invoke(question)
    trace.span(
        name="retrieval", input={'question': question}, output={'contexts': contexts}
    )
    answer = chain.invoke(question)['result']
    trace.span(
        name="generation", input={'question': question, 'contexts': contexts}, output={'answer': answer}
    )
    # Build a single-row dataset in the format expected by ragas.evaluate
    data = {
        'question': [question],
        'answer': [answer],
        'contexts': [[context.page_content for context in contexts]],
        'ground_truth': [ground_truth]
    }
    dataset = Dataset.from_dict(data)
    result = evaluate(
        dataset,
        metrics=[
            context_precision,
            faithfulness,
            answer_relevancy,
            context_recall,
            harmfulness
        ],
        llm=gpt4_llm, embeddings=embeddings
    )
    # Push each metric score back to the Langfuse trace
    for m in metrics:
        print(m.name, result[m.name])
        trace.score(name=m.name, value=result[m.name])

%%time
for _, row in tqdm(test_dataset.iterrows()):
    evaluate_rag(row['question'], row['ground_truth'])

Now that we have pushed our metrics and queries to Langfuse, we can use the Langfuse UI to understand overall system performance (e.g. average metric scores, latency, each component's latency, query cost, etc.).

Example of the Langfuse UI dashboard.
