Stage 06 — Evaluation, Deployment & Production
Evaluation, Deployment & Production · Comprehensive Technical Training · ⏱ 10–14 hours
Learning Objectives
By the end of this stage you will be able to:
- Define and measure evaluation metrics that map to real business outcomes
- Evaluate RAG pipelines using MRR, nDCG, and LLM-as-Judge patterns
- Build a structured evaluation harness with golden test sets and Pydantic
- Apply the five-step MLOps process to productionize any AI solution
- Deploy fine-tuned LLMs to serverless cloud infrastructure using Modal
- Monitor training runs in real time with Weights & Biases
- Reduce cold-start latency with persistent volume caching in Modal
- Instrument a production AI system with observability and retry logic
Section 1: The Evaluation Mindset
Evaluation is arguably the most important skill an AI engineer can develop — and the most overlooked. Without a clear metric you can measure, you cannot know which model is better, whether a RAG improvement actually helped, or when a fine-tuned model is ready to ship. Evaluation comes before everything else in a mature AI workflow.
Technical Metrics vs. Business Outcomes
There are two layers to every evaluation problem. Technical metrics like perplexity, BLEU, or embedding cosine similarity are fast to compute but often disconnected from what users actually care about. Business outcome metrics — task completion rate, answer acceptance rate, reduction in support tickets — are what actually matter, but they require real traffic to measure and are slow to iterate on.
The skill is in bridging these two layers: finding cheap proxy metrics that reliably predict the expensive business outcomes you care about. If you pick the wrong proxy, you'll optimize hard for something that doesn't move the needle. Pick the right one and you get rapid iteration cycles that still track to production performance.
Choosing What to Measure
Start by asking: what does it look like when this system fails? For a Q&A bot, failure is a wrong answer or a hallucination. For a code generator, it's syntactically broken or logically incorrect output. For a RAG pipeline, it's retrieving irrelevant chunks or failing to retrieve the right one at all. Each failure mode points to a different evaluation strategy.
| System Type | Primary Failure | Proxy Metric | Business Metric |
|---|---|---|---|
| Q&A / Chat | Wrong answers | LLM-as-Judge score | User thumbs-up rate |
| RAG Pipeline | Wrong documents retrieved | MRR / nDCG | Query resolution rate |
| Code Generator | Code doesn't run | Pass@k on unit tests | Dev time saved |
| Fine-Tuned Classifier | Wrong category | Validation accuracy / F1 | Error rate in downstream system |
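The Pass@k entry in the table can be computed with the commonly used unbiased estimator: generate n samples per problem, count the c samples that pass the unit tests, and estimate the chance that at least one of k randomly chosen samples passes. The function name `pass_at_k` is a hypothetical choice for this sketch.

```python
from math import comb

def pass_at_k(n: int, c: int, k: int) -> float:
    """Unbiased pass@k estimate: n samples generated, c of them pass the tests."""
    if k > n:
        raise ValueError("k cannot exceed the number of samples n")
    if n - c < k:
        return 1.0  # every size-k subset contains at least one passing sample
    # 1 minus the probability that all k chosen samples are failures
    return 1.0 - comb(n - c, k) / comb(n, k)

# With k=1 the estimate reduces to the plain pass rate c / n.
print(f"pass@1 = {pass_at_k(n=10, c=3, k=1):.2f}")
print(f"pass@5 = {pass_at_k(n=10, c=3, k=5):.2f}")
```

Average the per-problem values over the whole benchmark to get the number you report.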
Building a Golden Test Set
Every production AI system needs a curated test set — a fixed collection of inputs with known correct outputs. This is your evaluation ground truth. Build it by sampling real user queries, having subject matter experts label the correct answers, and versioning it in source control. Never modify the test set without creating a new version; otherwise you lose the ability to compare runs over time.
import json
from pathlib import Path

# golden_test_set.json: your frozen evaluation benchmark
TEST_SET = [
    {
        "query": "What is the difference between RAG and fine-tuning?",
        "expected_topics": ["retrieval", "embedding", "parameter update", "training data"],
        "min_quality_score": 4  # out of 5, from LLM-as-Judge
    },
    {
        "query": "How do I reduce hallucinations in my LLM application?",
        "expected_topics": ["grounding", "retrieval", "temperature", "system prompt"],
        "min_quality_score": 4
    },
    # ... 50-200 more examples for a robust benchmark
]

def save_test_set(test_set: list, path: Path):
    with open(path, "w") as f:
        json.dump(test_set, f, indent=2)

def load_test_set(path: Path) -> list:
    with open(path) as f:
        return json.load(f)
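To make the "never modify without a new version" rule checkable, one option is to record a content fingerprint of the test set next to every eval result. This is a minimal sketch under that assumption; the helper name `fingerprint_test_set` and the 12-character truncation are illustrative choices, not part of the course code.

```python
import hashlib
import json

def fingerprint_test_set(test_set: list) -> str:
    """Short content hash; changes whenever any example is added, removed, or edited."""
    # sort_keys makes the hash independent of dict key order within an example
    canonical = json.dumps(test_set, sort_keys=True, ensure_ascii=False)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]

v1 = [{"query": "What is RAG?", "min_quality_score": 4}]
v2 = v1 + [{"query": "What is QLoRA?", "min_quality_score": 4}]

print("v1:", fingerprint_test_set(v1))
print("v2:", fingerprint_test_set(v2))
```

Two eval runs are only comparable when their fingerprints match, which gives you a cheap guard against silently edited benchmarks.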
Section 2: RAG Evaluation — MRR, nDCG, and LLM-as-Judge
RAG systems have two distinct components to evaluate: retrieval quality (did we get the right documents?) and answer quality (did the LLM produce a good response given those documents?). Many teams evaluate only the second, which means retrieval failures go unnoticed.
Retrieval Metrics: MRR and nDCG
Mean Reciprocal Rank (MRR) measures how high the first relevant result appears in your ranked list. For each query, the reciprocal rank is 1 divided by the position of the first relevant document: 1.0 if it is returned first, 0.5 if second, 0.33 if third, and 0 if no relevant document is retrieved. Averaging the reciprocal rank across all queries gives MRR, a single number that summarizes how quickly your retriever surfaces a relevant result.
Normalized Discounted Cumulative Gain (nDCG) goes further by giving partial credit to graded relevance — a highly relevant document is worth more than a somewhat relevant one, and documents returned higher in the list are worth more than ones buried at position 10. nDCG scores close to 1.0 indicate excellent retrieval.
import math

def mean_reciprocal_rank(results: list[list], relevant: list[set]) -> float:
    """
    results: one ranked list of document IDs per query
    relevant: one set of relevant document IDs per query
    """
    rr_scores = []
    for ranked_docs, rel_docs in zip(results, relevant):
        rr = 0.0  # stays 0 when no relevant document was retrieved
        for rank, doc_id in enumerate(ranked_docs, start=1):
            if doc_id in rel_docs:
                rr = 1.0 / rank
                break
        rr_scores.append(rr)
    return sum(rr_scores) / len(rr_scores)

def ndcg_at_k(results: list[list], relevance_scores: list[dict], k: int = 10) -> float:
    """
    results: one ranked list of document IDs per query
    relevance_scores: one dict per query mapping doc_id → relevance grade (0, 1, 2, 3)
    """
    def dcg(ranked_docs, rel_scores, k):
        score = 0.0
        for i, doc in enumerate(ranked_docs[:k], start=1):
            grade = rel_scores.get(doc, 0)  # unjudged documents count as irrelevant
            score += grade / math.log2(i + 1)
        return score

    ndcg_scores = []
    for ranked_docs, rel_scores in zip(results, relevance_scores):
        actual_dcg = dcg(ranked_docs, rel_scores, k)
        # Ideal ordering: every judged document, sorted by grade (highest first)
        ideal_ranked = sorted(rel_scores.keys(), key=lambda d: rel_scores[d], reverse=True)
        ideal_dcg = dcg(ideal_ranked, rel_scores, k)
        ndcg_scores.append(actual_dcg / ideal_dcg if ideal_dcg > 0 else 0.0)
    return sum(ndcg_scores) / len(ndcg_scores)

# Example usage
queries = ["What is RAG?", "How does fine-tuning work?"]
results = [
    ["doc_3", "doc_1", "doc_7", "doc_2"],  # retrieved for query 1
    ["doc_5", "doc_9", "doc_3", "doc_1"],  # retrieved for query 2
]
relevant = [{"doc_1", "doc_3"}, {"doc_5"}]
mrr = mean_reciprocal_rank(results, relevant)
print(f"MRR: {mrr:.3f}")  # MRR: 1.000 (a relevant document is ranked first for both queries)
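The usage example above only exercises MRR, so here is a small worked nDCG calculation for a single query. It is a simplified sketch: the grades are invented, and it assumes every judged document appears in the retrieved list, so the ideal ordering is just the same grades sorted from highest to lowest.

```python
import math

def dcg(grades: list[int]) -> float:
    """Discounted cumulative gain for relevance grades listed in ranked order."""
    return sum(g / math.log2(pos + 1) for pos, g in enumerate(grades, start=1))

# Grades (0-3) in the order the retriever returned the documents (made-up values).
retrieved_grades = [1, 3, 0, 2]
# Best possible ordering of the same documents: [3, 2, 1, 0]
ideal_grades = sorted(retrieved_grades, reverse=True)

ndcg = dcg(retrieved_grades) / dcg(ideal_grades)
print(f"DCG={dcg(retrieved_grades):.3f}  IDCG={dcg(ideal_grades):.3f}  nDCG={ndcg:.3f}")
```

The score falls below 1.0 because the grade-3 document sits at position 2 and the grade-2 document at position 4; moving either one up raises nDCG.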
Answer Quality: LLM-as-Judge
Once you have good retrieval, you need to evaluate whether the LLM's answer is actually correct and well-grounded. The most practical approach at scale is using a separate LLM — often GPT-4o or Claude — to act as a judge. The judge receives the query, the retrieved context, and the generated answer, then scores the answer on criteria like faithfulness, relevance, and completeness.
from openai import OpenAI
from pydantic import BaseModel

client = OpenAI()

JUDGE_PROMPT = """You are an expert evaluator for AI-generated answers.
Given:
- USER QUERY: {query}
- RETRIEVED CONTEXT: {context}
- GENERATED ANSWER: {answer}
Score the answer on these criteria (each 1–5):
1. Faithfulness: Is the answer grounded in the retrieved context?
2. Relevance: Does the answer directly address the query?
3. Completeness: Does it cover all key points from the context?
4. Clarity: Is it clearly written and well-structured?
Return JSON with keys: faithfulness, relevance, completeness, clarity, reasoning"""

class JudgeScore(BaseModel):
    faithfulness: int
    relevance: int
    completeness: int
    clarity: int
    reasoning: str

def llm_judge(query: str, context: str, answer: str) -> JudgeScore:
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": JUDGE_PROMPT.format(
                query=query, context=context, answer=answer
            )
        }],
        response_format=JudgeScore,
    )
    return response.choices[0].message.parsed

# Example
score = llm_judge(
    query="What is the capital of France?",
    context="France is a country in Western Europe. Its capital city is Paris, which is also the largest city.",
    answer="The capital of France is Paris."
)
print(f"Faithfulness: {score.faithfulness}/5")
print(f"Overall avg: {(score.faithfulness + score.relevance + score.completeness + score.clarity) / 4:.1f}/5")
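Judge scores become actionable once they are compared against the min_quality_score stored with each golden example. The sketch below shows one possible pass/fail rule without calling any API. The `Scores` dataclass is a hypothetical stand-in with the same four numeric fields as the judge output, and the rule that no single criterion may fall below 3 is an added assumption, included because a high average can hide an unfaithful answer.

```python
from dataclasses import dataclass

@dataclass
class Scores:  # hypothetical stand-in for the four numeric judge criteria
    faithfulness: int
    relevance: int
    completeness: int
    clarity: int

def passes_threshold(scores: Scores, min_quality_score: float, floor: int = 3) -> bool:
    """Pass only if the mean reaches min_quality_score and no criterion is below floor."""
    values = [scores.faithfulness, scores.relevance, scores.completeness, scores.clarity]
    return sum(values) / len(values) >= min_quality_score and min(values) >= floor

good = Scores(faithfulness=5, relevance=5, completeness=4, clarity=4)      # mean 4.5
ungrounded = Scores(faithfulness=2, relevance=5, completeness=5, clarity=5)  # mean 4.25

print(passes_threshold(good, min_quality_score=4))
print(passes_threshold(ungrounded, min_quality_score=4))
```

The second answer clears the average but fails the floor on faithfulness, which is usually the behavior you want from a hallucination check.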
Running a Full RAG Evaluation
Wire these together into a repeatable evaluation loop. Each time you change your chunking strategy, swap embedding models, or tweak your prompt, run the full eval suite and compare the numbers. Iterating this way can move MRR from something like 0.73 to 0.91 or higher over a few experiments, and because the test set is frozen, each gain is measurable and attributable to a specific change rather than to guesswork.
from dataclasses import dataclass

@dataclass
class EvalResult:
    query: str
    mrr: float          # reciprocal rank for this single query
    judge_score: float
    retrieved_correct: bool

def run_rag_eval(rag_pipeline, test_set: list) -> dict:
    """
    Each test example needs "query" and "relevant_doc_ids".
    rag_pipeline.query() is expected to return an object with .documents
    (each having .id and .content) and .answer.
    """
    results = []
    for example in test_set:
        query = example["query"]
        expected_docs = set(example["relevant_doc_ids"])

        # Get RAG output
        rag_result = rag_pipeline.query(query)
        retrieved_ids = [doc.id for doc in rag_result.documents]
        answer = rag_result.answer
        context = "\n\n".join(doc.content for doc in rag_result.documents[:3])

        # Retrieval metric
        rr = 0.0
        for rank, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in expected_docs:
                rr = 1.0 / rank
                break

        # Answer quality
        score = llm_judge(query, context, answer)
        avg_judge = (score.faithfulness + score.relevance + score.completeness + score.clarity) / 4

        results.append(EvalResult(
            query=query,
            mrr=rr,
            judge_score=avg_judge,
            retrieved_correct=(rr > 0)
        ))

    return {
        "mrr": sum(r.mrr for r in results) / len(results),
        "avg_judge_score": sum(r.judge_score for r in results) / len(results),
        "retrieval_accuracy": sum(r.retrieved_correct for r in results) / len(results),
        "n_queries": len(results),
    }

# baseline = run_rag_eval(baseline_pipeline, TEST_SET)
# improved = run_rag_eval(improved_pipeline, TEST_SET)
# print(f"MRR: {baseline['mrr']:.3f} → {improved['mrr']:.3f}")
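Comparing two runs by eye gets error-prone once you track several metrics, so a small report helper can flag regressions that hide behind a headline improvement. This is a sketch only: `compare_runs` is a hypothetical helper, the 0.01 tolerance is an arbitrary assumption, and the two result dicts are invented numbers in the same shape that the eval function returns.

```python
def compare_runs(baseline: dict, candidate: dict,
                 metrics=("mrr", "avg_judge_score", "retrieval_accuracy"),
                 tolerance: float = 0.01) -> dict:
    """Per-metric delta plus a verdict; changes within tolerance count as flat."""
    report = {}
    for name in metrics:
        delta = candidate[name] - baseline[name]
        if delta < -tolerance:
            verdict = "regressed"
        elif delta > tolerance:
            verdict = "improved"
        else:
            verdict = "flat"
        report[name] = {"delta": round(delta, 3), "verdict": verdict}
    return report

# Illustrative numbers, not measured results.
baseline = {"mrr": 0.73, "avg_judge_score": 3.9, "retrieval_accuracy": 0.80}
candidate = {"mrr": 0.91, "avg_judge_score": 3.8, "retrieval_accuracy": 0.80}

for metric, row in compare_runs(baseline, candidate).items():
    print(f"{metric:20s} {row['delta']:+.3f}  {row['verdict']}")
```

Here retrieval improved while answer quality slipped, which is the kind of trade-off a single headline metric would hide.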
Section 3: The Five-Step MLOps Process
Production AI engineering is not just about training good models — it's about building repeatable processes that let you continuously improve them. The five-step MLOps process gives you a structured workflow that applies to any AI problem, from a simple classifier to a multi-agent system.
- Define the problem and collect data. Start with the business objective. What does success look like? Who labels the training data and how? Poor data quality at this stage compounds through every subsequent step.
- Build baseline models. Before training anything, get a simple baseline working — a rule-based system, a small traditional ML model (logistic regression, XGBoost), or just calling a frontier LLM with a basic prompt. This gives you a reference point and often reveals that the problem is simpler than expected.
- Evaluate systematically. Build your golden test set now, before you start iterating on models. Evaluate your baseline against it. This is the most skipped step and the most costly skip.
- Fine-tune or improve. Now that you have a baseline score, you have a target to beat. Fine-tune frontier models, adapt open-source models with QLoRA, or improve your prompting and retrieval. Run evaluation after every significant change.
- Deploy and monitor. Ship the best model to production with appropriate monitoring. Track the same metrics you optimized in evaluation. When performance drifts, you know exactly where to look and how to measure an improvement.
# Pseudocode: five-step MLOps loop
class AIProjectWorkflow:
    def step1_define_and_collect(self):
        """Define success metric, collect and label training/eval data."""
        self.test_set = load_test_set("golden_test_set.json")
        self.train_set = load_training_data("training_data.jsonl")

    def step2_build_baseline(self):
        """Establish baseline: simplest thing that could work."""
        self.baseline_pipeline = SimpleRAGPipeline(model="gpt-4o-mini")
        self.baseline_score = run_rag_eval(self.baseline_pipeline, self.test_set)
        print(f"Baseline MRR: {self.baseline_score['mrr']:.3f}")

    def step3_evaluate(self):
        """Systematic eval against frozen test set."""
        return run_rag_eval(self.current_pipeline, self.test_set)

    def step4_improve(self):
        """Iterate: better chunking, embedding model, fine-tuning, etc."""
        self.current_pipeline = ImprovedRAGPipeline(
            chunk_size=512, overlap=64,
            embedding_model="text-embedding-3-large",
            reranker="cross-encoder"
        )
        score = self.step3_evaluate()
        print(f"Improved MRR: {score['mrr']:.3f}")

    def step5_deploy(self):
        """Deploy best-performing pipeline with monitoring."""
        deploy_to_production(self.current_pipeline)
        setup_monitoring(metrics=["mrr", "judge_score", "latency_p95"])
Section 4: Monitoring Fine-Tuning with Weights & Biases
When you're fine-tuning any model — a frontier model via the OpenAI API or an open-source model locally with QLoRA — you need visibility into what's happening during training. Weights & Biases (W&B) is the standard tool for this. It tracks training loss, validation loss, learning rate schedules, and any custom metrics you define, all in a shareable web dashboard.
Setting Up W&B Integration
pip install wandb transformers trl peft bitsandbytes
import wandb
from transformers import TrainingArguments
from trl import SFTTrainer

# Initialize W&B run
wandb.init(
    project="llm-fine-tuning",
    name="llama-3.2-qlora-v1",
    config={
        "model": "meta-llama/Llama-3.2-3B-Instruct",
        "lora_r": 8,
        "lora_alpha": 16,
        "learning_rate": 2e-4,
        "batch_size": 4,
        "num_epochs": 3,
    }
)

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,
    learning_rate=2e-4,
    warmup_steps=100,
    logging_steps=10,
    # Renamed to eval_strategy in transformers 4.41+; use the name your installed version accepts
    evaluation_strategy="steps",
    eval_steps=100,
    save_steps=500,
    report_to="wandb",  # send all metrics to W&B
    run_name="llama-3.2-qlora-v1",
)
Key Metrics to Monitor
During training, watch for these signals in your W&B dashboard:
- Training loss should decrease steadily. A loss that plateaus after only a few hundred steps may mean your learning rate is too low, or that the model has already learned what it can from a small dataset.
- Validation loss tells you whether you're overfitting. If training loss keeps dropping but validation loss levels off or starts rising, you're memorizing instead of learning.
- Learning rate curve: with a warmup + cosine decay schedule, you should see the LR ramp up then smoothly decrease. If the loss spikes suddenly, especially near the peak learning rate, that often indicates gradient instability.
- Gradient norm: if this is exploding (very large values), add gradient clipping, for example max_grad_norm=0.3 in TrainingArguments.
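The overfitting signal described above (training loss still falling while validation loss rises) can also be checked programmatically on exported loss curves. The following is a rough sketch: `overfitting_step` is a hypothetical helper, the patience of two consecutive evaluations is an arbitrary assumption, and the loss values are invented for illustration.

```python
from typing import Optional

def overfitting_step(train_loss: list[float], val_loss: list[float],
                     patience: int = 2) -> Optional[int]:
    """Index of the eval at which val loss has risen `patience` times in a row
    while train loss kept falling, or None if that never happens."""
    streak = 0
    for i in range(1, min(len(train_loss), len(val_loss))):
        if val_loss[i] > val_loss[i - 1] and train_loss[i] < train_loss[i - 1]:
            streak += 1
            if streak >= patience:
                return i
        else:
            streak = 0  # the pattern was interrupted, start counting again
    return None

# One value per evaluation step (illustrative numbers).
train = [2.10, 1.60, 1.30, 1.10, 0.95, 0.80]
val = [2.20, 1.80, 1.60, 1.58, 1.63, 1.70]

print("overfitting confirmed at eval index:", overfitting_step(train, val))
```

In practice you would usually stop at, or roll back to, the checkpoint just before the streak began.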
# Custom metric logging during training
from transformers import TrainerCallback

class WandbCallback(TrainerCallback):
    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        # Run our custom eval on top of default HF metrics
        # (run_domain_specific_eval is a placeholder for your own function)
        custom_score = run_domain_specific_eval()
        wandb.log({
            "custom/eval_score": custom_score,
            "custom/epoch": state.epoch,
        })

# Add to trainer (model, train_dataset, and eval_dataset are defined elsewhere)
trainer = SFTTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    callbacks=[WandbCallback()],
)
trainer.train()
When Fine-Tuning Doesn't Help
Fine-tuning a frontier model can sometimes make it worse, not better. This sounds counterintuitive but it's a real phenomenon. If your training data doesn't perfectly match the distribution of your evaluation queries, the model can lose some of its general-purpose capability in exchange for very local pattern matching. Common fixes: use more diverse training data, reduce the number of training steps, add a small amount of general instruction-following data to the mix, or switch to QLoRA fine-tuning of an open-source model where you have more control over the base.
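One of the fixes above, blending a small share of general instruction-following data into the training mix, can be sketched as follows. Everything here is illustrative: `mix_datasets` is a hypothetical helper, the 10% share is an assumed starting point rather than a recommendation from the course, and the examples are placeholder strings.

```python
import random

def mix_datasets(domain: list, general: list,
                 general_fraction: float = 0.1, seed: int = 0) -> list:
    """Blend domain examples with enough general examples that the general
    data makes up roughly general_fraction of the returned training set."""
    if not 0 <= general_fraction < 1:
        raise ValueError("general_fraction must be in [0, 1)")
    rng = random.Random(seed)  # seeded so the mix is reproducible across runs
    n_general = round(len(domain) * general_fraction / (1 - general_fraction))
    n_general = min(n_general, len(general))
    mixed = domain + rng.sample(general, n_general)
    rng.shuffle(mixed)
    return mixed

domain_examples = [f"domain-{i}" for i in range(90)]
general_examples = [f"general-{i}" for i in range(100)]

mixed = mix_datasets(domain_examples, general_examples)
n_general = sum(1 for ex in mixed if ex.startswith("general-"))
print(f"{len(mixed)} examples, {n_general} of them general")
```

Re-run your evaluation after changing the fraction; the right share depends on how narrow the domain data is.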
Section 5: Serverless Deployment with Modal
Once you have a model worth running in production — whether it's a fine-tuned open-source model or a custom inference pipeline — you need somewhere to run it. Modal is a serverless AI platform that lets you define your infrastructure in Python code. You describe the GPU you need, the packages to install, and the function to run, and Modal handles provisioning, scaling, and cold-start optimization automatically.
Deploying a Fine-Tuned Model
pip install modal
import modal

# Define the cloud environment.
# Version pins are kept from the original example; the Llama 3.2 model card asks for
# transformers >= 4.43, so raise the pins if the model fails to load.
image = (
    modal.Image.debian_slim()
    .pip_install(
        "transformers==4.40.0",
        "torch==2.2.0",
        "peft==0.10.0",
        "accelerate==0.29.0",
        "huggingface_hub",
    )
)

# Create persistent storage so model weights survive container restarts
volume = modal.Volume.from_name("model-weights", create_if_missing=True)
app = modal.App("pricer-service")

@app.cls(
    gpu="T4",                    # request a T4 GPU
    image=image,
    volumes={"/cache": volume},  # mount persistent volume
    timeout=600,
)
class PricerService:
    @modal.enter()
    def load_model(self):
        """Runs once per container start; files under /cache persist in the volume."""
        import torch
        from peft import PeftModel
        from transformers import AutoModelForCausalLM, AutoTokenizer

        # The base model is gated on Hugging Face, so the container needs an HF token
        # (for example via a Modal Secret). The adapter must already be in the volume.
        base_model_name = "meta-llama/Llama-3.2-3B-Instruct"
        lora_adapter_path = "/cache/my-fine-tuned-adapter"

        self.tokenizer = AutoTokenizer.from_pretrained(
            base_model_name, cache_dir="/cache/hf"
        )
        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,
            device_map="auto",
            cache_dir="/cache/hf",
        )
        self.model = PeftModel.from_pretrained(base_model, lora_adapter_path)
        self.model.eval()

    @modal.method()
    def predict(self, text: str) -> str:
        import torch  # the import in load_model is local to that method

        inputs = self.tokenizer(text, return_tensors="pt").to("cuda")
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.1,
                do_sample=True,
            )
        # Note: the decoded text includes the prompt followed by the completion
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
Deploying and Calling the Service
# Deploy to Modal (run once, then it stays live)
# modal deploy pricer_service.py
# Call the deployed service from Python
import modal
# Get a handle to the deployed class
PricerService = modal.Cls.from_name("pricer-service", "PricerService")
pricer = PricerService()
# Call it — runs on a T4 in the cloud, returns locally
result = pricer.predict.remote("Quadcast HyperX Condenser Microphone, barely used")
print(result)
Reducing Cold-Start Latency
The biggest frustration with serverless GPU inference is the cold start. When a container scales down and then receives a new request, it has to restart and reload the model, which can take 30–90 seconds for a 3B parameter model. Three techniques help:
- Persistent volumes: storing the model weights in a Modal volume means the container doesn't re-download from HuggingFace on every start. This alone can cut startup from around 90s to ~30s.
- Minimum containers: tell Modal to keep at least one container always warm with min_containers=1. You pay for idle time, but eliminating cold starts is often worth it for production traffic.
- Container keep-alive: extend the idle timeout from Modal's default 60 seconds to something longer with scaledown_window=600. Good for bursty traffic patterns where requests cluster in time.
@app.cls(
    gpu="T4",
    image=image,
    volumes={"/cache": volume},
    timeout=600,
    min_containers=1,      # always keep one warm
    scaledown_window=600,  # 10-minute idle window before scaling down
)
class PricerServiceProd:
    # ... same as above
    pass
Section 6: Production Observability
Shipping a model is not the end of the process — it's the beginning of a monitoring problem. In production, your model will encounter queries it was never trained on, exhibit performance drift as the world changes, and occasionally fail in ways that were impossible to predict from your test set. Observability gives you visibility into these failure modes before users do.
Logging Every Request
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from functools import wraps

logger = logging.getLogger("ai_production")

def log_inference(func):
    """Decorator that logs every model call with its status and latency."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        # uuid avoids ID collisions when two calls start in the same millisecond
        request_id = f"req_{uuid.uuid4().hex[:12]}"
        try:
            result = func(*args, **kwargs)
            latency_ms = (time.time() - start) * 1000
            logger.info(json.dumps({
                "request_id": request_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "status": "success",
                "latency_ms": round(latency_ms, 1),
                # Only captured when `model` is passed as a keyword argument
                "model": kwargs.get("model", "unknown"),
            }))
            return result
        except Exception as e:
            latency_ms = (time.time() - start) * 1000
            logger.error(json.dumps({
                "request_id": request_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "status": "error",
                "error": str(e),
                "latency_ms": round(latency_ms, 1),
            }))
            raise
    return wrapper

@log_inference
def call_model(prompt: str, model: str = "gpt-4o-mini") -> str:
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
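Once requests are logged as JSON lines, a tail-latency figure such as the latency_p95 metric mentioned earlier can be derived directly from the log. This is a minimal sketch assuming each line carries the status and latency_ms fields written by the decorator; the function name `latency_percentile`, the nearest-rank method, and the sample data are illustrative choices.

```python
import json
import math

def latency_percentile(log_lines: list[str], pct: float = 95.0) -> float:
    """Nearest-rank percentile of latency_ms over successful requests."""
    latencies = sorted(
        rec["latency_ms"]
        for rec in map(json.loads, log_lines)
        if rec.get("status") == "success"
    )
    if not latencies:
        raise ValueError("no successful requests in the log sample")
    rank = math.ceil(pct * len(latencies) / 100)  # 1-based nearest rank
    return latencies[rank - 1]

# Synthetic sample: 20 successful calls (100 ms to 2000 ms) plus one failed call.
sample = [json.dumps({"status": "success", "latency_ms": ms}) for ms in range(100, 2100, 100)]
sample.append(json.dumps({"status": "error", "latency_ms": 30000.0}))

print("p50:", latency_percentile(sample, 50), "ms")
print("p95:", latency_percentile(sample, 95), "ms")
```

Failed requests are excluded here so that timeouts do not dominate the percentile; track the error rate as a separate metric.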
Production Retry Logic
APIs fail. Rate limits hit. Networks drop. Production code needs retry logic with exponential backoff so that transient errors don't become user-visible failures.
import time
import random
from openai import OpenAI, RateLimitError, APITimeoutError

# The OpenAI SDK also retries some errors on its own (max_retries, default 2).
# Pass max_retries=0 to OpenAI() if this loop should be the only retry layer.
client = OpenAI()

def call_with_retry(
    prompt: str,
    model: str = "gpt-4o",
    max_retries: int = 4,
    base_delay: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                timeout=30.0,
            )
            return response.choices[0].message.content
        except RateLimitError:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the error to the caller
            # Exponential backoff plus jitter so parallel clients don't retry in lockstep
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited, retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
        except APITimeoutError:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Timeout, retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)
    # Only reached when max_retries is 0 or negative
    raise RuntimeError("Max retries exceeded")
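To see what the default settings cost in waiting time, the deterministic part of the backoff schedule can be worked out separately. The helper below is a hypothetical illustration (`backoff_delays` is not part of the retry code); it mirrors the base_delay * 2 ** attempt formula and ignores the random jitter.

```python
def backoff_delays(max_retries: int = 4, base_delay: float = 1.0) -> list[float]:
    """Deterministic wait before each retry (jitter excluded)."""
    # The final attempt re-raises instead of sleeping, so there are max_retries - 1 waits.
    return [base_delay * (2 ** attempt) for attempt in range(max_retries - 1)]

delays = backoff_delays()
print("waits between attempts:", delays)
print("worst-case added wait:", sum(delays), "seconds")
```

With four attempts and a one-second base, a request that keeps failing spends 1 + 2 + 4 = 7 seconds sleeping before the error surfaces, on top of the per-call timeouts. That total is worth checking against your own latency budget.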
Alerting on Performance Drift
Once you have logging in place, you can periodically re-run your evaluation harness against a sample of real production traffic. If your LLM-as-Judge scores drop significantly from your deployment baseline, that's a signal that something has changed — the world, the model, or the queries. Set up a weekly cron job that runs your eval suite and posts results to Slack or email.
import schedule
import time

def weekly_eval_job():
    """Run every Monday morning against last week's sampled traffic."""
    # fetch_production_sample, production_pipeline, and send_alert are placeholders.
    # MRR needs labels: each sampled query must carry relevant_doc_ids.
    sampled_queries = fetch_production_sample(days=7, n=100)
    score = run_rag_eval(production_pipeline, sampled_queries)
    baseline_mrr = 0.87  # what you measured at deploy time
    if score["mrr"] < baseline_mrr * 0.95:  # 5% degradation threshold
        send_alert(
            subject="⚠️ RAG performance degradation detected",
            body=f"MRR dropped to {score['mrr']:.3f} (baseline: {baseline_mrr:.3f}). Investigate."
        )
    else:
        print(f"Weekly eval OK, MRR: {score['mrr']:.3f}")

schedule.every().monday.at("08:00").do(weekly_eval_job)

# schedule only registers the job; a long-running loop has to trigger it
while True:
    schedule.run_pending()
    time.sleep(60)
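The same 5% rule can be applied to several metrics at once, which helps when retrieval holds steady but answer quality drifts. A minimal sketch, assuming result dicts shaped like the eval output; `degraded_metrics` is a hypothetical helper and the baseline and weekly numbers are invented.

```python
def degraded_metrics(current: dict, baseline: dict,
                     max_relative_drop: float = 0.05) -> list[str]:
    """Names of metrics that fell more than max_relative_drop below their baseline."""
    return [
        name for name, base in baseline.items()
        # A metric missing from the current run is treated as 0, so it is flagged
        if current.get(name, 0.0) < base * (1 - max_relative_drop)
    ]

baseline = {"mrr": 0.87, "avg_judge_score": 4.2}   # measured at deploy time (made up)
this_week = {"mrr": 0.86, "avg_judge_score": 3.7}  # latest weekly eval (made up)

flagged = degraded_metrics(this_week, baseline)
print("degraded:", flagged if flagged else "none")
```

MRR at 0.86 is still above its alert line of 0.87 × 0.95 ≈ 0.827, but the judge score has dropped below 4.2 × 0.95 = 3.99, so only answer quality is flagged.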
What's Next
You've completed the AI Foundations & Engineering course. The next step is Applied & Agentic AI — where everything comes together: you'll build multi-agent systems that reason, plan, and act autonomously, deploy agentic pipelines with real tools and memory, and design production architectures using LangGraph, CrewAI, and the Model Context Protocol (MCP).