Solo Unicorn Club logoSolo Unicorn
2,750 words

Integrating an AI Agent with Slack, Email, and CRM

AI AgentSlackEmail IntegrationCRMWebhookSystem IntegrationHands-On
Integrating an AI Agent with Slack, Email, and CRM

Integrating an AI Agent with Slack, Email, and CRM

Opening

I built an Agent for a B2B SaaS company that connects to Slack, Gmail, and HubSpot CRM simultaneously. A customer asks a product question in Slack, the Agent automatically pulls customer data from the CRM, searches the knowledge base for an answer, sends the reply back to Slack, and logs the conversation in the CRM. This system has been running for 4 months, handling over 12,000 interactions with an average response time of 3.2 seconds. This article lays out the integration approach for all three channels, the Webhook architecture, and every pitfall we hit along the way.

Problem Background

Most Agent tutorials assume the user interacts with the Agent through a web interface or API. But in enterprise settings, users (whether customers or internal employees) are already using Slack, email, and CRM every day. Asking them to switch to a new interface is unrealistic.

The Agent must go where the users are. That means you need to:

  1. Listen for messages across multiple channels
  2. Unify the processing logic (Agent behavior stays consistent regardless of channel)
  3. Send replies back to the originating channel
  4. Log all interactions in the CRM

The core architectural challenge: the channel adaptation layer must be thin enough, and the business logic must be centralized enough.

Core Architecture

Layered Design

   Slack         Gmail        HubSpot
     │             │             │
     ▼             ▼             ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Slack    │ │ Email    │ │ CRM      │
│ Adapter  │ │ Adapter  │ │ Adapter  │
└────┬─────┘ └────┬─────┘ └────┬─────┘
     │             │             │
     └──────┬──────┘──────┬──────┘
            │             │
     ┌──────▼─────────────▼──────┐
     │     Message Router        │
     │  (Unified format + routing)│
     └────────────┬──────────────┘
                  │
     ┌────────────▼──────────────┐
     │      Agent Core           │
     │  (LLM + Tools + RAG)      │
     └────────────┬──────────────┘
                  │
     ┌────────────▼──────────────┐
     │    Response Dispatcher    │
     │  (Reply to source channel) │
     └───────────────────────────┘

Key design principles:

  1. Channel Adapters only do protocol conversion: Slack's message format, Email's MIME format, CRM's API format — all converted to a unified internal message format in the Adapter layer
  2. Agent Core is channel-agnostic: It only processes standardized input and produces standardized output
  3. Response Dispatcher handles write-back: Routes the reply back in the appropriate format based on the source channel

Implementation Details

Step 1: Unified Message Format

All channel messages get converted to this format:

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum

class Channel(Enum):
    SLACK = "slack"
    EMAIL = "email"
    CRM = "crm"

@dataclass
class UnifiedMessage:
    """Unified message format"""
    message_id: str              # Unique message ID
    channel: Channel             # Source channel
    channel_id: str              # Channel-specific identifier (Slack channel ID, email thread ID)
    sender_id: str               # Sender ID
    sender_name: str             # Sender name
    sender_email: str            # Sender email (for CRM correlation)
    content: str                 # Message body (plain text)
    attachments: list[str] = field(default_factory=list)  # Attachment URLs
    timestamp: datetime = field(default_factory=datetime.now)
    metadata: dict = field(default_factory=dict)  # Channel-specific metadata

@dataclass
class AgentResponse:
    """Agent response format"""
    content: str
    source_message_id: str
    channel: Channel
    channel_id: str
    metadata: dict = field(default_factory=dict)

Step 2: Slack Integration

Slack integration uses the Bolt framework + Socket Mode (no public URL required):

from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
import os

# Initialize Slack App
slack_app = App(token=os.environ["SLACK_BOT_TOKEN"])

