把 AI Agent 接入 Slack、邮件和 CRM

把 AI Agent 接入 Slack、邮件和 CRM
开场
我给一个 B2B SaaS 公司搭了一个 Agent,同时接入 Slack、Gmail 和 HubSpot CRM。客户在 Slack 里问产品问题,Agent 自动从 CRM 拉取客户数据,在知识库里搜索答案,把回复发到 Slack 并在 CRM 里记录对话。整套系统跑了 4 个月,处理了超过 12,000 次交互,平均响应时间 3.2 秒。这篇文章把三个渠道的接入方式、Webhook 架构和实际踩过的坑全部说清楚。
问题背景
大多数 Agent 教程假设用户通过一个 Web 界面或 API 和 Agent 交互。但在企业场景中,用户(客户或内部员工)已经在用 Slack、邮件、CRM 这些工具了。让他们切换到一个新界面是不现实的。
Agent 必须去用户在的地方。这意味着你需要:
- 监听多个渠道的消息
- 统一处理逻辑(不管从哪个渠道来,Agent 的行为一致)
- 把回复发回到对应渠道
- 在 CRM 里记录所有交互
核心架构挑战是:渠道适配层要足够薄,业务逻辑要足够集中。
核心架构
分层设计
Slack Gmail HubSpot
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Slack │ │ Email │ │ CRM │
│ Adapter │ │ Adapter │ │ Adapter │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
└──────┬──────┘──────┬──────┘
│ │
┌──────▼─────────────▼──────┐
│ Message Router │
│ (统一消息格式 + 路由) │
└────────────┬──────────────┘
│
┌────────────▼──────────────┐
│ Agent Core │
│ (LLM + Tools + RAG) │
└────────────┬──────────────┘
│
┌────────────▼──────────────┐
│ Response Dispatcher │
│ (回复到对应渠道) │
└───────────────────────────┘
关键设计原则:
- Channel Adapter 只做协议转换:Slack 的消息格式、Email 的 MIME 格式、CRM 的 API 格式,都在 Adapter 层转成统一的内部消息格式
- Agent Core 不知道消息来自哪里:它只处理标准化的输入,输出标准化的回复
- Response Dispatcher 负责回写:根据来源渠道,把回复转成对应格式发回去
实现细节
Step 1: 统一消息格式
所有渠道的消息都转成这个格式:
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
class Channel(Enum):
SLACK = "slack"
EMAIL = "email"
CRM = "crm"
@dataclass
class UnifiedMessage:
"""统一消息格式"""
message_id: str # 消息唯一 ID
channel: Channel # 来源渠道
channel_id: str # 渠道内的标识(Slack channel ID, email thread ID)
sender_id: str # 发送者 ID
sender_name: str # 发送者名字
sender_email: str # 发送者邮箱(用于 CRM 关联)
content: str # 消息正文(纯文本)
attachments: list[str] = field(default_factory=list) # 附件 URL
timestamp: datetime = field(default_factory=datetime.now)
metadata: dict = field(default_factory=dict) # 渠道特定元数据
@dataclass
class AgentResponse:
"""Agent 回复格式"""
content: str
source_message_id: str
channel: Channel
channel_id: str
metadata: dict = field(default_factory=dict)
Step 2: Slack 接入
Slack 的接入通过 Bolt 框架 + Socket Mode(不需要公网 URL):
from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import os
# 初始化 Slack App
slack_app = App(token=os.environ["SLACK_BOT_TOKEN"])
class SlackAdapter:
"""Slack 渠道适配器"""
def __init__(self, agent_core, crm_adapter):
self.agent = agent_core
self.crm = crm_adapter
def register_handlers(self, app: App):
"""注册 Slack 事件处理器"""
# 处理 @mention 消息
@app.event("app_mention")
def handle_mention(event, say):
self._process_message(event, say)
# 处理 DM 消息
@app.event("message")
def handle_dm(event, say):
# 只处理 DM,忽略 channel 消息(用 @mention)
if event.get("channel_type") == "im":
self._process_message(event, say)
def _process_message(self, event: dict, say):
"""处理 Slack 消息"""
# 忽略 bot 自己的消息
if event.get("bot_id"):
return
# 转换为统一格式
message = UnifiedMessage(
message_id=event["ts"],
channel=Channel.SLACK,
channel_id=event["channel"],
sender_id=event["user"],
sender_name=self._get_user_name(event["user"]),
sender_email=self._get_user_email(event["user"]),
content=self._clean_mention(event["text"]),
timestamp=datetime.fromtimestamp(float(event["ts"]))
)
# 发送 "正在思考" 的即时反馈
say("正在处理您的问题,请稍候...", thread_ts=event["ts"])
# 调用 Agent Core
response = self.agent.handle(message)
# 回复到 Slack(在原消息的 thread 里)
say(
text=response.content,
thread_ts=event["ts"]
)
# 在 CRM 记录这次交互
self.crm.log_interaction(message, response)
def _clean_mention(self, text: str) -> str:
"""去掉 @mention 标记,只保留问题内容"""
import re
return re.sub(r"<@[A-Z0-9]+>", "", text).strip()
def _get_user_email(self, user_id: str) -> str:
"""从 Slack API 获取用户邮箱(用于 CRM 关联)"""
from slack_sdk import WebClient
client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
result = client.users_info(user=user_id)
return result["user"]["profile"].get("email", "")
def _get_user_name(self, user_id: str) -> str:
from slack_sdk import WebClient
client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
result = client.users_info(user=user_id)
return result["user"]["real_name"]
# 启动
adapter = SlackAdapter(agent_core, crm_adapter)
adapter.register_handlers(slack_app)
handler = SocketModeHandler(slack_app, os.environ["SLACK_APP_TOKEN"])
handler.start()
Slack 接入的坑:
- Slack 要求 3 秒内响应 event,否则会重试(导致重复处理)。解决方案:先立刻回复 "正在处理",然后异步处理 Agent 逻辑
- Slack 的消息格式有 mrkdwn(不是 Markdown),链接和粗体的语法不同
Step 3: 邮件接入
邮件接入用 Gmail API + Pub/Sub 做实时监听:
import base64
from email.mime.text import MIMEText
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
class EmailAdapter:
"""邮件渠道适配器"""
def __init__(self, agent_core, crm_adapter):
self.agent = agent_core
self.crm = crm_adapter
self.service = self._init_gmail_service()
def _init_gmail_service(self):
"""初始化 Gmail API 客户端"""
creds = Credentials.from_authorized_user_file("credentials/gmail_token.json")
return build("gmail", "v1", credentials=creds)
async def process_new_email(self, message_id: str):
"""处理新收到的邮件"""
# 获取邮件内容
msg = self.service.users().messages().get(
userId="me", id=message_id, format="full"
).execute()
headers = {h["name"]: h["value"] for h in msg["payload"]["headers"]}
# 提取纯文本内容
body = self._extract_body(msg["payload"])
# 转换为统一格式
message = UnifiedMessage(
message_id=message_id,
channel=Channel.EMAIL,
channel_id=headers.get("Message-ID", message_id),
sender_id=headers.get("From", ""),
sender_name=self._extract_name(headers.get("From", "")),
sender_email=self._extract_email(headers.get("From", "")),
content=body,
metadata={
"subject": headers.get("Subject", ""),
"thread_id": msg.get("threadId", ""),
"in_reply_to": headers.get("In-Reply-To", "")
}
)
# 调用 Agent
response = self.agent.handle(message)
# 发送回复邮件
self._send_reply(message, response)
# CRM 记录
self.crm.log_interaction(message, response)
def _send_reply(self, original: UnifiedMessage, response: AgentResponse):
"""回复邮件"""
reply = MIMEText(response.content, "plain", "utf-8")
reply["To"] = original.sender_email
reply["Subject"] = f"Re: {original.metadata.get('subject', '')}"
reply["In-Reply-To"] = original.metadata.get("thread_id", "")
reply["References"] = original.metadata.get("thread_id", "")
raw = base64.urlsafe_b64encode(reply.as_bytes()).decode()
self.service.users().messages().send(
userId="me",
body={
"raw": raw,
"threadId": original.metadata.get("thread_id", "")
}
).execute()
def _extract_body(self, payload: dict) -> str:
"""从邮件 payload 中提取纯文本"""
if payload.get("mimeType") == "text/plain":
data = payload["body"].get("data", "")
return base64.urlsafe_b64decode(data).decode("utf-8")
# 多部分邮件,递归查找 text/plain
for part in payload.get("parts", []):
result = self._extract_body(part)
if result:
return result
return ""
@staticmethod
def _extract_email(from_header: str) -> str:
import re
match = re.search(r"<(.+?)>", from_header)
return match.group(1) if match else from_header
@staticmethod
def _extract_name(from_header: str) -> str:
import re
match = re.search(r"^(.+?)\s*<", from_header)
return match.group(1).strip('" ') if match else ""
Step 4: CRM 集成(HubSpot)
CRM 集成有两个方向:从 CRM 读取客户数据(给 Agent 提供上下文),以及往 CRM 写入交互记录。
import httpx
class HubSpotAdapter:
"""HubSpot CRM 适配器"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.hubapi.com"
async def get_customer_context(self, email: str) -> dict | None:
"""从 CRM 获取客户上下文(给 Agent 用)"""
async with httpx.AsyncClient() as client:
# 按邮箱搜索联系人
resp = await client.post(
f"{self.base_url}/crm/v3/objects/contacts/search",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"filterGroups": [{
"filters": [{
"propertyName": "email",
"operator": "EQ",
"value": email
}]
}],
"properties": [
"firstname", "lastname", "company",
"lifecyclestage", "hs_lead_status"
]
}
)
if resp.status_code != 200:
return None
results = resp.json().get("results", [])
if not results:
return None
contact = results[0]["properties"]
return {
"name": f"{contact.get('firstname', '')} {contact.get('lastname', '')}",
"company": contact.get("company", ""),
"stage": contact.get("lifecyclestage", ""),
"status": contact.get("hs_lead_status", "")
}
async def log_interaction(
self, message: UnifiedMessage, response: AgentResponse
):
"""在 CRM 中记录交互"""
async with httpx.AsyncClient() as client:
# 查找或创建联系人
contact_id = await self._find_or_create_contact(
client, message.sender_email, message.sender_name
)
if not contact_id:
return
# 创建 note
await client.post(
f"{self.base_url}/crm/v3/objects/notes",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"properties": {
"hs_note_body": (
f"[AI Agent - {message.channel.value}]\n\n"
f"客户问题: {message.content[:500]}\n\n"
f"Agent 回复: {response.content[:500]}"
),
"hs_timestamp": message.timestamp.isoformat()
},
"associations": [{
"to": {"id": contact_id},
"types": [{
"associationCategory": "HUBSPOT_DEFINED",
"associationTypeId": 202 # note-to-contact
}]
}]
}
)
async def _find_or_create_contact(
self, client: httpx.AsyncClient, email: str, name: str
) -> str | None:
"""查找联系人,不存在则创建"""
# 搜索
resp = await client.post(
f"{self.base_url}/crm/v3/objects/contacts/search",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"filterGroups": [{
"filters": [{
"propertyName": "email",
"operator": "EQ",
"value": email
}]
}]
}
)
results = resp.json().get("results", [])
if results:
return results[0]["id"]
# 创建
parts = name.split(" ", 1)
resp = await client.post(
f"{self.base_url}/crm/v3/objects/contacts",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"properties": {
"email": email,
"firstname": parts[0] if parts else "",
"lastname": parts[1] if len(parts) > 1 else ""
}
}
)
if resp.status_code == 201:
return resp.json()["id"]
return None
Step 5: 消息路由器
class MessageRouter:
"""统一消息路由器"""
def __init__(self, agent_core, crm: HubSpotAdapter):
self.agent = agent_core
self.crm = crm
async def route(self, message: UnifiedMessage) -> AgentResponse:
"""路由消息到 Agent,附加 CRM 上下文"""
# 从 CRM 获取客户上下文
customer_context = await self.crm.get_customer_context(
message.sender_email
)
# 给 Agent 注入客户上下文
enriched_content = message.content
if customer_context:
enriched_content = (
f"[客户信息: {customer_context['name']}, "
f"公司: {customer_context['company']}, "
f"阶段: {customer_context['stage']}]\n\n"
f"客户问题: {message.content}"
)
# 调用 Agent
agent_reply = await self.agent.handle(enriched_content)
return AgentResponse(
content=agent_reply,
source_message_id=message.message_id,
channel=message.channel,
channel_id=message.channel_id
)
实战经验
生产数据(运行 4 个月)
| 指标 | Slack | 邮件 | CRM |
|---|---|---|---|
| 月均交互量 | 2,100 | 850 | 200(主动触发) |
| 平均响应时间 | 3.2s | 45s(含邮件发送) | 2.8s |
| 自动解决率 | 74% | 61% | 82% |
| 月 API 成本 | $180 | $95 | $35 |
邮件的自动解决率低于 Slack,主要因为邮件内容更长、更正式、上下文更复杂。
踩过的坑
坑 1:Slack 的 3 秒超时是硬限制。 Slack 的 Events API 要求你在 3 秒内返回 200 OK,否则会重发事件。如果 Agent 处理需要 5 秒,你会收到重复事件。解决方案:立刻返回 200,把消息丢进队列异步处理。用 event ts 做去重。
坑 2:邮件线程的上下文管理。 Gmail 的 thread ID 可以把同一主题的邮件串起来。但如果客户换了主题继续聊,就变成了新 thread。Agent 丢失了之前的上下文。解决方案:在 CRM 里按客户 email 维护对话历史,不依赖邮件 thread。
坑 3:CRM 写入的 rate limit。 HubSpot API 的免费版限制 100 requests/10s。Agent 高峰期可以突破这个限制。解决方案:CRM 写入用队列异步处理,设置 rate limiter。交互记录延迟写入不影响用户体验。
坑 4:跨渠道客户识别。 同一个客户可能在 Slack 用公司邮箱,在邮件用个人邮箱。Agent 不知道是同一个人。解决方案:在 CRM 里做客户合并,用 Slack user ID + email 做联合匹配。
总结
三条核心 takeaway:
-
Channel Adapter 要足够薄——Slack、邮件、CRM 的协议细节全部在 Adapter 层处理。Agent Core 只看到统一格式的消息。这样加一个新渠道(比如微信、WhatsApp)只需要写一个新 Adapter,不用改 Agent 逻辑。
-
CRM 集成是价值放大器——Agent 读 CRM 数据来个性化回答("张总,您的企业版订阅下个月到期"),写 CRM 数据来留下完整记录。没有 CRM 集成的 Agent 就是一个高级 FAQ 机器人。
-
异步处理是多渠道 Agent 的必需品——Slack 有 3 秒超时,邮件处理可能要 10 秒,CRM 写入有 rate limit。所有耗时操作必须异步化。消息队列(Redis Queue 或 Celery)是必备组件。
如果你要给 Agent 接入业务工具,建议先接 Slack(最简单,反馈最快),跑通后再接邮件和 CRM。三个渠道的核心 Agent 逻辑是完全一样的,只有 Adapter 层不同。
你的 Agent 接入了哪些渠道?遇到了什么问题?欢迎交流。