Stage 06 — Evaluation, Deployment & Production

Evaluation, Deployment & Production  ·  Comprehensive Technical Training  ·  ⏱ 10–14 hours

Learning Objectives

By the end of this stage you will be able to:

  • Define and measure evaluation metrics that map to real business outcomes
  • Evaluate RAG pipelines using MRR, nDCG, and LLM-as-Judge patterns
  • Build a structured evaluation harness with golden test sets and Pydantic
  • Apply the five-step MLOps process to productionize any AI solution
  • Deploy fine-tuned LLMs to serverless cloud infrastructure using Modal
  • Monitor training runs in real time with Weights & Biases
  • Reduce cold-start latency with persistent volume caching in Modal
  • Instrument a production AI system with observability and retry logic

Section 1: The Evaluation Mindset

Evaluation is arguably the most important skill an AI engineer can develop — and the most overlooked. Without a clear metric you can measure, you cannot know which model is better, whether a RAG improvement actually helped, or when a fine-tuned model is ready to ship. Evaluation comes before everything else in a mature AI workflow.

Technical Metrics vs. Business Outcomes

There are two layers to every evaluation problem. Technical metrics like perplexity, BLEU, or embedding cosine similarity are fast to compute but often disconnected from what users actually care about. Business outcome metrics — task completion rate, answer acceptance rate, reduction in support tickets — are what actually matter, but they require real traffic to measure and are slow to iterate on.

The skill is in bridging these two layers: finding cheap proxy metrics that reliably predict the expensive business outcomes you care about. If you pick the wrong proxy, you'll optimize hard for something that doesn't move the needle. Pick the right one and you get rapid iteration cycles that still track to production performance.
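One way to check whether a proxy deserves your trust is to look back over past releases and see whether the proxy and the business metric rank them in the same order. The sketch below computes a Spearman rank correlation using only the standard library. The numbers are invented for illustration, and the function names are not from any particular library.

```python
def rank(values):
    """Return 1-based ranks; tied values share the average of their positions."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    ranks = [0.0] * len(values)
    i = 0
    while i < len(order):
        j = i
        while j + 1 < len(order) and values[order[j + 1]] == values[order[i]]:
            j += 1
        avg = (i + j) / 2 + 1
        for k in range(i, j + 1):
            ranks[order[k]] = avg
        i = j + 1
    return ranks


def pearson(xs, ys):
    """Pearson correlation; assumes neither input is constant."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sum((x - mx) ** 2 for x in xs) ** 0.5
    sy = sum((y - my) ** 2 for y in ys) ** 0.5
    return cov / (sx * sy)


def spearman(xs, ys):
    """Spearman correlation is Pearson correlation computed on the ranks."""
    return pearson(rank(xs), rank(ys))


# Hypothetical data: one entry per past release
judge_scores = [3.1, 3.4, 3.9, 4.2, 4.0]          # cheap proxy metric
thumbs_up_rate = [0.52, 0.55, 0.63, 0.70, 0.66]   # slow business metric

rho = spearman(judge_scores, thumbs_up_rate)
print(f"Spearman correlation: {rho:.2f}")
```

A correlation near 1.0 suggests the proxy orders releases the same way the business metric does. A value near zero is a hint that optimizing the proxy may not move the outcome you care about.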

Choosing What to Measure

Start by asking: what does it look like when this system fails? For a Q&A bot, failure is a wrong answer or a hallucination. For a code generator, it's syntactically broken or logically incorrect output. For a RAG pipeline, it's retrieving irrelevant chunks or failing to retrieve the right one at all. Each failure mode points to a different evaluation strategy.

System Type            | Primary Failure           | Proxy Metric             | Business Metric
Q&A / Chat             | Wrong answers             | LLM-as-Judge score       | User thumbs-up rate
RAG Pipeline           | Wrong documents retrieved | MRR / nDCG               | Query resolution rate
Code Generator         | Code doesn't run          | Pass@k on unit tests     | Dev time saved
Fine-Tuned Classifier  | Wrong category            | Validation accuracy / F1 | Error rate in downstream system
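The Pass@k proxy listed for code generators is usually computed with a combinatorial estimator: generate n samples per problem, count the c that pass the unit tests, and estimate the chance that at least one of k randomly drawn samples passes. The sketch below is one common way to write it; the parameter names n, c, and k are a convention, and it assumes k is no larger than n.

```python
from math import comb


def pass_at_k(n: int, c: int, k: int) -> float:
    """Estimate the probability that at least one of k samples passes,
    given that c out of n generated samples passed the unit tests."""
    if n - c < k:
        # Fewer than k failing samples exist, so any draw of k includes a pass
        return 1.0
    return 1.0 - comb(n - c, k) / comb(n, k)


# Hypothetical problem: 10 generations, 3 of which pass the tests
print(f"pass@1: {pass_at_k(10, 3, 1):.3f}")
print(f"pass@5: {pass_at_k(10, 3, 5):.3f}")
```

Averaging this value over every problem in the benchmark gives the reported Pass@k score.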

Building a Golden Test Set

Every production AI system needs a curated test set — a fixed collection of inputs with known correct outputs. This is your evaluation ground truth. Build it by sampling real user queries, having subject matter experts label the correct answers, and versioning it in source control. Never modify the test set without creating a new version; otherwise you lose the ability to compare runs over time.

import json
from pathlib import Path

# golden_test_set.json — your frozen evaluation benchmark
TEST_SET = [
    {
        "query": "What is the difference between RAG and fine-tuning?",
        "expected_topics": ["retrieval", "embedding", "parameter update", "training data"],
        "min_quality_score": 4  # out of 5, from LLM-as-Judge
    },
    {
        "query": "How do I reduce hallucinations in my LLM application?",
        "expected_topics": ["grounding", "retrieval", "temperature", "system prompt"],
        "min_quality_score": 4
    },
    # ... 50-200 more examples for a robust benchmark
]

def save_test_set(test_set: list, path: Path):
    with open(path, "w") as f:
        json.dump(test_set, f, indent=2)

def load_test_set(path: Path) -> list:
    with open(path) as f:
        return json.load(f)
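Because the test set must stay frozen between runs, it can help to record a content fingerprint next to every evaluation result. The sketch below hashes a canonical JSON form of the test set, so any edit to a query or threshold changes the fingerprint. The function name and the 12-character truncation are illustrative choices, not an established convention.

```python
import hashlib
import json


def fingerprint_test_set(test_set: list) -> str:
    """Return a short, stable hash of the test set contents."""
    # sort_keys and fixed separators make the serialization independent of
    # key order and whitespace, so only real content changes alter the hash
    canonical = json.dumps(
        test_set, sort_keys=True, separators=(",", ":"), ensure_ascii=False
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:12]


v1 = [{"query": "What is RAG?", "min_quality_score": 4}]
v1_reordered = [{"min_quality_score": 4, "query": "What is RAG?"}]
v2 = [{"query": "What is RAG?", "min_quality_score": 5}]

print(fingerprint_test_set(v1) == fingerprint_test_set(v1_reordered))  # same content
print(fingerprint_test_set(v1) == fingerprint_test_set(v2))            # edited threshold
```

Storing the fingerprint with each run makes it easy to spot when two scores were measured against different versions of the benchmark.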

Section 2: RAG Evaluation — MRR, nDCG, and LLM-as-Judge

RAG systems have two distinct components to evaluate: retrieval quality (did we get the right documents?) and answer quality (did the LLM produce a good response given those documents?). Most teams only evaluate the second, which means they miss retrieval failures entirely.

Retrieval Metrics: MRR and nDCG

Mean Reciprocal Rank (MRR) measures how high the first relevant result appears in your ranked list. For each query, the reciprocal rank is 1.0 if the first relevant document is returned first, 0.5 if it is returned second, 0.33 if third, and 0 if no relevant document is returned at all. Averaging the reciprocal ranks across all queries gives MRR, a single number that summarizes how good your retrieval is.

Normalized Discounted Cumulative Gain (nDCG) goes further by giving partial credit to graded relevance — a highly relevant document is worth more than a somewhat relevant one, and documents returned higher in the list are worth more than ones buried at position 10. nDCG scores close to 1.0 indicate excellent retrieval.

def mean_reciprocal_rank(results: list[list], relevant: list[set]) -> float:
    """
    results: list of ranked document IDs per query
    relevant: set of relevant document IDs per query
    """
    rr_scores = []
    for ranked_docs, rel_docs in zip(results, relevant):
        rr = 0.0
        for rank, doc_id in enumerate(ranked_docs, start=1):
            if doc_id in rel_docs:
                rr = 1.0 / rank
                break
        rr_scores.append(rr)
    return sum(rr_scores) / len(rr_scores)


def ndcg_at_k(results: list[list], relevance_scores: list[dict], k: int = 10) -> float:
    """
    results: list of ranked document IDs per query
    relevance_scores: one dict per query mapping doc_id → relevance grade (0, 1, 2, 3)
    """
    import math

    def dcg(ranked_docs, rel_scores, k):
        score = 0.0
        for i, doc in enumerate(ranked_docs[:k], start=1):
            grade = rel_scores.get(doc, 0)
            score += grade / math.log2(i + 1)
        return score

    ndcg_scores = []
    for ranked_docs, rel_scores in zip(results, relevance_scores):
        actual_dcg = dcg(ranked_docs, rel_scores, k)
        ideal_ranked = sorted(rel_scores.keys(), key=lambda d: rel_scores[d], reverse=True)
        ideal_dcg = dcg(ideal_ranked, rel_scores, k)
        ndcg_scores.append(actual_dcg / ideal_dcg if ideal_dcg > 0 else 0.0)
    return sum(ndcg_scores) / len(ndcg_scores)


# Example usage
queries = ["What is RAG?", "How does fine-tuning work?"]
results = [
    ["doc_3", "doc_1", "doc_7", "doc_2"],  # retrieved for query 1
    ["doc_5", "doc_9", "doc_3", "doc_1"],  # retrieved for query 2
]
relevant = [{"doc_1", "doc_3"}, {"doc_5"}]

mrr = mean_reciprocal_rank(results, relevant)
print(f"MRR: {mrr:.3f}")
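The example above only exercises MRR, so here is a small worked nDCG calculation for a single query. It uses the same linear-gain formula as ndcg_at_k (grade divided by log2 of position + 1) but is written standalone so the arithmetic is easy to follow. The relevance grades are hypothetical labels.

```python
import math


def dcg(grades: list) -> float:
    """Discounted cumulative gain for grades listed in ranked order."""
    return sum(g / math.log2(pos + 1) for pos, g in enumerate(grades, start=1))


ranked = ["doc_3", "doc_1", "doc_7", "doc_2"]       # what the retriever returned
grades = {"doc_1": 3, "doc_2": 2, "doc_3": 1}       # hypothetical labels; unlisted docs count as 0

# DCG of the actual ranking: grades in retrieved order are [1, 3, 0, 2]
actual = dcg([grades.get(doc, 0) for doc in ranked])

# Ideal DCG: the same grades sorted best-first, [3, 2, 1]
ideal = dcg(sorted(grades.values(), reverse=True))

ndcg = actual / ideal
print(f"DCG={actual:.3f}  ideal DCG={ideal:.3f}  nDCG={ndcg:.3f}")
```

The score lands below 1.0 because the most relevant document (doc_1) was returned second rather than first, and doc_2 was pushed down to fourth place.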

Answer Quality: LLM-as-Judge

Once you have good retrieval, you need to evaluate whether the LLM's answer is actually correct and well-grounded. The most practical approach at scale is using a separate LLM — often GPT-4o or Claude — to act as a judge. The judge receives the query, the retrieved context, and the generated answer, then scores the answer on criteria like faithfulness, relevance, and completeness.

from openai import OpenAI
from pydantic import BaseModel

client = OpenAI()

JUDGE_PROMPT = """You are an expert evaluator for AI-generated answers.

Given:
- USER QUERY: {query}
- RETRIEVED CONTEXT: {context}
- GENERATED ANSWER: {answer}

Score the answer on these criteria (each 1–5):
1. Faithfulness: Is the answer grounded in the retrieved context?
2. Relevance: Does the answer directly address the query?
3. Completeness: Does it cover all key points from the context?
4. Clarity: Is it clearly written and well-structured?

Return JSON with keys: faithfulness, relevance, completeness, clarity, reasoning"""

class JudgeScore(BaseModel):
    faithfulness: int
    relevance: int
    completeness: int
    clarity: int
    reasoning: str

def llm_judge(query: str, context: str, answer: str) -> JudgeScore:
    response = client.beta.chat.completions.parse(
        model="gpt-4o",
        messages=[{
            "role": "user",
            "content": JUDGE_PROMPT.format(
                query=query, context=context, answer=answer
            )
        }],
        response_format=JudgeScore,
    )
    return response.choices[0].message.parsed

# Example
score = llm_judge(
    query="What is the capital of France?",
    context="France is a country in Western Europe. Its capital city is Paris, which is also the largest city.",
    answer="The capital of France is Paris."
)
print(f"Faithfulness: {score.faithfulness}/5")
print(f"Overall avg: {(score.faithfulness + score.relevance + score.completeness + score.clarity) / 4:.1f}/5")
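Judge models occasionally return values outside the requested 1 to 5 range, and the golden test set stores a min_quality_score per example, so it is worth checking both before trusting a score. The sketch below works on a plain dict of scores (for a Pydantic v2 model, something like score.model_dump() would produce one). The helper name and the choice to average the four criteria are assumptions for illustration.

```python
from statistics import mean

CRITERIA = ("faithfulness", "relevance", "completeness", "clarity")


def passes_threshold(scores: dict, min_quality_score: float) -> bool:
    """Validate the judge's scores, then compare their mean to the threshold."""
    for name in CRITERIA:
        if not 1 <= scores[name] <= 5:
            raise ValueError(f"{name} score {scores[name]} is outside the 1-5 range")
    return mean(scores[name] for name in CRITERIA) >= min_quality_score


# Hypothetical judge output for one test-set example
example_scores = {"faithfulness": 5, "relevance": 5, "completeness": 4, "clarity": 4}
print(passes_threshold(example_scores, min_quality_score=4))
```

Averaging hides a weak criterion behind strong ones, so a stricter variant could require every individual criterion to meet the threshold instead.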

Running a Full RAG Evaluation

Wire these together into a repeatable evaluation loop. Each time you change your chunking strategy, swap embedding models, or tweak your prompt, run the full eval suite and compare the numbers. Iterating this way can produce large, measurable gains over a few experiments, such as moving MRR from 0.73 to above 0.91, and those gains translate directly into a better user experience.

import json
from dataclasses import dataclass

@dataclass
class EvalResult:
    query: str
    mrr: float
    judge_score: float
    retrieved_correct: bool

def run_rag_eval(rag_pipeline, test_set: list) -> dict:
    results = []
    for example in test_set:
        query = example["query"]
        # Each test-set example needs a "relevant_doc_ids" list alongside its "query"
        expected_docs = set(example["relevant_doc_ids"])

        # Get RAG output
        rag_result = rag_pipeline.query(query)
        retrieved_ids = [doc.id for doc in rag_result.documents]
        answer = rag_result.answer
        context = "\n\n".join(doc.content for doc in rag_result.documents[:3])

        # Retrieval metric
        rr = 0.0
        for rank, doc_id in enumerate(retrieved_ids, 1):
            if doc_id in expected_docs:
                rr = 1.0 / rank
                break

        # Answer quality
        score = llm_judge(query, context, answer)
        avg_judge = (score.faithfulness + score.relevance + score.completeness + score.clarity) / 4

        results.append(EvalResult(
            query=query,
            mrr=rr,
            judge_score=avg_judge,
            retrieved_correct=(rr > 0)
        ))

    return {
        "mrr": sum(r.mrr for r in results) / len(results),
        "avg_judge_score": sum(r.judge_score for r in results) / len(results),
        "retrieval_accuracy": sum(r.retrieved_correct for r in results) / len(results),
        "n_queries": len(results),
    }

# baseline = run_rag_eval(baseline_pipeline, TEST_SET)
# improved = run_rag_eval(improved_pipeline, TEST_SET)
# print(f"MRR: {baseline['mrr']:.3f} → {improved['mrr']:.3f}")
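Comparing two runs by eye gets error-prone once several metrics are involved. A minimal sketch of a comparison helper follows; it takes two result dicts shaped like the return value of run_rag_eval and labels each metric. The function name, the single shared tolerance, and the example numbers are all assumptions made for illustration.

```python
def compare_runs(baseline: dict, candidate: dict, tolerance: float = 0.01) -> dict:
    """Label each metric as improved, regressed, or unchanged."""
    report = {}
    for metric in ("mrr", "avg_judge_score", "retrieval_accuracy"):
        delta = candidate[metric] - baseline[metric]
        if delta > tolerance:
            verdict = "improved"
        elif delta < -tolerance:
            verdict = "regressed"
        else:
            verdict = "unchanged"
        report[metric] = (round(delta, 3), verdict)
    return report


# Hypothetical results from two evaluation runs on the same test set
baseline = {"mrr": 0.73, "avg_judge_score": 3.9, "retrieval_accuracy": 0.80}
candidate = {"mrr": 0.81, "avg_judge_score": 3.7, "retrieval_accuracy": 0.80}

for metric, (delta, verdict) in compare_runs(baseline, candidate).items():
    print(f"{metric:20s} {delta:+.3f}  {verdict}")
```

In this made-up case retrieval improved while answer quality regressed, which is exactly the kind of trade-off a single headline number would hide. Since the judge score lives on a 1 to 5 scale and the other metrics on 0 to 1, per-metric tolerances would be a sensible refinement.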

Section 3: The Five-Step MLOps Process

Production AI engineering is not just about training good models — it's about building repeatable processes that let you continuously improve them. The five-step MLOps process gives you a structured workflow that applies to any AI problem, from a simple classifier to a multi-agent system.

  1. Define the problem and collect data. Start with the business objective. What does success look like? Who labels the training data and how? Poor data quality at this stage compounds through every subsequent step.
  2. Build baseline models. Before training anything, get a simple baseline working — a rule-based system, a small traditional ML model (logistic regression, XGBoost), or just calling a frontier LLM with a basic prompt. This gives you a reference point and often reveals that the problem is simpler than expected.
  3. Evaluate systematically. Build your golden test set now, before you start iterating on models. Evaluate your baseline against it. This is the most skipped step and the most costly skip.
  4. Fine-tune or improve. Now that you have a baseline score, you have a target to beat. Fine-tune frontier models, adapt open-source models with QLoRA, or improve your prompting and retrieval. Run evaluation after every significant change.
  5. Deploy and monitor. Ship the best model to production with appropriate monitoring. Track the same metrics you optimized in evaluation. When performance drifts, you know exactly where to look and how to measure an improvement.

# Pseudocode: five-step MLOps loop
class AIProjectWorkflow:

    def step1_define_and_collect(self):
        """Define success metric, collect and label training/eval data."""
        self.test_set = load_test_set("golden_test_set.json")
        self.train_set = load_training_data("training_data.jsonl")

    def step2_build_baseline(self):
        """Establish baseline — simplest thing that could work."""
        self.baseline_pipeline = SimpleRAGPipeline(model="gpt-4o-mini")
        self.baseline_score = run_rag_eval(self.baseline_pipeline, self.test_set)
        print(f"Baseline MRR: {self.baseline_score['mrr']:.3f}")

    def step3_evaluate(self):
        """Systematic eval against frozen test set."""
        return run_rag_eval(self.current_pipeline, self.test_set)

    def step4_improve(self):
        """Iterate: better chunking, embedding model, fine-tuning, etc."""
        self.current_pipeline = ImprovedRAGPipeline(
            chunk_size=512, overlap=64,
            embedding_model="text-embedding-3-large",
            reranker="cross-encoder"
        )
        score = self.step3_evaluate()
        print(f"Improved MRR: {score['mrr']:.3f}")

    def step5_deploy(self):
        """Deploy best-performing pipeline with monitoring."""
        deploy_to_production(self.current_pipeline)
        setup_monitoring(metrics=["mrr", "judge_score", "latency_p95"])

Section 4: Monitoring Fine-Tuning with Weights & Biases

When you're fine-tuning any model — a frontier model via the OpenAI API or an open-source model locally with QLoRA — you need visibility into what's happening during training. Weights & Biases (W&B) is the standard tool for this. It tracks training loss, validation loss, learning rate schedules, and any custom metrics you define, all in a shareable web dashboard.

Setting Up W&B Integration

pip install wandb transformers trl peft bitsandbytes

import wandb
from transformers import TrainingArguments
from trl import SFTTrainer

# Initialize W&B run
wandb.init(
    project="llm-fine-tuning",
    name="llama-3.2-qlora-v1",
    config={
        "model": "meta-llama/Llama-3.2-3B-Instruct",
        "lora_r": 8,
        "lora_alpha": 16,
        "learning_rate": 2e-4,
        "batch_size": 4,
        "num_epochs": 3,
    }
)

training_args = TrainingArguments(
    output_dir="./results",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=2,
    learning_rate=2e-4,
    warmup_steps=100,
    logging_steps=10,
    evaluation_strategy="steps",  # named eval_strategy in newer transformers releases
    eval_steps=100,
    save_steps=500,
    report_to="wandb",  # send all metrics to W&B
    run_name="llama-3.2-qlora-v1",
)

Key Metrics to Monitor

During training, watch for these signals in your W&B dashboard:

  • Training loss should decrease steadily. A loss that stops decreasing after a few hundred steps is a sign your learning rate is too low or your data is exhausted.
  • Validation loss tells you whether you're overfitting. If training loss keeps dropping but validation loss levels off or starts rising, you're memorizing instead of learning.
  • Learning rate curve: with a warmup + cosine decay schedule, you should see the LR ramp up then smoothly decrease. Sudden spikes often indicate gradient instability.
  • Gradient norm: if this is exploding (very large values), add gradient clipping: max_grad_norm=0.3.

# Custom metric logging during training
from transformers import TrainerCallback

class WandbCallback(TrainerCallback):
    def on_evaluate(self, args, state, control, metrics=None, **kwargs):
        # Run our custom eval on top of default HF metrics.
        # run_domain_specific_eval() is a placeholder for your own evaluation function.
        custom_score = run_domain_specific_eval()
        wandb.log({
            "custom/eval_score": custom_score,
            "custom/epoch": state.epoch,
        })

# Add to trainer
trainer = SFTTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    callbacks=[WandbCallback()],
)
trainer.train()
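The overfitting signal described in the metrics list (validation loss levelling off or rising) can also be checked programmatically on a list of logged losses. The sketch below is a patience-based check in plain Python; the function name and the default patience of 3 are illustrative assumptions. Hugging Face also ships an EarlyStoppingCallback that applies a similar idea during training.

```python
from typing import Optional


def best_eval_before_overfit(val_losses: list, patience: int = 3) -> Optional[int]:
    """Return the index of the best validation loss once it has failed to
    improve for `patience` consecutive evaluations, or None if it never stalls."""
    best = float("inf")
    best_idx = None
    stale = 0
    for i, loss in enumerate(val_losses):
        if loss < best:
            best, best_idx, stale = loss, i, 0
        else:
            stale += 1
            if stale >= patience:
                return best_idx
    return None


# Hypothetical validation losses, one per evaluation step
val_losses = [1.90, 1.52, 1.31, 1.28, 1.30, 1.33, 1.37]
print(best_eval_before_overfit(val_losses))
```

The returned index points at the checkpoint worth keeping: the last one saved before validation loss started drifting upward.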

When Fine-Tuning Doesn't Help

Fine-tuning a frontier model can sometimes make it worse, not better. This sounds counterintuitive but it's a real phenomenon. If your training data doesn't perfectly match the distribution of your evaluation queries, the model can lose some of its general-purpose capability in exchange for very local pattern matching. Common fixes: use more diverse training data, reduce the number of training steps, add a small amount of general instruction-following data to the mix, or switch to QLoRA fine-tuning of an open-source model where you have more control over the base.
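One of the fixes above, blending a small share of general instruction-following data into the training mix, can be sketched as follows. The function name, the 10% default, and the toy examples are illustrative assumptions rather than recommended values.

```python
import random


def mix_datasets(domain: list, general: list, general_fraction: float = 0.1, seed: int = 42) -> list:
    """Return a shuffled training list in which roughly `general_fraction`
    of the examples are drawn from `general` (requires 0 <= fraction < 1)."""
    rng = random.Random(seed)
    # Solve n_general / (len(domain) + n_general) = general_fraction
    n_general = round(len(domain) * general_fraction / (1 - general_fraction))
    n_general = min(n_general, len(general))  # cannot sample more than we have
    mixed = list(domain) + rng.sample(list(general), n_general)
    rng.shuffle(mixed)
    return mixed


# Toy stand-ins for real training examples
domain = [{"source": "domain", "id": i} for i in range(90)]
general = [{"source": "general", "id": i} for i in range(50)]

mixed = mix_datasets(domain, general, general_fraction=0.1)
n_general = sum(1 for ex in mixed if ex["source"] == "general")
print(f"{len(mixed)} examples, {n_general} general")
```

Fixing the seed keeps the mix reproducible, so a change in evaluation score can be attributed to the mixing ratio rather than to a different random sample.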


Section 5: Serverless Deployment with Modal

Once you have a model worth running in production — whether it's a fine-tuned open-source model or a custom inference pipeline — you need somewhere to run it. Modal is a serverless AI platform that lets you define your infrastructure in Python code. You describe the GPU you need, the packages to install, and the function to run, and Modal handles provisioning, scaling, and cold-start optimization automatically.

Deploying a Fine-Tuned Model

pip install modal

import modal

# Define the cloud environment
image = (
    modal.Image.debian_slim()
    .pip_install(
        "transformers==4.40.0",
        "torch==2.2.0",
        "peft==0.10.0",
        "accelerate==0.29.0",
        "huggingface_hub",
    )
)

# Create persistent storage so model weights survive container restarts
volume = modal.Volume.from_name("model-weights", create_if_missing=True)

app = modal.App("pricer-service")

@app.cls(
    gpu="T4",           # request a T4 GPU
    image=image,
    volumes={"/cache": volume},   # mount persistent volume
    timeout=600,
)
class PricerService:
    @modal.enter()
    def load_model(self):
        """Runs once when container starts — cached in volume after first run."""
        from transformers import AutoModelForCausalLM, AutoTokenizer
        from peft import PeftModel
        import torch

        base_model_name = "meta-llama/Llama-3.2-3B-Instruct"
        lora_adapter_path = "/cache/my-fine-tuned-adapter"

        self.tokenizer = AutoTokenizer.from_pretrained(
            base_model_name, cache_dir="/cache/hf"
        )
        base_model = AutoModelForCausalLM.from_pretrained(
            base_model_name,
            torch_dtype=torch.float16,
            device_map="auto",
            cache_dir="/cache/hf",
        )
        self.model = PeftModel.from_pretrained(base_model, lora_adapter_path)
        self.model.eval()

    @modal.method()
    def predict(self, text: str) -> str:
        import torch  # load_model's import is local to that method, so import again here

        inputs = self.tokenizer(text, return_tensors="pt").to("cuda")
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.1,
                do_sample=True,
            )
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

Deploying and Calling the Service

# Deploy to Modal (run once, then it stays live)
# modal deploy pricer_service.py

# Call the deployed service from Python
import modal

# Get a handle to the deployed class
PricerService = modal.Cls.from_name("pricer-service", "PricerService")
pricer = PricerService()

# Call it — runs on a T4 in the cloud, returns locally
result = pricer.predict.remote("Quadcast HyperX Condenser Microphone, barely used")
print(result)

Reducing Cold-Start Latency

The biggest frustration with serverless GPU inference is the cold start. When a container goes to sleep and then receives a new request, it has to restart and reload the model — which can take 30–90 seconds for a 3B parameter model. Three techniques help:

  • Persistent volumes: storing the model weights in a Modal volume means the container doesn't re-download them from Hugging Face on every start. This alone can cut startup from around 90s to roughly 30s.
  • Minimum containers: tell Modal to keep at least one container always warm with min_containers=1. You pay for idle time, but eliminating cold starts is often worth it for production traffic.
  • Container keep-alive: extend the idle timeout from Modal's default of 60 seconds to something longer with scaledown_window=600. Good for bursty traffic patterns where requests cluster in time.

@app.cls(
    gpu="T4",
    image=image,
    volumes={"/cache": volume},
    timeout=600,
    min_containers=1,       # always keep one warm
    scaledown_window=600,   # 10-minute idle window (container_idle_timeout in older Modal releases)
)
class PricerServiceProd:
    # ... same as above
    pass

Section 6: Production Observability

Shipping a model is not the end of the process — it's the beginning of a monitoring problem. In production, your model will encounter queries it was never trained on, exhibit performance drift as the world changes, and occasionally fail in ways that were impossible to predict from your test set. Observability gives you visibility into these failure modes before users do.

Logging Every Request

import time
import json
import logging
from functools import wraps
from datetime import datetime, timezone

logger = logging.getLogger("ai_production")

def log_inference(func):
    """Decorator that logs every model call with its status and latency."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.time()
        request_id = f"req_{int(start * 1000)}"
        try:
            result = func(*args, **kwargs)
            latency_ms = (time.time() - start) * 1000
            logger.info(json.dumps({
                "request_id": request_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "status": "success",
                "latency_ms": round(latency_ms, 1),
                # Only populated when the caller passes model=... by keyword
                "model": kwargs.get("model", "unknown"),
            }))
            return result
        except Exception as e:
            latency_ms = (time.time() - start) * 1000
            logger.error(json.dumps({
                "request_id": request_id,
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "status": "error",
                "error": str(e),
                "latency_ms": round(latency_ms, 1),
            }))
            raise
    return wrapper

@log_inference
def call_model(prompt: str, model: str = "gpt-4o-mini") -> str:
    from openai import OpenAI
    client = OpenAI()
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content
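Once every request is logged as a JSON line, summary statistics such as p95 latency and error rate can be computed straight from the log. The sketch below assumes each line carries the "status" and "latency_ms" fields written by the decorator above; the function name, the nearest-rank percentile method, and the sample records are illustrative choices.

```python
import json
import math


def summarize_logs(lines: list) -> dict:
    """Compute request count, p95 latency, and error rate from JSON log lines."""
    records = [json.loads(line) for line in lines]
    latencies = sorted(r["latency_ms"] for r in records)
    # Nearest-rank percentile: the smallest value with at least 95% of samples at or below it
    idx = max(0, math.ceil(0.95 * len(latencies)) - 1)
    errors = sum(1 for r in records if r["status"] == "error")
    return {
        "n": len(records),
        "p95_latency_ms": latencies[idx],
        "error_rate": errors / len(records),
    }


# Hypothetical log: 20 successful calls between 100 ms and 2000 ms, plus one slow failure
sample = [
    json.dumps({"status": "success", "latency_ms": float(ms)})
    for ms in range(100, 2100, 100)
]
sample.append(json.dumps({"status": "error", "latency_ms": 30000.0, "error": "timeout"}))

summary = summarize_logs(sample)
print(summary)
```

A percentile is more informative than an average here: one 30-second failure would drag the mean far above what a typical user experiences.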

Production Retry Logic

APIs fail. Rate limits hit. Networks drop. Production code needs retry logic with exponential backoff so that transient errors don't become user-visible failures.

import time
import random
from openai import OpenAI, RateLimitError, APITimeoutError

client = OpenAI()

def call_with_retry(
    prompt: str,
    model: str = "gpt-4o",
    max_retries: int = 4,
    base_delay: float = 1.0,
) -> str:
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                timeout=30.0,
            )
            return response.choices[0].message.content

        except RateLimitError:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Rate limited — retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)

        except APITimeoutError:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Timeout — retrying in {delay:.1f}s (attempt {attempt + 1})")
            time.sleep(delay)

    raise RuntimeError("Max retries exceeded")
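To see what the backoff formula above produces, the sketch below computes the sleep schedule without calling any API. The max_delay cap is an addition that the function above does not have; it is included as an assumption because uncapped exponential delays grow quickly when max_retries is raised.

```python
import random


def backoff_delays(
    max_retries: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    jitter: bool = True,
    seed=None,
) -> list:
    """Return the sleep durations between attempts for capped exponential backoff."""
    rng = random.Random(seed)
    delays = []
    # The final attempt re-raises instead of sleeping, so there are max_retries - 1 sleeps
    for attempt in range(max_retries - 1):
        delay = min(base_delay * (2 ** attempt), max_delay)
        if jitter:
            # Random jitter keeps many clients from retrying at the same instant
            delay += rng.uniform(0, 1)
        delays.append(delay)
    return delays


print(backoff_delays(jitter=False))                 # default settings
print(backoff_delays(max_retries=8, jitter=False))  # cap becomes visible
```

With the defaults, a request that keeps failing waits about seven seconds in total before the error reaches the caller, which is worth weighing against your latency budget.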

Alerting on Performance Drift

Once you have logging in place, you can periodically re-run your evaluation harness against a sample of real production traffic. If your LLM-as-Judge scores drop significantly from your deployment baseline, that's a signal that something has changed — the world, the model, or the queries. Set up a weekly cron job that runs your eval suite and posts results to Slack or email.

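import schedule
import time

def weekly_eval_job():
    """Run every Monday morning against last week's sampled traffic."""
    # fetch_production_sample, production_pipeline, and send_alert are placeholders
    # for your own infrastructure. MRR needs labelled "relevant_doc_ids" on each
    # sampled query; for unlabelled traffic, compare score["avg_judge_score"] instead.
    sampled_queries = fetch_production_sample(days=7, n=100)
    score = run_rag_eval(production_pipeline, sampled_queries)

    baseline_mrr = 0.87  # what you measured at deploy time
    if score["mrr"] < baseline_mrr * 0.95:  # 5% degradation threshold
        send_alert(
            subject="⚠️ RAG performance degradation detected",
            body=f"MRR dropped to {score['mrr']:.3f} (baseline: {baseline_mrr:.3f}). Investigate."
        )
    else:
        print(f"Weekly eval OK, MRR: {score['mrr']:.3f}")

schedule.every().monday.at("08:00").do(weekly_eval_job)

# schedule only registers the job; a long-running loop has to trigger it
while True:
    schedule.run_pending()
    time.sleep(60)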

What's Next

You've completed the AI Foundations & Engineering course. The next step is Applied & Agentic AI — where everything comes together: you'll build multi-agent systems that reason, plan, and act autonomously, deploy agentic pipelines with real tools and memory, and design production architectures using LangGraph, CrewAI, and the Model Context Protocol (MCP).
