Solo Unicorn Club logoSolo Unicorn
2,640

AI Agent 错误处理 — 没人谈的框架

AI Agent错误处理RetryFallback容错生产系统
AI Agent 错误处理 — 没人谈的框架

AI Agent 错误处理 — 没人谈的框架

开场

我的 Agent 系统上线第一周,每天大概有 12% 的请求失败。原因五花八门:API rate limit、模型返回格式错误、外部工具超时、JSON 解析失败。最离谱的一次是模型返回了一段 Markdown 表格,我的代码期望 JSON,直接崩了。两个月后,同一个系统的错误率降到了 1.8%。不是换了更好的模型,也不是重写了业务逻辑,纯粹是加了一套系统化的错误处理框架。这篇文章拆解这个框架。

问题背景

Agent 系统的错误和传统软件不同。传统软件的错误是确定性的——同样的输入总是产生同样的错误。Agent 系统的错误是概率性的——同样的 prompt 可能第一次返回完美的 JSON,第二次返回一段自由文本,第三次触发 content filter 被拒绝。

这意味着传统的错误处理思路(捕获异常 → 返回错误码)不够用。你需要:

  1. 分类:知道是什么类型的错误(可重试的 vs 不可重试的)
  2. 重试:有策略地重试(不是无脑循环)
  3. 降级:重试失败后有备选方案
  4. 熔断:连续失败时暂停调用,避免雪崩
  5. 监控:知道系统在哪里、多频繁地出错

核心框架:错误分类 + 三层防御

错误分类

先把 Agent 系统的错误分成四类:

from enum import Enum

class ErrorCategory(Enum):
    # 可重试的临时性错误
    TRANSIENT = "transient"       # API rate limit、网络超时、503
    # 可重试的格式错误(换个 prompt 可能就好了)
    FORMAT = "format"             # JSON 解析失败、格式不符合预期
    # 不可重试的业务错误
    BUSINESS = "business"         # 模型拒绝回答、content filter
    # 不可重试的系统错误
    FATAL = "fatal"               # API key 无效、模型不存在、余额不足

def classify_error(error: Exception) -> ErrorCategory:
    """错误分类器"""
    error_msg = str(error).lower()

    # Rate limit 和超时:可重试
    if any(keyword in error_msg for keyword in ["rate_limit", "429", "timeout", "503", "502"]):
        return ErrorCategory.TRANSIENT

    # JSON 解析错误:可用不同方式重试
    if any(keyword in error_msg for keyword in ["json", "parse", "decode", "validation"]):
        return ErrorCategory.FORMAT

    # 内容审核被拒:不可重试
    if any(keyword in error_msg for keyword in ["content_filter", "moderation", "refused"]):
        return ErrorCategory.BUSINESS

    # 认证/配额错误:致命
    if any(keyword in error_msg for keyword in ["auth", "401", "403", "insufficient_quota"]):
        return ErrorCategory.FATAL

    # 默认当临时性错误处理
    return ErrorCategory.TRANSIENT

第一层防御:Smart Retry

不是简单的重试 N 次,而是根据错误类型采用不同策略。

import asyncio
import random
from functools import wraps

class RetryConfig:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0,
                 max_delay: float = 30.0, jitter: bool = True):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

# 不同错误类型的重试策略
RETRY_POLICIES = {
    ErrorCategory.TRANSIENT: RetryConfig(
        max_retries=3,
        base_delay=2.0,    # 指数退避:2s → 4s → 8s
        max_delay=30.0,
    ),
    ErrorCategory.FORMAT: RetryConfig(
        max_retries=2,
        base_delay=0.5,    # 格式错误不需要长等待
        max_delay=2.0,
    ),
    ErrorCategory.BUSINESS: RetryConfig(max_retries=0),  # 不重试
    ErrorCategory.FATAL: RetryConfig(max_retries=0),      # 不重试
}