class SlackAdapter:
    """Slack channel adapter"""

    def __init__(self, agent_core, crm_adapter):
        self.agent = agent_core
        self.crm = crm_adapter

    def register_handlers(self, app: App):
        """Register Slack event handlers"""

        # Handle @mention messages
        @app.event("app_mention")
        def handle_mention(event, say):
            self._process_message(event, say)

        # Handle DM messages
        @app.event("message")
        def handle_dm(event, say):
            # Only process DMs, ignore channel messages (use @mention for those)
            if event.get("channel_type") == "im":
                self._process_message(event, say)

    def _process_message(self, event: dict, say):
        """Process a Slack message"""
        # Ignore the bot's own messages
        if event.get("bot_id"):
            return

        # Convert to unified format
        message = UnifiedMessage(
            message_id=event["ts"],
            channel=Channel.SLACK,
            channel_id=event["channel"],
            sender_id=event["user"],
            sender_name=self._get_user_name(event["user"]),
            sender_email=self._get_user_email(event["user"]),
            content=self._clean_mention(event["text"]),
            timestamp=datetime.fromtimestamp(float(event["ts"]))
        )

        # Send immediate "thinking" feedback
        say("Processing your question, please hold on...", thread_ts=event["ts"])

        # Call Agent Core
        response = self.agent.handle(message)

        # Reply in the original message's thread
        say(
            text=response.content,
            thread_ts=event["ts"]
        )

        # Log this interaction in CRM
        self.crm.log_interaction(message, response)

    def _clean_mention(self, text: str) -> str:
        """Strip @mention markup, keeping only the question content"""
        import re
        return re.sub(r"<@[A-Z0-9]+>", "", text).strip()

    def _get_user_email(self, user_id: str) -> str:
        """Get user email from Slack API (for CRM correlation)"""
        from slack_sdk import WebClient
        client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
        result = client.users_info(user=user_id)
        return result["user"]["profile"].get("email", "")

    def _get_user_name(self, user_id: str) -> str:
        from slack_sdk import WebClient
        client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
        result = client.users_info(user=user_id)
        return result["user"]["real_name"]

# Startup
adapter = SlackAdapter(agent_core, crm_adapter)
adapter.register_handlers(slack_app)
handler = SocketModeHandler(slack_app, os.environ["SLACK_APP_TOKEN"])
handler.start()

Slack integration pitfalls:

  • Slack requires a response within 3 seconds, otherwise it retries the event (causing duplicate processing). Solution: Reply immediately with "Processing...", then handle the Agent logic asynchronously
  • Slack's message format uses mrkdwn (not Markdown) — link and bold syntax differ

Step 3: Email Integration

Email integration uses Gmail API + Pub/Sub for real-time listening:

import base64
from email.mime.text import MIMEText
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build

class EmailAdapter:
    """Email channel adapter"""

    def __init__(self, agent_core, crm_adapter):
        self.agent = agent_core
        self.crm = crm_adapter
        self.service = self._init_gmail_service()

    def _init_gmail_service(self):
        """Initialize Gmail API client"""
        creds = Credentials.from_authorized_user_file("credentials/gmail_token.json")
        return build("gmail", "v1", credentials=creds)

    async def process_new_email(self, message_id: str):
        """Process a newly received email"""
        # Get the email content
        msg = self.service.users().messages().get(
            userId="me", id=message_id, format="full"
        ).execute()

        headers = {h["name"]: h["value"] for h in msg["payload"]["headers"]}

        # Extract plain text content
        body = self._extract_body(msg["payload"])

        # Convert to unified format
        message = UnifiedMessage(
            message_id=message_id,
            channel=Channel.EMAIL,
            channel_id=headers.get("Message-ID", message_id),
            sender_id=headers.get("From", ""),
            sender_name=self._extract_name(headers.get("From", "")),
            sender_email=self._extract_email(headers.get("From", "")),
            content=body,
            metadata={
                "subject": headers.get("Subject", ""),
                "thread_id": msg.get("threadId", ""),
                "in_reply_to": headers.get("In-Reply-To", "")
            }
        )

        # Call the Agent
        response = self.agent.handle(message)

        # Send reply email
        self._send_reply(message, response)

        # Log to CRM
        self.crm.log_interaction(message, response)

    def _send_reply(self, original: UnifiedMessage, response: AgentResponse):
        """Reply to an email"""
        reply = MIMEText(response.content, "plain", "utf-8")
        reply["To"] = original.sender_email
        reply["Subject"] = f"Re: {original.metadata.get('subject', '')}"
        reply["In-Reply-To"] = original.metadata.get("thread_id", "")
        reply["References"] = original.metadata.get("thread_id", "")

        raw = base64.urlsafe_b64encode(reply.as_bytes()).decode()
        self.service.users().messages().send(
            userId="me",
            body={
                "raw": raw,
                "threadId": original.metadata.get("thread_id", "")
            }
        ).execute()

    def _extract_body(self, payload: dict) -> str:
        """Extract plain text from the email payload"""
        if payload.get("mimeType") == "text/plain":
            data = payload["body"].get("data", "")
            return base64.urlsafe_b64decode(data).decode("utf-8")

        # Multi-part email — recursively search for text/plain
        for part in payload.get("parts", []):
            result = self._extract_body(part)
            if result:
                return result
        return ""

    @staticmethod
    def _extract_email(from_header: str) -> str:
        import re
        match = re.search(r"<(.+?)>", from_header)
        return match.group(1) if match else from_header

    @staticmethod
    def _extract_name(from_header: str) -> str:
        import re
        match = re.search(r"^(.+?)\s*<", from_header)
        return match.group(1).strip('" ') if match else ""

