Solo Unicorn Club logoSolo Unicorn
2,750

把 AI Agent 接入 Slack、邮件和 CRM

AI AgentSlack邮件集成CRMWebhook系统集成实战
把 AI Agent 接入 Slack、邮件和 CRM

把 AI Agent 接入 Slack、邮件和 CRM

开场

我给一个 B2B SaaS 公司搭了一个 Agent,同时接入 Slack、Gmail 和 HubSpot CRM。客户在 Slack 里问产品问题,Agent 自动从 CRM 拉取客户数据,在知识库里搜索答案,把回复发到 Slack 并在 CRM 里记录对话。整套系统跑了 4 个月,处理了超过 12,000 次交互,平均响应时间 3.2 秒。这篇文章把三个渠道的接入方式、Webhook 架构和实际踩过的坑全部说清楚。

问题背景

大多数 Agent 教程假设用户通过一个 Web 界面或 API 和 Agent 交互。但在企业场景中,用户(客户或内部员工)已经在用 Slack、邮件、CRM 这些工具了。让他们切换到一个新界面是不现实的。

Agent 必须去用户在的地方。这意味着你需要:

  1. 监听多个渠道的消息
  2. 统一处理逻辑(不管从哪个渠道来,Agent 的行为一致)
  3. 把回复发回到对应渠道
  4. 在 CRM 里记录所有交互

核心架构挑战是:渠道适配层要足够薄,业务逻辑要足够集中。

核心架构

分层设计

   Slack         Gmail        HubSpot
     │             │             │
     ▼             ▼             ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Slack    │ │ Email    │ │ CRM      │
│ Adapter  │ │ Adapter  │ │ Adapter  │
└────┬─────┘ └────┬─────┘ └────┬─────┘
     │             │             │
     └──────┬──────┘──────┬──────┘
            │             │
     ┌──────▼─────────────▼──────┐
     │     Message Router        │
     │  (统一消息格式 + 路由)     │
     └────────────┬──────────────┘
                  │
     ┌────────────▼──────────────┐
     │      Agent Core           │
     │  (LLM + Tools + RAG)      │
     └────────────┬──────────────┘
                  │
     ┌────────────▼──────────────┐
     │    Response Dispatcher    │
     │  (回复到对应渠道)          │
     └───────────────────────────┘

关键设计原则:

  1. Channel Adapter 只做协议转换:Slack 的消息格式、Email 的 MIME 格式、CRM 的 API 格式,都在 Adapter 层转成统一的内部消息格式
  2. Agent Core 不知道消息来自哪里:它只处理标准化的输入,输出标准化的回复
  3. Response Dispatcher 负责回写:根据来源渠道,把回复转成对应格式发回去

实现细节

Step 1: 统一消息格式

所有渠道的消息都转成这个格式:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class Channel(Enum):
    SLACK = "slack"
    EMAIL = "email"
    CRM = "crm"

@dataclass
class UnifiedMessage:
    """统一消息格式"""
    message_id: str              # 消息唯一 ID
    channel: Channel             # 来源渠道
    channel_id: str              # 渠道内的标识(Slack channel ID, email thread ID)
    sender_id: str               # 发送者 ID
    sender_name: str             # 发送者名字
    sender_email: str            # 发送者邮箱(用于 CRM 关联)
    content: str                 # 消息正文(纯文本)
    attachments: list[str] = field(default_factory=list)  # 附件 URL
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)  # 渠道特定元数据

@dataclass
class AgentResponse:
    """Agent 回复格式"""
    content: str
    source_message_id: str
    channel: Channel
    channel_id: str
    metadata: dict = field(default_factory=dict)

Step 2: Slack 接入

Slack 的接入通过 Bolt 框架 + Socket Mode(不需要公网 URL):

from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import os

# 初始化 Slack App
slack_app = App(token=os.environ["SLACK_BOT_TOKEN"])