async def smart_retry(func, *args, **kwargs):
    """智能重试:根据错误类型选择策略"""
    last_error = None

    for attempt in range(4):  # 最多尝试 4 次(1 次原始 + 3 次重试)
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_error = e
            category = classify_error(e)
            policy = RETRY_POLICIES[category]

            if attempt >= policy.max_retries:
                break  # 超过该类型的最大重试次数

            # 计算等待时间(指数退避 + 随机抖动)
            delay = min(
                policy.base_delay * (2 ** attempt),
                policy.max_delay
            )
            if policy.jitter:
                delay *= (0.5 + random.random())  # 50%-150% 的抖动

            # 格式错误:修改请求参数再重试
            if category == ErrorCategory.FORMAT:
                kwargs = _fix_format_request(kwargs, e)

            await asyncio.sleep(delay)

    raise last_error

def _fix_format_request(kwargs: dict, error: Exception) -> dict:
    """格式错误的自动修复:调整请求参数"""
    # 策略 1:如果 JSON 解析失败,在 prompt 中强调输出格式
    if "json" in str(error).lower():
        messages = kwargs.get("messages", [])
        if messages:
            messages[-1]["content"] += "\n\n注意:你必须输出合法的 JSON 格式。不要添加任何 Markdown 标记或额外文字。"
    # 策略 2:降低 temperature 增加确定性
    kwargs["temperature"] = max(0, kwargs.get("temperature", 0.7) - 0.3)
    return kwargs

第二层防御:Fallback Chain

重试都失败了,启动降级方案。

from dataclasses import dataclass
from typing import Callable, Optional

@dataclass
class FallbackOption:
    name: str
    func: Callable
    quality_level: str   # "full" | "degraded" | "minimal"
    max_latency_ms: int

class FallbackChain:
    """降级链:按优先级尝试不同方案"""

    def __init__(self, options: list[FallbackOption]):
        self.options = options  # 按优先级排序

    async def execute(self, *args, **kwargs) -> dict:
        errors = []
        for option in self.options:
            try:
                result = await asyncio.wait_for(
                    option.func(*args, **kwargs),
                    timeout=option.max_latency_ms / 1000
                )
                return {
                    "result": result,
                    "quality": option.quality_level,
                    "fallback_used": option.name,
                }
            except Exception as e:
                errors.append(f"{option.name}: {e}")
                continue

        # 所有方案都失败
        return {
            "result": "抱歉,系统暂时无法处理您的请求,请稍后再试。",
            "quality": "failed",
            "errors": errors,
        }

# 使用示例:Q&A Agent 的降级链
qa_fallback = FallbackChain([
    FallbackOption(
        name="claude_sonnet",
        func=lambda q, ctx: call_llm("claude-sonnet-4-5", q, ctx),
        quality_level="full",
        max_latency_ms=10000,
    ),
    FallbackOption(
        name="gpt4.1",
        func=lambda q, ctx: call_llm("gpt-4.1", q, ctx),
        quality_level="full",
        max_latency_ms=8000,
    ),
    FallbackOption(
        name="gpt4.1_mini",
        func=lambda q, ctx: call_llm("gpt-4.1-mini", q, ctx),
        quality_level="degraded",
        max_latency_ms=5000,
    ),
    FallbackOption(
        name="cache_lookup",
        func=lambda q, ctx: search_cached_answers(q),
        quality_level="minimal",
        max_latency_ms=1000,
    ),
])

关键设计:降级链不只是换模型。最后一个 fallback 是从缓存中查找相似问题的历史回答——完全不调 API,延迟极低,虽然回答可能不完美,但总比返回错误好。

第三层防御:Circuit Breaker

连续失败时暂停调用,避免:1)浪费钱;2)加剧上游压力;3)阻塞后续请求。

from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"       # 正常运行
    OPEN = "open"           # 熔断,拒绝所有请求
    HALF_OPEN = "half_open" # 试探性地放行少量请求

class CircuitBreaker:
    """熔断器:连续失败时自动断开"""

    def __init__(self, failure_threshold: int = 5,
                 recovery_timeout: int = 60,
                 success_threshold: int = 3):
        self.failure_threshold = failure_threshold  # 连续失败 N 次后熔断
        self.recovery_timeout = recovery_timeout    # 熔断后等待 N 秒再试
        self.success_threshold = success_threshold  # 半开状态连续成功 N 次后恢复
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[datetime] = None

    async def call(self, func, *args, **kwargs):
        """通过熔断器调用函数"""
        # 检查当前状态
        if self.state == CircuitState.OPEN:
            if self._should_try_recovery():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError(
                    f"熔断器打开中,{self._time_until_recovery()}秒后重试"
                )

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                self.success_count = 0
        else:
            self.failure_count = 0

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        self.success_count = 0
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

    def _should_try_recovery(self) -> bool:
        if self.last_failure_time is None:
            return True
        return datetime.now() - self.last_failure_time > timedelta(
            seconds=self.recovery_timeout
        )