Step 4: CRM Integration (HubSpot)

CRM integration works in two directions: reading customer data from the CRM (to give the Agent context), and writing interaction records back to the CRM.

import httpx

class HubSpotAdapter:
    """HubSpot CRM adapter"""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.hubapi.com"

    async def get_customer_context(self, email: str) -> dict | None:
        """Get customer context from CRM (for the Agent to use)"""
        async with httpx.AsyncClient() as client:
            # Search for contact by email
            resp = await client.post(
                f"{self.base_url}/crm/v3/objects/contacts/search",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "filterGroups": [{
                        "filters": [{
                            "propertyName": "email",
                            "operator": "EQ",
                            "value": email
                        }]
                    }],
                    "properties": [
                        "firstname", "lastname", "company",
                        "lifecyclestage", "hs_lead_status"
                    ]
                }
            )

            if resp.status_code != 200:
                return None

            results = resp.json().get("results", [])
            if not results:
                return None

            contact = results[0]["properties"]
            return {
                "name": f"{contact.get('firstname', '')} {contact.get('lastname', '')}",
                "company": contact.get("company", ""),
                "stage": contact.get("lifecyclestage", ""),
                "status": contact.get("hs_lead_status", "")
            }

    async def log_interaction(
        self, message: UnifiedMessage, response: AgentResponse
    ):
        """Log an interaction in the CRM"""
        async with httpx.AsyncClient() as client:
            # Find or create contact
            contact_id = await self._find_or_create_contact(
                client, message.sender_email, message.sender_name
            )

            if not contact_id:
                return

            # Create a note
            await client.post(
                f"{self.base_url}/crm/v3/objects/notes",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "properties": {
                        "hs_note_body": (
                            f"[AI Agent - {message.channel.value}]\n\n"
                            f"Customer question: {message.content[:500]}\n\n"
                            f"Agent reply: {response.content[:500]}"
                        ),
                        "hs_timestamp": message.timestamp.isoformat()
                    },
                    "associations": [{
                        "to": {"id": contact_id},
                        "types": [{
                            "associationCategory": "HUBSPOT_DEFINED",
                            "associationTypeId": 202  # note-to-contact
                        }]
                    }]
                }
            )

    async def _find_or_create_contact(
        self, client: httpx.AsyncClient, email: str, name: str
    ) -> str | None:
        """Find a contact; create one if not found"""
        # Search
        resp = await client.post(
            f"{self.base_url}/crm/v3/objects/contacts/search",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "filterGroups": [{
                    "filters": [{
                        "propertyName": "email",
                        "operator": "EQ",
                        "value": email
                    }]
                }]
            }
        )

        results = resp.json().get("results", [])
        if results:
            return results[0]["id"]

        # Create
        parts = name.split(" ", 1)
        resp = await client.post(
            f"{self.base_url}/crm/v3/objects/contacts",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "properties": {
                    "email": email,
                    "firstname": parts[0] if parts else "",
                    "lastname": parts[1] if len(parts) > 1 else ""
                }
            }
        )

        if resp.status_code == 201:
            return resp.json()["id"]
        return None