class SlackAdapter:
    """Slack 渠道适配器"""

    def __init__(self, agent_core, crm_adapter):
        self.agent = agent_core
        self.crm = crm_adapter

    def register_handlers(self, app: App):
        """注册 Slack 事件处理器"""

        # 处理 @mention 消息
        @app.event("app_mention")
        def handle_mention(event, say):
            self._process_message(event, say)

        # 处理 DM 消息
        @app.event("message")
        def handle_dm(event, say):
            # 只处理 DM,忽略 channel 消息(用 @mention)
            if event.get("channel_type") == "im":
                self._process_message(event, say)

    def _process_message(self, event: dict, say):
        """处理 Slack 消息"""
        # 忽略 bot 自己的消息
        if event.get("bot_id"):
            return

        # 转换为统一格式
        message = UnifiedMessage(
            message_id=event["ts"],
            channel=Channel.SLACK,
            channel_id=event["channel"],
            sender_id=event["user"],
            sender_name=self._get_user_name(event["user"]),
            sender_email=self._get_user_email(event["user"]),
            content=self._clean_mention(event["text"]),
            timestamp=datetime.fromtimestamp(float(event["ts"]))
        )

        # 发送 "正在思考" 的即时反馈
        say("正在处理您的问题,请稍候...", thread_ts=event["ts"])

        # 调用 Agent Core
        response = self.agent.handle(message)

        # 回复到 Slack(在原消息的 thread 里)
        say(
            text=response.content,
            thread_ts=event["ts"]
        )

        # 在 CRM 记录这次交互
        self.crm.log_interaction(message, response)

    def _clean_mention(self, text: str) -> str:
        """去掉 @mention 标记,只保留问题内容"""
        import re
        return re.sub(r"<@[A-Z0-9]+>", "", text).strip()

    def _get_user_email(self, user_id: str) -> str:
        """从 Slack API 获取用户邮箱(用于 CRM 关联)"""
        from slack_sdk import WebClient
        client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
        result = client.users_info(user=user_id)
        return result["user"]["profile"].get("email", "")

    def _get_user_name(self, user_id: str) -> str:
        from slack_sdk import WebClient
        client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
        result = client.users_info(user=user_id)
        return result["user"]["real_name"]

# 启动
adapter = SlackAdapter(agent_core, crm_adapter)
adapter.register_handlers(slack_app)
handler = SocketModeHandler(slack_app, os.environ["SLACK_APP_TOKEN"])
handler.start()

Slack 接入的坑:

  • Slack 要求 3 秒内响应 event,否则会重试(导致重复处理)。解决方案:先立刻回复 "正在处理",然后异步处理 Agent 逻辑
  • Slack 的消息格式有 mrkdwn(不是 Markdown),链接和粗体的语法不同

Step 3: 邮件接入

邮件接入用 Gmail API + Pub/Sub 做实时监听:

import base64
from email.mime.text import MIMEText
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

class EmailAdapter:
    """邮件渠道适配器"""

    def __init__(self, agent_core, crm_adapter):
        self.agent = agent_core
        self.crm = crm_adapter
        self.service = self._init_gmail_service()

    def _init_gmail_service(self):
        """初始化 Gmail API 客户端"""
        creds = Credentials.from_authorized_user_file("credentials/gmail_token.json")
        return build("gmail", "v1", credentials=creds)

    async def process_new_email(self, message_id: str):
        """处理新收到的邮件"""
        # 获取邮件内容
        msg = self.service.users().messages().get(
            userId="me", id=message_id, format="full"
        ).execute()

        headers = {h["name"]: h["value"] for h in msg["payload"]["headers"]}

        # 提取纯文本内容
        body = self._extract_body(msg["payload"])

        # 转换为统一格式
        message = UnifiedMessage(
            message_id=message_id,
            channel=Channel.EMAIL,
            channel_id=headers.get("Message-ID", message_id),
            sender_id=headers.get("From", ""),
            sender_name=self._extract_name(headers.get("From", "")),
            sender_email=self._extract_email(headers.get("From", "")),
            content=body,
            metadata={
                "subject": headers.get("Subject", ""),
                "thread_id": msg.get("threadId", ""),
                "in_reply_to": headers.get("In-Reply-To", "")
            }
        )

        # 调用 Agent
        response = self.agent.handle(message)

        # 发送回复邮件
        self._send_reply(message, response)

        # CRM 记录
        self.crm.log_interaction(message, response)

    def _send_reply(self, original: UnifiedMessage, response: AgentResponse):
        """回复邮件"""
        reply = MIMEText(response.content, "plain", "utf-8")
        reply["To"] = original.sender_email
        reply["Subject"] = f"Re: {original.metadata.get('subject', '')}"
        reply["In-Reply-To"] = original.metadata.get("thread_id", "")
        reply["References"] = original.metadata.get("thread_id", "")

        raw = base64.urlsafe_b64encode(reply.as_bytes()).decode()
        self.service.users().messages().send(
            userId="me",
            body={
                "raw": raw,
                "threadId": original.metadata.get("thread_id", "")
            }
        ).execute()

    def _extract_body(self, payload: dict) -> str:
        """从邮件 payload 中提取纯文本"""
        if payload.get("mimeType") == "text/plain":
            data = payload["body"].get("data", "")
            return base64.urlsafe_b64decode(data).decode("utf-8")

        # 多部分邮件,递归查找 text/plain
        for part in payload.get("parts", []):
            result = self._extract_body(part)
            if result:
                return result
        return ""

    @staticmethod
    def _extract_email(from_header: str) -> str:
        import re
        match = re.search(r"<(.+?)>", from_header)
        return match.group(1) if match else from_header

    @staticmethod
    def _extract_name(from_header: str) -> str:
        import re
        match = re.search(r"^(.+?)\s*<", from_header)
        return match.group(1).strip('" ') if match else ""

