Productionizing a RAG App with Prefect, Weave, and RAGAS

By Ed Izaguirre

Adding evaluation, automated data pulling, and other improvements.

Published in Towards Data Science


From Film Search to Rosebud 🌹. Image from Unsplash.

Table of Contents

  1. Introduction
  2. Offline Evaluation
  3. Online Evaluation
  4. Automated Data Pulling with Prefect
  5. Summary


Introduction

A few months ago, I released the Film Search app, a Retrieval-Augmented Generation (RAG) application designed to recommend films based on user queries. For example, a user may ask: “Find me drama movies in English that are less than 2 hours long and feature dogs” and receive a recommendation like:

Title of Film: Hachi: A Dog’s Tale

Runtime: 93 minutes
Release Year: 2009
Streaming: Not available for streaming
This film tells the poignant true story of Hachiko, an Akita dog known for his remarkable loyalty to his owner. The emotional depth and the themes of friendship and loyalty resonate strongly, making it a touching drama that showcases the profound bond between humans and dogs. It’s perfect for anyone looking for a heartfelt story that highlights the importance of companionship.

This was not just a simple RAG app, however. It included what is known as self-querying retrieval. This means that the bot takes the user’s query and transforms it by adding metadata filters. This ensures that any documents pulled into the chat model’s context respect the constraints set by the user’s query. For more information, I recommend checking out my earlier article linked above. A minimal sketch of the idea follows.
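
To make the idea concrete, here is a minimal sketch of self-querying retrieval using LangChain’s SelfQueryRetriever, the general technique Rosebud is built on. The attribute names, index name, and model choices below are illustrative assumptions, not the app’s actual configuration; it also assumes PINECONE_API_KEY and OPENAI_API_KEY are set in the environment.

from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_pinecone import PineconeVectorStore

# Illustrative metadata fields the LLM is allowed to filter on
metadata_field_info = [
    AttributeInfo(name="Genre", description="The genre of the film", type="string"),
    AttributeInfo(name="Language", description="The language of the film", type="string"),
    AttributeInfo(name="Runtime (minutes)", description="The runtime of the film in minutes", type="integer"),
]

vectorstore = PineconeVectorStore(index_name="films", embedding=OpenAIEmbeddings())

retriever = SelfQueryRetriever.from_llm(
    llm=ChatOpenAI(model="gpt-4o-mini"),
    vectorstore=vectorstore,
    document_contents="Brief overviews of films",
    metadata_field_info=metadata_field_info,
)

# The LLM rewrites the query and attaches structured filters
# (e.g. Genre == "Drama", Runtime (minutes) < 120) before the vector search runs.
docs = retriever.invoke(
    "Find me drama movies in English that are less than 2 hours long and feature dogs."
)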

Unfortunately, there were some issues with the app:

  • There was no offline evaluation done, besides passing the ‘eye test’. This test is necessary, but not sufficient.
  • Observability was non-existent. If a query went poorly, you had to manually pull up the project and run some ad-hoc scripts in an attempt to see what went wrong.
  • The Pinecone vector database had to be updated manually. This meant the documents would quickly become out of date if, say, a film got pulled from a streaming service.

In this article, I will briefly cover some of the improvements made to the Film Search app. This will cover:

  • Offline Evaluation using RAGAS and Weave
  • Online Evaluation and Observability
  • Automated Data Pulling using Prefect

One thing before we jump in: I found the name Film Search to be a bit generic, so I rebranded the app as Rosebud 🌹, hence the image shown above. Real film geeks will understand the reference.

Offline Evaluation

It is important to be able to judge if a change made to your LLM application improves or degrades its performance. Unfortunately, evaluation of LLM apps is a difficult and novel space. There is simply not much agreement on what constitutes a good evaluation.

For Rosebud 🌹, I decided to tackle what is known as the RAG triad. This approach is promoted by TruLens, a platform to evaluate and track LLM applications.

The RAG Triad. Image by author.

The triad covers three aspects of a RAG app (a rough sketch of how each one is scored follows this list):

  • Context Relevancy: When a query is made by the user, documents fill the context of the chat model. Is the retrieved context actually useful? If not, you may need to tweak things like document embedding, chunking, or metadata filtering.
  • Faithfulness: Is the model’s response actually grounded in the retrieved documents? You don’t want the model making up facts; the whole point of RAG is to help reduce hallucinations by using retrieved documents.
  • Answer Relevancy: Does the model’s response actually answer the user’s query? If the user asks for “Comedy films made in the 1990s?”, the model’s answer better contain only comedy films made in the 1990s.
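
Each of these reduces to a score between 0 and 1. Below is a rough, simplified sketch of how the RAGAS metrics used later in this article arrive at those scores; in RAGAS itself an LLM extracts the claims, sentences, and generated questions, so treat this as a mental model rather than the library’s implementation.

from math import sqrt

def faithfulness(supported_claims: int, total_claims: int) -> float:
    # Fraction of claims in the answer that the retrieved context supports
    return supported_claims / total_claims if total_claims else 0.0

def context_relevancy(relevant_sentences: int, total_sentences: int) -> float:
    # Fraction of sentences in the retrieved context that are relevant to the query
    return relevant_sentences / total_sentences if total_sentences else 0.0

def answer_relevancy(query_emb: list[float], generated_question_embs: list[list[float]]) -> float:
    # Mean cosine similarity between the user's query and questions an LLM
    # generates back from the answer; off-topic answers yield dissimilar questions
    def cosine(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))
    return sum(cosine(query_emb, q) for q in generated_question_embs) / len(generated_question_embs)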

There are a few ways to attempt to assess these three functions of a RAG app. One way would be to use human expert evaluators. Unfortunately, this would be expensive and wouldn’t scale. For Rosebud 🌹 I decided to use LLM-as-a-judge. This means using a chat model to look at each of the three criteria above and assign each a score from 0 to 1. This method has the advantage of being cheap and scaling well. To accomplish this, I used RAGAS, a popular framework that helps you evaluate your RAG applications. The RAGAS framework includes the three metrics mentioned above and makes it fairly easy to use them to evaluate your apps. Below is a code snippet demonstrating how I conducted this offline evaluation:

import asyncio

import weave
from datasets import Dataset
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from ragas import evaluate
from ragas.metrics import AnswerRelevancy, ContextRelevancy, Faithfulness

# config (a dict of model settings) and rosebud_chat_model are defined elsewhere in the project

@weave.op()
def evaluate_with_ragas(query, model_output):
    # Put data into a Dataset object
    data = {
        "question": [query],
        "contexts": [[model_output['context']]],
        "answer": [model_output['answer']]
    }
    dataset = Dataset.from_dict(data)

    # Define metrics to judge
    metrics = [
        AnswerRelevancy(),
        ContextRelevancy(),
        Faithfulness(),
    ]

    judge_model = ChatOpenAI(model=config['JUDGE_MODEL_NAME'])
    embeddings_model = OpenAIEmbeddings(model=config['EMBEDDING_MODEL_NAME'])

    evaluation = evaluate(dataset=dataset, metrics=metrics, llm=judge_model, embeddings=embeddings_model)

    return {
        "answer_relevancy": float(evaluation['answer_relevancy']),
        "context_relevancy": float(evaluation['context_relevancy']),
        "faithfulness": float(evaluation['faithfulness']),
    }

def run_evaluation():
    # Initialize chat model
    model = rosebud_chat_model()

    # Define evaluation questions
    questions = [
        {"query": "Suggest a good movie based on a book."},  # Adaptations
        {"query": "Suggest a film for a cozy night in."},  # Mood-Based
        {"query": "What are some must-watch horror movies?"},  # Genre-Specific
        ...
        # Total of 20 questions
    ]

    # Create Weave Evaluation object
    evaluation = weave.Evaluation(dataset=questions, scorers=[evaluate_with_ragas])

    # Run the evaluation
    asyncio.run(evaluation.evaluate(model))

if __name__ == "__main__":
    weave.init('film-search')
    run_evaluation()

A few notes:

  • With twenty questions and three criteria to judge across, you’re looking at sixty LLM calls for a single evaluation! It gets even worse, though: with the rosebud_chat_model, there are two calls for every query, one to construct the metadata filter and another to provide the answer, so really this is 120 calls for a single eval! All models used in my evaluation are the new gpt-4o-mini, which I strongly recommend. In my experience, the calls cost $0.05 per evaluation.
  • Note that we are using asyncio.run to run the evals. It is ideal to use asynchronous calls because you don’t want to evaluate each question sequentially, one after the other. Instead, with asyncio we can begin evaluating other questions while we wait for previous I/O operations to finish.
  • There are a total of twenty questions for a single evaluation. These span a variety of typical film queries a user may ask. I mostly came up with these myself, but in practice it would be better to use queries actually asked by users in production.
  • Notice the weave.init and the @weave.op decorator that are being used. These are part of the new Weave library from Weights & Biases (W&B). Weave is a complement to the traditional W&B library, with a focus on LLM applications. It allows you to capture inputs and outputs of LLMs by using the simple @weave.op decorator (a minimal standalone example follows the figure below). It also allows you to capture the results of evaluations using weave.Evaluation(…). By integrating RAGAS to perform evaluations and Weave to capture and log them, we get a powerful duo that helps GenAI developers iteratively improve their applications. You also get to log the model latency, cost, and more.
Example of Weave + RAGAS integration. Image by author.
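
As a minimal, standalone illustration of the decorator (separate from the Rosebud codebase, with a made-up function and prompt), this is roughly all it takes to get a call traced in Weave:

import weave
from openai import OpenAI

weave.init('film-search')  # trace into the same Weave project

client = OpenAI()

@weave.op()
def summarize_film(title: str) -> str:
    # Inputs, outputs, latency, and token usage of this call show up in the Weave UI
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": f"Summarize the film {title} in one sentence."}],
    )
    return response.choices[0].message.content

print(summarize_film("Hachi: A Dog's Tale"))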

In theory, one can now tweak a hyperparameter (e.g. temperature), re-run the evaluation, and see if the adjustment has a positive or negative impact. Unfortunately, in practice I found the LLM judging to be finicky, and I am not the only one. LLM judges seem to be fairly bad at using a floating point value to assess these metrics. Instead, they seem to do better at classification, e.g. a thumbs up/thumbs down. RAGAS doesn’t yet support LLM judges performing classification. Writing it by hand doesn’t seem too difficult, and perhaps in a future update I may attempt this myself; a rough sketch of what that could look like is shown below.
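
For the curious, a hand-rolled binary judge might look something like this sketch. The prompt wording, model choice, and PASS/FAIL parsing are my own assumptions, not something that ships with RAGAS or Rosebud:

from langchain_openai import ChatOpenAI

JUDGE_PROMPT = """You are grading a film recommendation produced by a RAG system.

Context:
{context}

Answer:
{answer}

Is every claim in the answer supported by the context? Reply with exactly one word: PASS or FAIL."""

def binary_faithfulness_judge(context: str, answer: str) -> int:
    judge = ChatOpenAI(model="gpt-4o-mini", temperature=0)
    verdict = judge.invoke(JUDGE_PROMPT.format(context=context, answer=answer)).content
    # Map the verdict to 1 (thumbs up) or 0 (thumbs down)
    return 1 if "PASS" in verdict.upper() else 0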

Online Evaluation

Offline evaluation is good for seeing how tweaking hyperparameters affects performance, but in my opinion online evaluation is far more useful. Rosebud 🌹 now incorporates 👍/👎 buttons at the bottom of every response so that users can provide feedback.

Example of online feedback. Image by author.

When a user clicks on either button they are told that their feedback was logged. Below is a snippet of how this was accomplished in the Streamlit interface:

import datetime
import threading

import streamlit as st
import wandb

def start_log_feedback(feedback):
    print("Logging feedback.")
    st.session_state.feedback_given = True
    st.session_state.sentiment = feedback
    thread = threading.Thread(target=log_feedback, args=(st.session_state.sentiment,
                                                         st.session_state.query,
                                                         st.session_state.query_constructor,
                                                         st.session_state.context,
                                                         st.session_state.response))
    thread.start()

def log_feedback(sentiment, query, query_constructor, context, response):
    ct = datetime.datetime.now()
    wandb.init(project="film-search",
               name=f"query: {ct}")
    table = wandb.Table(columns=["sentiment", "query", "query_constructor", "context", "response"])
    table.add_data(sentiment,
                   query,
                   query_constructor,
                   context,
                   response)
    wandb.log({"Query Log": table})
    wandb.finish()

Note that the process of sending the feedback to W&B runs on a separate thread rather than on the main thread. This is to prevent the user from getting stuck for a few seconds waiting for the logging to complete.

A W&B table is used to store the feedback. Five quantities are logged in the table:

  • Sentiment: Whether the user clicked thumbs up or thumbs down
  • Query: The user’s query, e.g. Find me drama movies in English that are less than 2 hours long and feature dogs.
  • Query_Constructor: The results of the query constructor, which rewrites the user’s query and includes metadata filtering if necessary, e.g.
{
    "query": "drama English dogs",
    "filter": {
        "operator": "and",
        "arguments": [
            {
                "comparator": "eq", "attribute": "Genre", "value": "Drama"
            },
            {
                "comparator": "eq", "attribute": "Language", "value": "English"
            },
            {
                "comparator": "lt", "attribute": "Runtime (minutes)", "value": 120
            }
        ]
    }
}

  • Context: The retrieved context based on the reconstructed query, e.g. Title: Hachi: A Dog’s Tale. Overview: A drama based on the true story of a college professor’s…
  • Response: The model’s response

All of this is logged conveniently in the same project as the Weave evaluations shown earlier. Now, when a query goes south it is as simple as hitting the thumbs down button to see exactly what happened. This will allow much faster iteration and improvement of the Rosebud 🌹 recommendation application.

Image showing observability of the model’s response. Note on the left-hand side how it is seamless to transition between W&B and Weave. Image by author.

Automated Data Pulling with Prefect

To ensure that recommendations from Rosebud 🌹 stay accurate, it was important to automate the process of pulling data and uploading it to Pinecone. For this task, I chose Prefect, a popular workflow orchestration tool. I was looking for something lightweight, easy to learn, and Pythonic, and I found all of this in Prefect.

Automated flow for pulling and updating Pinecone vector store provided by Prefect. Image by author.

Prefect offers a variety of ways to schedule your workflows. I decided to use push work pools with automatic infrastructure provisioning. I found that this setup balances simplicity with configurability. It lets you task Prefect with automatically provisioning all of the infrastructure needed to run your flow in the cloud provider of your choice. I chose to deploy on Azure, but deploying on GCP or AWS only requires changing a few lines of code. Refer to the pinecone_flow.py file for more details. A simplified flow is provided below:

@task
def start():
    """
    Start-up: check everything works or fail fast!
    """

    # Print out some debug info
    print("Starting flow!")

    # Ensure user has set the appropriate env variables
    assert os.environ['LANGCHAIN_API_KEY']
    assert os.environ['OPENAI_API_KEY']
    ...

@task(retries=3, retry_delay_seconds=[1, 10, 100])
def pull_data_to_csv(config):
    TMBD_API_KEY = os.getenv('TMBD_API_KEY')
    YEARS = range(config["years"][0], config["years"][-1] + 1)
    CSV_HEADER = ['Title', 'Runtime (minutes)', 'Language', 'Overview', ...]

    for year in YEARS:
        # Grab list of ids for all films made in {YEAR}
        movie_list = list(set(get_id_list(TMBD_API_KEY, year)))

        FILE_NAME = f'./data/{year}_movie_collection_data.csv'

        # Creating file
        with open(FILE_NAME, 'w') as f:
            writer = csv.writer(f)
            writer.writerow(CSV_HEADER)

        ...

    print("Successfully pulled data from TMDB and created csv files in data/")

@task
def convert_csv_to_docs():
    # Loading in data from all csv files
    loader = DirectoryLoader(
        ...
        show_progress=True)

    docs = loader.load()

    metadata_field_info = [
        AttributeInfo(name="Title",
                      description="The title of the movie", type="string"),
        AttributeInfo(name="Runtime (minutes)",
                      description="The runtime of the movie in minutes", type="integer"),
        ...
    ]

    def convert_to_list(doc, field):
        if field in doc.metadata and doc.metadata[field] is not None:
            doc.metadata[field] = [item.strip()
                                   for item in doc.metadata[field].split(',')]

    ...

    fields_to_convert_list = ['Genre', 'Actors', 'Directors',
                              'Production Companies', 'Stream', 'Buy', 'Rent']
    ...

    # Set 'overview' and 'keywords' as 'page_content' and other fields as 'metadata'
    for doc in docs:
        # Parse the page_content string into a dictionary
        page_content_dict = dict(line.split(": ", 1)
                                 for line in doc.page_content.split("\n") if ": " in line)

        doc.page_content = (
            'Title: ' + page_content_dict.get('Title') +
            '. Overview: ' + page_content_dict.get('Overview') +
            ...
        )

    ...

    print("Successfully took csv files and created docs")

    return docs

@task
def upload_docs_to_pinecone(docs, config):
    # Create empty index
    PINECONE_KEY, PINECONE_INDEX_NAME = os.getenv(
        'PINECONE_API_KEY'), os.getenv('PINECONE_INDEX_NAME')

    pc = Pinecone(api_key=PINECONE_KEY)

    # Target index and check status
    pc_index = pc.Index(PINECONE_INDEX_NAME)
    print(pc_index.describe_index_stats())

    embeddings = OpenAIEmbeddings(model=config['EMBEDDING_MODEL_NAME'])
    namespace = "film_search_prod"

    PineconeVectorStore.from_documents(
        docs,
        ...
    )

    print("Successfully uploaded docs to Pinecone vector store")

@task
def publish_dataset_to_weave(docs):
    # Initialize Weave
    weave.init('film-search')

    rows = []
    for doc in docs:
        row = {
            'Title': doc.metadata.get('Title'),
            'Runtime (minutes)': doc.metadata.get('Runtime (minutes)'),
            ...
        }
        rows.append(row)

    dataset = Dataset(name='Movie Collection', rows=rows)
    weave.publish(dataset)
    print("Successfully published dataset to Weave")

@flow(log_prints=True)
def pinecone_flow():
    with open('./config.json') as f:
        config = json.load(f)

    start()
    pull_data_to_csv(config)
    docs = convert_csv_to_docs()
    upload_docs_to_pinecone(docs, config)
    publish_dataset_to_weave(docs)

if __name__ == "__main__":
    pinecone_flow.deploy(
        name="pinecone-flow-deployment",
        work_pool_name="my-aci-pool",
        cron="0 0 * * 0",
        image=DeploymentImage(
            name="prefect-flows:latest",
            platform="linux/amd64",
        )
    )

Notice how simple it is to turn Python functions into a Prefect flow. All you need are some sub-functions styled with @task decorators and a @flow decorator on the main function. Also note that after uploading the documents to Pinecone, the last step of our flow publishes the dataset to Weave. This is important for reproducibility purposes.

At the bottom of the script we see how deployment is done in Prefect.

  • We need to provide a name for the deployment. This is arbitrary.
  • We also need to specify a work_pool_name . Push work pools in Prefect submit runs directly to serverless compute in your cloud provider, without needing a worker running as a middleman. This name needs to match the name used to create the pool, which we’ll see below.
  • You also need to specify a cron expression (the name comes from the Greek word for time, chronos). This lets you specify how often to repeat a workflow. The value “0 0 * * 0” means repeat this workflow every week, at midnight every Sunday. Check out this website for details on how the cron syntax works; a few more example cron strings are shown after this list.
  • Finally, you need to specify a DeploymentImage . Here you specify both a name and a platform . The name is arbitrary, but the platform is not. Since I want to deploy to Azure compute instances, and these instances run Linux, it’s important I specify that in the DeploymentImage .
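
As a quick reference, here are a few illustrative cron strings (the five fields are minute, hour, day of month, month, and day of week):

"0 0 * * 0"    # midnight every Sunday: the weekly schedule Rosebud uses
"0 0 * * *"    # midnight every day
"0 */6 * * *"  # every six hours
"30 9 1 * *"   # 9:30 AM on the first day of every month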

To deploy this flow on Azure using the CLI, run the following commands:

prefect work-pool create --type azure-container-instance:push --provision-infra my-aci-pool
python pinecone_flow.py
prefect deployment run 'pinecone-flow/pinecone-flow-deployment'

These commands will automatically provision all of the necessary infrastructure on Azure. This includes an Azure Container Registry (ACR) that will hold a Docker image containing all files in your directory as well as any necessary libraries listed in a requirements.txt. It will also include an Azure Container Instance (ACI) identity with the permissions necessary to deploy a container with the aforementioned Docker image. Finally, the deployment run command kicks off an immediate run of the flow, and the cron schedule attached to the deployment ensures the code runs every week. You can check the Prefect dashboard to see your flow get run:

Image of a flow in Prefect being successfully run. Image by author.

By updating my Pinecone vector store weekly, I can ensure that the recommendations from Rosebud 🌹 remain accurate.

Summary

In this article, I discussed my experience improving the Rosebud 🌹 app. This included the process of incorporating offline and online evaluation, as well as automating the update of my Pinecone vector store.

Some other improvements not mentioned in this article:

  • Including ratings from The Movie Database in the film data. You can now ask for “highly rated films” and the chat model will filter for films above a 7/10.
  • Upgraded chat models. Now the query and summary models are using gpt-4o-mini . Recall that the LLM judge model is also using gpt-4o-mini .
  • Embedding model upgraded to text-embedding-3-small from text-embedding-ada-002 .
  • Years now span 1950–2023, instead of starting at 1920. Film data from 1920–1950 was not high quality, and only messed up recommendations.
  • UI is cleaner, with all details regarding the project relegated to a sidebar.
  • Vastly improved documentation on GitHub.
  • Bug fixes.

As mentioned at the top of the article, the app is now 100% free to use! I will foot the bill for queries for the foreseeable future (hence the choice of gpt-4o-mini instead of the more expensive gpt-4o). I really want to get the experience of running an app in production, and having my readers test out Rosebud 🌹 is a great way to do this. In the unlikely event that the app really blows up, I will have to come up with some other model of funding. But that would be a great problem to have.

Enjoy discovering awesome films! 🎥
