
Error Handling for AI Agents — The Framework Nobody Talks About

AI Agent · Error Handling · Retry · Fallback · Fault Tolerance · Production Systems

Introduction

During my Agent system's first week in production, roughly 12% of requests failed. The reasons were all over the map: API rate limits, malformed model responses, external tool timeouts, JSON parsing failures. The most absurd one: the model returned a Markdown table when my code expected JSON, and the whole thing crashed. Two months later, the same system's error rate was down to 1.8%. I didn't switch to a better model. I didn't rewrite the business logic. I just added a systematic error handling framework. This article breaks down that framework.

The Problem

Errors in Agent systems are fundamentally different from traditional software. Traditional software errors are deterministic — the same input always produces the same error. Agent system errors are probabilistic — the same prompt might return perfect JSON on the first try, free-form text on the second, and trigger a content filter rejection on the third.

This means the traditional error handling playbook (catch exception, return error code) isn't enough. You need:

  1. Classification: Know what type of error it is (retryable vs. non-retryable)
  2. Retry: Retry strategically (not brute-force loops)
  3. Fallback: Have backup plans when retries fail
  4. Circuit Breaking: Pause calls after consecutive failures to prevent cascading failures
  5. Monitoring: Know where and how often your system fails

Core Framework: Error Classification + Three Layers of Defense

Error Classification

Start by categorizing Agent system errors into four types:

from enum import Enum

class ErrorCategory(Enum):
    # Retryable transient errors
    TRANSIENT = "transient"       # API rate limit, network timeout, 503
    # Retryable format errors (a different prompt might fix it)
    FORMAT = "format"             # JSON parse failure, unexpected format
    # Non-retryable business errors
    BUSINESS = "business"         # Model refuses to answer, content filter
    # Non-retryable system errors
    FATAL = "fatal"               # Invalid API key, model doesn't exist, insufficient balance

def classify_error(error: Exception) -> ErrorCategory:
    """Error classifier"""
    error_msg = str(error).lower()

    # Rate limit and timeout: retryable
    if any(keyword in error_msg for keyword in ["rate_limit", "429", "timeout", "503", "502"]):
        return ErrorCategory.TRANSIENT

    # JSON parse errors: retryable with a different approach
    if any(keyword in error_msg for keyword in ["json", "parse", "decode", "validation"]):
        return ErrorCategory.FORMAT

    # Content moderation rejection: non-retryable
    if any(keyword in error_msg for keyword in ["content_filter", "moderation", "refused"]):
        return ErrorCategory.BUSINESS

    # Authentication/quota errors: fatal
    if any(keyword in error_msg for keyword in ["auth", "401", "403", "insufficient_quota"]):
        return ErrorCategory.FATAL

    # Default to transient
    return ErrorCategory.TRANSIENT
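A standalone spot-check of those keyword heuristics, with the rules condensed into a table so the snippet runs on its own (the error strings are made up; real provider messages vary):

```python
# Condensed version of the classify_error keyword rules above
RULES = [
    ("transient", ["rate_limit", "429", "timeout", "503", "502"]),
    ("format",    ["json", "parse", "decode", "validation"]),
    ("business",  ["content_filter", "moderation", "refused"]),
    ("fatal",     ["auth", "401", "403", "insufficient_quota"]),
]

def classify(msg: str) -> str:
    msg = msg.lower()
    for category, keywords in RULES:
        if any(k in msg for k in keywords):
            return category
    return "transient"  # default, matching classify_error above

print(classify("Error 429: rate limit exceeded"))     # transient
print(classify("JSONDecodeError: Expecting value"))   # format
print(classify("request blocked by content_filter"))  # business
print(classify("401 Unauthorized"))                   # fatal
```

Keyword matching on message text is brittle ("Unauthorized" matches "auth" only by accident of substring matching); production code should also inspect exception types and HTTP status codes.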

Layer 1: Smart Retry

Not a simple retry-N-times loop, but different strategies based on error type.

import asyncio
import random
from functools import wraps

class RetryConfig:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 30.0, jitter: bool = True):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

# Retry policies by error type
RETRY_POLICIES = {
    ErrorCategory.TRANSIENT: RetryConfig(
        max_retries=3,
        base_delay=2.0,    # Exponential backoff: 2s -> 4s -> 8s
        max_delay=30.0,
    ),
    ErrorCategory.FORMAT: RetryConfig(
        max_retries=2,
        base_delay=0.5,    # Format errors don't need long waits
        max_delay=2.0,
    ),
    ErrorCategory.BUSINESS: RetryConfig(max_retries=0),  # No retry
    ErrorCategory.FATAL: RetryConfig(max_retries=0),      # No retry
}

async def smart_retry(func, *args, **kwargs):
    """Smart retry: select strategy based on error type"""
    last_error = None

    for attempt in range(4):  # Up to 4 attempts (1 original + 3 retries)
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_error = e
            category = classify_error(e)
            policy = RETRY_POLICIES[category]

            if attempt >= policy.max_retries:
                break  # Exceeded max retries for this error type

            # Calculate wait time (exponential backoff + random jitter)
            delay = min(
                policy.base_delay * (2 ** attempt),
                policy.max_delay
            )
            if policy.jitter:
                delay *= (0.5 + random.random())  # 50%-150% jitter

            # Format errors: modify request parameters before retrying
            if category == ErrorCategory.FORMAT:
                kwargs = _fix_format_request(kwargs, e)

            await asyncio.sleep(delay)

    raise last_error

def _fix_format_request(kwargs: dict, error: Exception) -> dict:
    """Auto-fix for format errors: adjust request parameters"""
    # Strategy 1: If JSON parsing failed, emphasize output format in the prompt
    if "json" in str(error).lower():
        messages = kwargs.get("messages", [])
        if messages:
            messages[-1]["content"] += "\n\nIMPORTANT: You must output valid JSON. Do not add any Markdown formatting or extra text."
    # Strategy 2: Lower temperature for more deterministic output
    kwargs["temperature"] = max(0, kwargs.get("temperature", 0.7) - 0.3)
    return kwargs
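The backoff arithmetic from smart_retry, pulled out into a standalone helper to make the schedule visible (the defaults match the TRANSIENT policy above):

```python
import random

def backoff_delay(attempt: int, base: float = 2.0, cap: float = 30.0,
                  jitter: bool = True) -> float:
    """Exponential backoff with optional 50%-150% jitter, as in smart_retry."""
    delay = min(base * (2 ** attempt), cap)
    if jitter:
        delay *= 0.5 + random.random()  # uniform in [0.5, 1.5)
    return delay

# Deterministic schedule without jitter: 2s, 4s, 8s, 16s, then capped at 30s
print([backoff_delay(a, jitter=False) for a in range(5)])  # [2.0, 4.0, 8.0, 16.0, 30.0]
```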

Layer 2: Fallback Chain

When all retries fail, activate the fallback plan.

from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class FallbackOption:
    name: str
    func: Callable
    quality_level: str   # "full" | "degraded" | "minimal"
    max_latency_ms: int

class FallbackChain:
    """Fallback chain: try alternatives in priority order"""

    def __init__(self, options: list[FallbackOption]):
        self.options = options  # Sorted by priority

    async def execute(self, *args, **kwargs) -> dict:
        errors = []
        for option in self.options:
            try:
                result = await asyncio.wait_for(
                    option.func(*args, **kwargs),
                    timeout=option.max_latency_ms / 1000
                )
                return {
                    "result": result,
                    "quality": option.quality_level,
                    "fallback_used": option.name,
                }
            except Exception as e:
                errors.append(f"{option.name}: {e}")
                continue

        # All options failed
        return {
            "result": "Sorry, the system is temporarily unable to process your request. Please try again later.",
            "quality": "failed",
            "errors": errors,
        }

# Example: Fallback chain for a Q&A Agent
# (call_llm and search_cached_answers are assumed to be async functions)
qa_fallback = FallbackChain([
    FallbackOption(
        name="claude_sonnet",
        func=lambda q, ctx: call_llm("claude-sonnet-4-5", q, ctx),
        quality_level="full",
        max_latency_ms=10000,
    ),
    FallbackOption(
        name="gpt4.1",
        func=lambda q, ctx: call_llm("gpt-4.1", q, ctx),
        quality_level="full",
        max_latency_ms=8000,
    ),
    FallbackOption(
        name="gpt4.1_mini",
        func=lambda q, ctx: call_llm("gpt-4.1-mini", q, ctx),
        quality_level="degraded",
        max_latency_ms=5000,
    ),
    FallbackOption(
        name="cache_lookup",
        func=lambda q, ctx: search_cached_answers(q),
        quality_level="minimal",
        max_latency_ms=1000,
    ),
])

Key design insight: The fallback chain isn't just about swapping models. The last fallback searches cached answers for similar questions — no API calls at all, ultra-low latency. The answer might not be perfect, but it's infinitely better than returning an error.

Layer 3: Circuit Breaker

Pause calls during consecutive failures to: 1) stop wasting money; 2) reduce upstream pressure; 3) prevent blocking subsequent requests.

from datetime import datetime, timedelta
from enum import Enum
from typing import Optional

class CircuitBreakerOpenError(Exception):
    """Raised when the breaker is open and a call is rejected"""

class CircuitState(Enum):
    CLOSED = "closed"       # Normal operation
    OPEN = "open"           # Tripped — reject all requests
    HALF_OPEN = "half_open" # Tentatively allow a few requests through

class CircuitBreaker:
    """Circuit breaker: automatically trips on consecutive failures"""

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 success_threshold: int = 3):
        self.failure_threshold = failure_threshold  # Trip after N consecutive failures
        self.recovery_timeout = recovery_timeout    # Wait N seconds before retrying
        self.success_threshold = success_threshold  # Close after N consecutive successes in half-open
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[datetime] = None

    async def call(self, func, *args, **kwargs):
        """Call a function through the circuit breaker"""
        # Check current state
        if self.state == CircuitState.OPEN:
            if self._should_try_recovery():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError(
                    f"Circuit breaker is open — retry in {self._time_until_recovery()} seconds"
                )

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        self.success_count = 0
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_try_recovery(self) -> bool:
        if self.last_failure_time is None:
            return True
        return datetime.now() - self.last_failure_time > timedelta(
            seconds=self.recovery_timeout
        )

    def _time_until_recovery(self) -> int:
        """Seconds until the breaker will allow a half-open probe"""
        if self.last_failure_time is None:
            return 0
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return max(0, int(self.recovery_timeout - elapsed))

Combining All Three Layers

class ResilientAgent:
    """Agent with full error handling"""

    def __init__(self):
        self.breakers = {
            "openai": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
            "anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
        }
        self.fallback_chain = qa_fallback
        self.error_log = []

    async def _primary_handler(self, user_message: str, context: dict) -> str:
        """Primary path: plug your main Agent logic in here"""
        raise NotImplementedError

    async def handle_request(self, user_message: str, context: dict) -> dict:
        """Handle request: retry -> fallback -> circuit break"""
        try:
            # Check circuit breaker state
            breaker = self.breakers["anthropic"]
            result = await breaker.call(
                smart_retry,
                self._primary_handler,
                user_message, context
            )
            return {"result": result, "quality": "full"}

        except CircuitBreakerOpenError:
            # Primary service tripped — use fallback chain
            return await self.fallback_chain.execute(user_message, context)

        except Exception as e:
            # Retries also failed — use fallback chain
            self.error_log.append({
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
                "category": classify_error(e).value,
            })
            return await self.fallback_chain.execute(user_message, context)

Lessons from the Field

Error Rate Progression

Stage                   Total Error Rate   User-Visible Error Rate
No error handling       12.3%              12.3%
Added Retry             5.1%               5.1%
Added Fallback          5.1%               1.8%
Added Circuit Breaker   4.8%               1.2%

Note the distinction between "total error rate" and "user-visible error rate." Retry reduces the total error rate. Fallback makes the remaining errors invisible to users (degraded service, but no error messages). Circuit Breaker further eliminates wasteful retries.

Error Type Distribution (Last 30 Days)

Error Type               Share   Handling
Rate limit (429)         42%     Retry with backoff
JSON format errors       23%     Retry with format fix
External tool timeouts   18%     Fallback to cache
Content filter            9%     No retry, return notice
API auth errors           5%     No retry, alert
Other                     3%     Default retry

Pitfalls I Encountered

Pitfall 1: Retry storms. All Agents hit a rate limit simultaneously, all started retrying, and when their wait times expired they all retried again at the same time — creating even more rate limit pressure. Solution: add jitter (random variation) to spread retry times apart.

Pitfall 2: Fallback masking real problems. The fallback worked so well that the primary service was down for a full day before I noticed — because the degraded service kept covering for it. Solution: add monitoring alerts so that when fallback trigger counts exceed a threshold, I'm notified immediately.

Pitfall 3: Error log explosion. Every error was logged in full detail, producing 50MB of logs per day. Solution: aggregate by error type, only record the first occurrence plus a count for duplicates, and summarize hourly.
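A sketch of that aggregation: keep one full record per error type plus a counter, with a summary method you'd call from an hourly scheduler (names are illustrative):

```python
from collections import Counter

class AggregatedErrorLog:
    """Log the first occurrence per error type in full; count the rest."""

    def __init__(self):
        self.first_seen: dict[str, str] = {}
        self.counts: Counter[str] = Counter()

    def record(self, category: str, detail: str) -> None:
        if category not in self.first_seen:
            self.first_seen[category] = detail  # full detail only once
        self.counts[category] += 1

    def hourly_summary(self) -> list[str]:
        """Summarize and reset; wire this to an hourly scheduler."""
        lines = [f"{cat}: {n} occurrence(s), first: {self.first_seen[cat]}"
                 for cat, n in self.counts.most_common()]
        self.first_seen.clear()
        self.counts.clear()
        return lines
```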

Pitfall 4: Over-sensitive circuit breaker thresholds. With failure_threshold set to 3, a few consecutive slow requests would trip the breaker, rejecting all requests for 60 seconds. Solution: raise the threshold to 5 and only count actual errors (not timeouts).

Takeaways

Three key takeaways:

  1. Error handling isn't nice-to-have — it's a prerequisite for production Agents. A 12% error rate means 1 in 8 users hits a problem. After adding all three defense layers, the user-visible error rate dropped to 1.2%.
  2. Classify first, then handle. Different error types need different strategies. Rate limits should be retried, content filter rejections should not, auth errors should trigger alerts. Misclassifying is more dangerous than not handling at all.
  3. The fallback chain delivers the best ROI. You don't need a perfect degradation strategy. Serving a good-enough cached answer from history beats returning "System Error" every single time.

If your Agent system is running in production, start by adding Retry with exponential backoff (1 hour of work), then Fallback (half a day), then Circuit Breaker (half a day). Each step will show a measurable drop in error rates.

What's the strangest error your Agent system has encountered? How did you handle it? Come share at the Solo Unicorn Club.