把三层防御组合起来

class ResilientAgent:
    """带完整错误处理的 Agent"""

    def __init__(self):
        self.breakers = {
            "openai": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
            "anthropic": CircuitBreaker(failure_threshold=5, recovery_timeout=60),
        }
        self.fallback_chain = qa_fallback
        self.error_log = []

    async def handle_request(self, user_message: str, context: dict) -> dict:
        """处理请求:重试 → 降级 → 熔断"""
        try:
            # 检查熔断器状态
            breaker = self.breakers["anthropic"]
            result = await breaker.call(
                smart_retry,
                self._primary_handler,
                user_message, context
            )
            return {"result": result, "quality": "full"}

        except CircuitBreakerOpenError:
            # 主服务熔断,走降级链
            return await self.fallback_chain.execute(user_message, context)

        except Exception as e:
            # 重试也失败了,走降级链
            self.error_log.append({
                "timestamp": datetime.now().isoformat(),
                "error": str(e),
                "category": classify_error(e).value,
            })
            return await self.fallback_chain.execute(user_message, context)

实战经验

错误率变化

阶段 总错误率 对用户可见的错误率
无错误处理 12.3% 12.3%
加 Retry 5.1% 5.1%
加 Fallback 5.1% 1.8%
加 Circuit Breaker 4.8% 1.2%

注意"总错误率"和"对用户可见的错误率"的区别。Retry 减少了总错误率,Fallback 让剩余的错误对用户不可见(降级服务但不报错),Circuit Breaker 进一步减少了无效重试。

错误类型分布(过去 30 天)

错误类型 占比 处理方式
Rate limit (429) 42% Retry with backoff
JSON 格式错误 23% Retry with format fix
外部工具超时 18% Fallback to cache
Content filter 9% 不重试,返回提示
API 认证错误 5% 不重试,告警
其他 3% 默认重试

踩过的坑

坑 1:重试风暴。所有 Agent 同时遇到 rate limit,全部开始重试,等待时间一到又同时重试,制造更大的 rate limit 压力。解决方案:加 jitter(随机抖动),让重试时间分散开。

坑 2:Fallback 掩盖了问题。Fallback 太好用了,以至于主服务挂了一天我都没发现——因为降级服务一直在兜底。解决方案:加监控告警,fallback 触发次数超过阈值立即通知。

坑 3:错误日志爆炸。每个错误都详细记录,一天产生 50MB 的日志。解决方案:按错误类型聚合,相同错误只记录首次和计数,每小时汇总一次。

坑 4:熔断器的阈值太敏感。failure_threshold 设为 3,偶尔连续三个慢请求就触发熔断,然后所有请求都被拒绝 60 秒。解决方案:把 threshold 调到 5,并且只统计真正的错误(不计超时)。

总结

三条 takeaway:

  1. 错误处理不是 nice-to-have,是 Agent 上生产的前提——12% 的错误率意味着每 8 个用户里有 1 个遇到问题。加上三层防御后,对用户可见的错误率降到 1.2%
  2. 先分类,再处理——不同类型的错误需要不同的策略。Rate limit 要重试,content filter 不要重试,认证错误要告警。分错类比不处理更危险
  3. Fallback 链是性价比最高的投入——不需要完美的降级方案。从缓存里找一个 80 分的历史回答,比返回"系统错误"好一万倍

如果你的 Agent 系统在生产中跑着,先加一层 Retry with exponential backoff(1 小时搞定),然后加 Fallback(半天),最后加 Circuit Breaker(半天)。每一步都能看到错误率的下降。

你的 Agent 系统遇到过什么奇葩错误?怎么处理的?来一人独角兽俱乐部交流。