Step 4: CRM 集成(HubSpot)

CRM 集成有两个方向:从 CRM 读取客户数据(给 Agent 提供上下文),以及往 CRM 写入交互记录。

import httpx

class HubSpotAdapter:
    """HubSpot CRM 适配器"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.hubapi.com"

    async def get_customer_context(self, email: str) -> dict | None:
        """从 CRM 获取客户上下文(给 Agent 用)"""
        async with httpx.AsyncClient() as client:
            # 按邮箱搜索联系人
            resp = await client.post(
                f"{self.base_url}/crm/v3/objects/contacts/search",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "filterGroups": [{
                        "filters": [{
                            "propertyName": "email",
                            "operator": "EQ",
                            "value": email
                        }]
                    }],
                    "properties": [
                        "firstname", "lastname", "company",
                        "lifecyclestage", "hs_lead_status"
                    ]
                }
            )

            if resp.status_code != 200:
                return None

            results = resp.json().get("results", [])
            if not results:
                return None

            contact = results[0]["properties"]
            return {
                "name": f"{contact.get('firstname', '')} {contact.get('lastname', '')}",
                "company": contact.get("company", ""),
                "stage": contact.get("lifecyclestage", ""),
                "status": contact.get("hs_lead_status", "")
            }

    async def log_interaction(
        self, message: UnifiedMessage, response: AgentResponse
    ):
        """在 CRM 中记录交互"""
        async with httpx.AsyncClient() as client:
            # 查找或创建联系人
            contact_id = await self._find_or_create_contact(
                client, message.sender_email, message.sender_name
            )

            if not contact_id:
                return

            # 创建 note
            await client.post(
                f"{self.base_url}/crm/v3/objects/notes",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "properties": {
                        "hs_note_body": (
                            f"[AI Agent - {message.channel.value}]\n\n"
                            f"客户问题: {message.content[:500]}\n\n"
                            f"Agent 回复: {response.content[:500]}"
                        ),
                        "hs_timestamp": message.timestamp.isoformat()
                    },
                    "associations": [{
                        "to": {"id": contact_id},
                        "types": [{
                            "associationCategory": "HUBSPOT_DEFINED",
                            "associationTypeId": 202  # note-to-contact
                        }]
                    }]
                }
            )

    async def _find_or_create_contact(
        self, client: httpx.AsyncClient, email: str, name: str
    ) -> str | None:
        """查找联系人,不存在则创建"""
        # 搜索
        resp = await client.post(
            f"{self.base_url}/crm/v3/objects/contacts/search",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "filterGroups": [{
                    "filters": [{
                        "propertyName": "email",
                        "operator": "EQ",
                        "value": email
                    }]
                }]
            }
        )

        results = resp.json().get("results", [])
        if results:
            return results[0]["id"]

        # 创建
        parts = name.split(" ", 1)
        resp = await client.post(
            f"{self.base_url}/crm/v3/objects/contacts",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "properties": {
                    "email": email,
                    "firstname": parts[0] if parts else "",
                    "lastname": parts[1] if len(parts) > 1 else ""
                }
            }
        )

        if resp.status_code == 201:
            return resp.json()["id"]
        return None

Step 5: 消息路由器

class MessageRouter:
    """统一消息路由器"""

    def __init__(self, agent_core, crm: HubSpotAdapter):
        self.agent = agent_core
        self.crm = crm

    async def route(self, message: UnifiedMessage) -> AgentResponse:
        """路由消息到 Agent,附加 CRM 上下文"""

        # 从 CRM 获取客户上下文
        customer_context = await self.crm.get_customer_context(
            message.sender_email
        )

        # 给 Agent 注入客户上下文
        enriched_content = message.content
        if customer_context:
            enriched_content = (
                f"[客户信息: {customer_context['name']}, "
                f"公司: {customer_context['company']}, "
                f"阶段: {customer_context['stage']}]\n\n"
                f"客户问题: {message.content}"
            )

        # 调用 Agent
        agent_reply = await self.agent.handle(enriched_content)

        return AgentResponse(
            content=agent_reply,
            source_message_id=message.message_id,
            channel=message.channel,
            channel_id=message.channel_id
        )

实战经验

生产数据(运行 4 个月)

指标 Slack 邮件 CRM
月均交互量 2,100 850 200(主动触发)
平均响应时间 3.2s 45s(含邮件发送) 2.8s
自动解决率 74% 61% 82%
月 API 成本 $180 $95 $35

邮件的自动解决率低于 Slack,主要因为邮件内容更长、更正式、上下文更复杂。

踩过的坑

坑 1:Slack 的 3 秒超时是硬限制。 Slack 的 Events API 要求你在 3 秒内返回 200 OK,否则会重发事件。如果 Agent 处理需要 5 秒,你会收到重复事件。解决方案:立刻返回 200,把消息丢进队列异步处理。用 event ts 做去重。

坑 2:邮件线程的上下文管理。 Gmail 的 thread ID 可以把同一主题的邮件串起来。但如果客户换了主题继续聊,就变成了新 thread。Agent 丢失了之前的上下文。解决方案:在 CRM 里按客户 email 维护对话历史,不依赖邮件 thread。

坑 3:CRM 写入的 rate limit。 HubSpot API 的免费版限制 100 requests/10s。Agent 高峰期可以突破这个限制。解决方案:CRM 写入用队列异步处理,设置 rate limiter。交互记录延迟写入不影响用户体验。

坑 4:跨渠道客户识别。 同一个客户可能在 Slack 用公司邮箱,在邮件用个人邮箱。Agent 不知道是同一个人。解决方案:在 CRM 里做客户合并,用 Slack user ID + email 做联合匹配。

总结

三条核心 takeaway:

  1. Channel Adapter 要足够薄——Slack、邮件、CRM 的协议细节全部在 Adapter 层处理。Agent Core 只看到统一格式的消息。这样加一个新渠道(比如微信、WhatsApp)只需要写一个新 Adapter,不用改 Agent 逻辑。

  2. CRM 集成是价值放大器——Agent 读 CRM 数据来个性化回答("张总,您的企业版订阅下个月到期"),写 CRM 数据来留下完整记录。没有 CRM 集成的 Agent 就是一个高级 FAQ 机器人。

  3. 异步处理是多渠道 Agent 的必需品——Slack 有 3 秒超时,邮件处理可能要 10 秒,CRM 写入有 rate limit。所有耗时操作必须异步化。消息队列(Redis Queue 或 Celery)是必备组件。

如果你要给 Agent 接入业务工具,建议先接 Slack(最简单,反馈最快),跑通后再接邮件和 CRM。三个渠道的核心 Agent 逻辑是完全一样的,只有 Adapter 层不同。

你的 Agent 接入了哪些渠道?遇到了什么问题?欢迎交流。