从原型到生产 — 企业 AI Agent 上线清单

从原型到生产 — 企业 AI Agent 上线清单
开场
我见过太多 Agent 项目死在从原型到生产的路上。不是因为模型不行或功能没做完,而是因为缺了 "无聊但关键" 的基础设施——日志、监控、限流、错误处理、安全审计。一个能在 Jupyter notebook 里跑的 Agent 和一个能在生产环境持续运行的 Agent,中间差了至少 40% 的工作量。这篇文章是我帮 3 个企业客户做 Agent 上线时总结的完整清单。
问题背景
原型阶段和生产阶段的核心区别:
| 维度 | 原型 | 生产 |
|---|---|---|
| 用户量 | 你自己 | 几百到几万 |
| 错误容忍度 | 错了重跑 | 错了丢客户 |
| 运行时间 | 跑一次 | 7x24 |
| 数据敏感度 | 测试数据 | 真实用户数据 |
| 成本控制 | 无所谓 | 每分钱都要算 |
| 可观测性 | print 调试 | 完整的 tracing |
企业客户特别关心三个问题:这个 Agent 出了问题我怎么知道?它会不会泄露数据?月成本能不能控制在预算内?
上线清单
我把清单分成 6 个模块,按优先级排序。
模块 1: 可观测性(Observability)
这是最重要的模块。一个你看不到内部状态的 Agent,和一个黑盒没区别。
import logging
import time
import json
from contextlib import contextmanager
from dataclasses import dataclass, field
from uuid import uuid4
# 结构化日志配置
logging.basicConfig(
format='{"timestamp":"%(asctime)s","level":"%(levelname)s","message":%(message)s}',
level=logging.INFO
)
logger = logging.getLogger("agent")
@dataclass
class AgentTrace:
"""Agent 执行的完整追踪记录"""
trace_id: str = field(default_factory=lambda: str(uuid4()))
steps: list[dict] = field(default_factory=list)
total_tokens: int = 0
total_cost: float = 0.0
start_time: float = 0.0
def add_step(self, step_type: str, **kwargs):
self.steps.append({
"type": step_type,
"timestamp": time.time(),
**kwargs
})
def log_llm_call(self, model: str, input_tokens: int, output_tokens: int):
"""记录每次 LLM 调用的 token 消耗"""
cost = self._calculate_cost(model, input_tokens, output_tokens)
self.total_tokens += input_tokens + output_tokens
self.total_cost += cost
self.add_step(
"llm_call",
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost=cost
)
def log_tool_call(self, tool_name: str, input_data: dict, output: str, latency_ms: float):
"""记录工具调用"""
self.add_step(
"tool_call",
tool=tool_name,
input=input_data,
output_length=len(output),
latency_ms=latency_ms
)
def finalize(self) -> dict:
"""生成最终追踪报告"""
elapsed = time.time() - self.start_time
return {
"trace_id": self.trace_id,
"total_steps": len(self.steps),
"total_tokens": self.total_tokens,
"total_cost": self.total_cost,
"total_time_seconds": elapsed,
"steps": self.steps
}
@staticmethod
def _calculate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
# 2026 年 3 月 Claude API 定价
pricing = {
"claude-sonnet-4-5-20250514": {"input": 3.0, "output": 15.0},
"claude-haiku-4-5-20250514": {"input": 1.0, "output": 5.0},
}
rates = pricing.get(model, {"input": 3.0, "output": 15.0})
return (input_tokens * rates["input"] + output_tokens * rates["output"]) / 1_000_000
# 使用方式
@contextmanager
def traced_agent_call(user_id: str):
"""上下文管理器:自动追踪 Agent 调用"""
trace = AgentTrace(start_time=time.time())
try:
yield trace
finally:
report = trace.finalize()
logger.info(json.dumps({
"event": "agent_call_complete",
"user_id": user_id,
**report
}))
# 写入监控系统(Prometheus/DataDog/自建)
metrics.record_agent_call(report)
必须监控的指标:
- 每次请求的 token 消耗和成本
- 端到端延迟(P50, P95, P99)
- 工具调用成功率和延迟
- LLM API 错误率
- 人工介入率
模块 2: 错误处理(Error Handling)
Agent 的错误处理比传统应用复杂——LLM 的 "错误" 可能不是异常,而是输出了错误的内容。
import anthropic
from enum import Enum
class AgentError(Enum):
LLM_API_ERROR = "llm_api_error" # API 调用失败
LLM_TIMEOUT = "llm_timeout" # API 超时
TOOL_ERROR = "tool_error" # 工具执行失败
PARSE_ERROR = "parse_error" # 输出解析失败
SAFETY_VIOLATION = "safety_violation" # 安全规则触发
BUDGET_EXCEEDED = "budget_exceeded" # 成本超限
LOOP_DETECTED = "loop_detected" # 检测到循环
class ResilientAgent:
"""带完整错误处理的 Agent"""
def __init__(self, max_retries: int = 3, max_cost_per_request: float = 0.50):
self.client = anthropic.Anthropic()
self.max_retries = max_retries
self.max_cost = max_cost_per_request
async def handle_request(self, message: str, user_id: str) -> str:
trace = AgentTrace(start_time=time.time())
try:
return await self._execute_with_retry(message, user_id, trace)
except Exception as e:
logger.error(json.dumps({
"event": "agent_error",
"user_id": user_id,
"error_type": type(e).__name__,
"error_message": str(e),
"trace": trace.finalize()
}))
# 优雅降级:返回预设的兜底回复
return self._fallback_response(e)
async def _execute_with_retry(
self, message: str, user_id: str, trace: AgentTrace
) -> str:
last_error = None
for attempt in range(self.max_retries):
try:
# 成本检查
if trace.total_cost >= self.max_cost:
raise BudgetExceededError(
f"单次请求成本已达 ${trace.total_cost:.3f},超过上限 ${self.max_cost}"
)
response = await self._call_llm(message, trace)
return response
except anthropic.RateLimitError:
# 指数退避
wait_time = 2 ** attempt
logger.warning(f"Rate limited, waiting {wait_time}s (attempt {attempt + 1})")
await asyncio.sleep(wait_time)
last_error = "rate_limit"
except anthropic.APITimeoutError:
logger.warning(f"API timeout (attempt {attempt + 1})")
last_error = "timeout"
except anthropic.APIError as e:
if e.status_code >= 500:
# 服务端错误,重试
logger.warning(f"Server error {e.status_code} (attempt {attempt + 1})")
last_error = f"server_error_{e.status_code}"
else:
# 客户端错误(400, 401 等),不重试
raise
raise MaxRetriesExceededError(f"重试 {self.max_retries} 次后仍然失败: {last_error}")
def _fallback_response(self, error: Exception) -> str:
"""根据错误类型返回不同的兜底回复"""
if isinstance(error, BudgetExceededError):
return "您的问题比较复杂,我已转给人工客服为您处理。"
elif isinstance(error, MaxRetriesExceededError):
return "系统暂时繁忙,请稍后再试,或联系人工客服。"
else:
return "抱歉,处理遇到了问题。我已记录下您的问题,客服团队会尽快跟进。"
模块 3: 速率限制和成本控制
from collections import defaultdict
import time
class CostController:
"""Agent 成本控制器"""
def __init__(self):
# 按用户限制
self.user_daily_cost: dict[str, float] = defaultdict(float)
self.user_daily_requests: dict[str, int] = defaultdict(int)
# 全局限制
self.global_hourly_cost: float = 0.0
self.last_reset: float = time.time()
def check_limits(self, user_id: str) -> tuple[bool, str]:
"""检查是否超过限制"""
self._maybe_reset()
# 用户级限制:每天最多 $2,50 次请求
if self.user_daily_cost[user_id] >= 2.0:
return False, "您今日的使用额度已用完"
if self.user_daily_requests[user_id] >= 50:
return False, "请求频率过高,请稍后再试"
# 全局限制:每小时最多 $100
if self.global_hourly_cost >= 100.0:
return False, "系统繁忙,请稍后再试"
return True, ""
def record_cost(self, user_id: str, cost: float):
self.user_daily_cost[user_id] += cost
self.user_daily_requests[user_id] += 1
self.global_hourly_cost += cost
def _maybe_reset(self):
now = time.time()
# 每小时重置全局计数
if now - self.last_reset > 3600:
self.global_hourly_cost = 0.0
self.last_reset = now
模块 4: 安全加固
import re
class SecurityLayer:
"""Agent 安全层"""
# PII 检测正则
PII_PATTERNS = {
"phone": r"1[3-9]\d{9}",
"id_card": r"\d{17}[\dXx]",
"email": r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}",
"credit_card": r"\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}",
}
@staticmethod
def sanitize_output(text: str) -> str:
"""脱敏 Agent 输出中的 PII"""
for pii_type, pattern in SecurityLayer.PII_PATTERNS.items():
text = re.sub(pattern, f"[{pii_type}_REDACTED]", text)
return text
@staticmethod
def detect_prompt_injection(user_input: str) -> bool:
"""检测 prompt injection 攻击"""
injection_patterns = [
r"ignore\s+(previous|above|all)\s+(instructions|prompts)",
r"忽略(之前|上面|所有)(的)?(指令|规则|提示)",
r"system\s*prompt",
r"你的(系统|初始)提示",
r"IMPORTANT:\s*NEW\s*INSTRUCTIONS",
]
for pattern in injection_patterns:
if re.search(pattern, user_input, re.IGNORECASE):
return True
return False
@staticmethod
def validate_tool_output(tool_name: str, output: str) -> str:
"""验证工具输出,防止数据泄露"""
# 移除可能的内部 URL
output = re.sub(r"https?://internal\.[^\s]+", "[INTERNAL_URL_REDACTED]", output)
# 移除数据库连接字符串
output = re.sub(r"(postgresql|mysql|mongodb)://[^\s]+", "[DB_URL_REDACTED]", output)
return output
模块 5: 监控告警
# 监控指标和告警规则(概念代码,实际用 Prometheus + Grafana)
ALERT_RULES = {
"high_error_rate": {
"condition": "error_rate_5min > 5%",
"severity": "critical",
"action": "通知值班工程师,自动切换到兜底模式"
},
"cost_spike": {
"condition": "hourly_cost > 2x 历史均值",
"severity": "warning",
"action": "通知负责人,检查是否有异常流量"
},
"high_latency": {
"condition": "p95_latency > 10s",
"severity": "warning",
"action": "检查 LLM API 状态,考虑降级到更快模型"
},
"low_satisfaction": {
"condition": "csat_daily < 3.5/5",
"severity": "warning",
"action": "拉取低分对话日志,分析原因"
},
"api_quota_approaching": {
"condition": "daily_tokens > 80% of quota",
"severity": "info",
"action": "准备切换到 Batch API 或降级模型"
}
}
模块 6: 上线前检查清单
## 上线前必须完成的检查
### 基础设施
- [ ] 日志系统配置完成,结构化日志可搜索
- [ ] 监控 dashboard 搭建完成
- [ ] 告警规则配置完成并测试过
- [ ] 错误处理和兜底回复覆盖所有异常路径
### 安全
- [ ] PII 脱敏逻辑已实现
- [ ] Prompt injection 检测已部署
- [ ] API key 使用环境变量或 secret manager,不硬编码
- [ ] Agent 使用的数据库账号为只读 + 最小权限
- [ ] 工具输出经过验证和清洗
### 成本
- [ ] 用户级和全局级成本限制已配置
- [ ] 单次请求的最大 token 数已设置
- [ ] 成本监控 dashboard 可用
- [ ] Batch API 降级方案已准备
### 测试
- [ ] 单元测试 100% 通过
- [ ] 集成测试覆盖所有工具调用路径
- [ ] LLM-as-Judge 评估通过率 > 85%
- [ ] 回归测试套件可自动运行
- [ ] 边界场景测试(空输入、超长输入、恶意输入)
### 运维
- [ ] 灰度发布策略确定(先放 5% 流量)
- [ ] 回滚方案已验证
- [ ] 值班制度和 escalation 路径明确
- [ ] 知识库更新流程已建立
实战经验
生产数据
我帮企业客户上线的 Agent,上线前后的对比:
| 指标 | 没有清单时(V1) | 按清单上线(V2) |
|---|---|---|
| 上线后第一周事故数 | 7 次 | 0 次 |
| 平均故障发现时间 | 4.5 小时 | 3 分钟(告警触发) |
| 平均故障恢复时间 | 2 小时 | 15 分钟(有回滚方案) |
| 月成本超预算 | 是(2.3 倍) | 否(控制在 ±10%) |
| 安全事件 | 1 次(PII 泄露) | 0 次 |
踩过的坑
坑 1:日志太多等于没有日志。 最初我记录了每次 LLM 调用的完整 prompt 和 response,单日日志量 50GB。查问题时根本无法搜索。解决方案:正常请求只记录 metadata(trace_id, token 数, 延迟, 工具调用),完整内容只在错误时记录。
坑 2:灰度发布比直接上线多花一周,但值得。 我们先让 5% 的流量走新 Agent,观察一周后扩到 20%,再一周后全量。在 5% 阶段就发现了一个时区处理 bug,如果直接全量发布,会影响所有用户。
坑 3:成本控制要在请求级别而非月度级别。 一个 Agent 如果进入了 tool call 死循环,一次请求可能消耗几万 token(几美元)。月度成本限制没用,必须有单次请求的上限。
总结
三条核心 takeaway:
-
可观测性是第一优先级——你可以没有最好的模型、最完美的 prompt,但你不能没有日志和监控。出了问题,你需要在 5 分钟内知道发生了什么、影响了多少用户、根因是什么。
-
安全和成本控制不是 "以后再加" 的东西——上线前就要做好 PII 脱敏、prompt injection 防护、成本限制。生产环境的第一次安全事故可能就是最后一次——你会失去客户的信任。
-
清单比经验可靠——飞行员有起飞清单,医生有手术清单,Agent 上线也需要清单。不是因为你不够聪明,而是因为要检查的东西太多,人的注意力有限。把这个清单固化到发布流程中。
如果你正在准备 Agent 上线,建议先把这个清单过一遍,标出还没做的项目,按优先级逐个解决。清单上标红的项目必须全部完成才能上线。
你的 Agent 上线过程中遇到过哪些意外?欢迎分享经验。