Step 5: Message Router

class MessageRouter:
    """Unified message router"""

    def __init__(self, agent_core, crm: HubSpotAdapter):
        self.agent = agent_core
        self.crm = crm

    async def route(self, message: UnifiedMessage) -> AgentResponse:
        """Route a message to the Agent, enriched with CRM context"""

        # Get customer context from CRM
        customer_context = await self.crm.get_customer_context(
            message.sender_email
        )

        # Inject customer context for the Agent
        enriched_content = message.content
        if customer_context:
            enriched_content = (
                f"[Customer info: {customer_context['name']}, "
                f"Company: {customer_context['company']}, "
                f"Stage: {customer_context['stage']}]\n\n"
                f"Customer question: {message.content}"
            )

        # Call the Agent
        agent_reply = await self.agent.handle(enriched_content)

        return AgentResponse(
            content=agent_reply,
            source_message_id=message.message_id,
            channel=message.channel,
            channel_id=message.channel_id
        )

Lessons from the Field

Production Data (4 Months in Production)

Metric Slack Email CRM
Monthly interactions 2,100 850 200 (proactively triggered)
Avg response time 3.2s 45s (incl. email sending) 2.8s
Auto-resolution rate 74% 61% 82%
Monthly API cost $180 $95 $35

Email's auto-resolution rate is lower than Slack's, mainly because email content tends to be longer, more formal, and carries more complex context.

Pitfalls We Hit

Pitfall 1: Slack's 3-second timeout is a hard limit. Slack's Events API requires you to return a 200 OK within 3 seconds; otherwise it resends the event. If Agent processing takes 5 seconds, you get duplicate events. Solution: Return 200 immediately, push the message into a queue for async processing. Use the event timestamp for deduplication.

Pitfall 2: Email thread context management. Gmail's thread ID can link emails on the same subject. But if a customer changes the subject and continues chatting, it becomes a new thread and the Agent loses prior context. Solution: Maintain conversation history in the CRM by customer email, rather than relying on email threads.

Pitfall 3: CRM write rate limits. HubSpot API's free tier limits you to 100 requests per 10 seconds. During peak hours, the Agent can exceed this. Solution: Queue CRM writes for async processing with a rate limiter. Delayed interaction logging doesn't affect user experience.

Pitfall 4: Cross-channel customer identification. The same customer might use a company email on Slack and a personal email for email. The Agent doesn't know it's the same person. Solution: Merge customers in the CRM using a combined match on Slack user ID + email address.

Conclusion

Three core takeaways:

  1. Channel Adapters must be thin — All the protocol details of Slack, email, and CRM should be handled in the Adapter layer. Agent Core only sees uniformly formatted messages. This way, adding a new channel (say, WeChat or WhatsApp) only requires writing a new Adapter — no changes to Agent logic.

  2. CRM integration is a value multiplier — The Agent reads CRM data to personalize responses ("Hi Zhang, your Enterprise subscription expires next month"), and writes CRM data to maintain a complete record. An Agent without CRM integration is just a glorified FAQ bot.

  3. Async processing is a must for multi-channel Agents — Slack has a 3-second timeout, email processing can take 10 seconds, CRM writes have rate limits. All time-consuming operations must be async. A message queue (Redis Queue or Celery) is an essential component.

If you're looking to integrate an Agent with business tools, start with Slack (simplest, fastest feedback loop), get it working, then add email and CRM. The core Agent logic is identical across all three channels — only the Adapter layer differs.

Which channels have you integrated your Agent with? What issues did you run into? I'd love to hear about it.