Solo Unicorn Club logoSolo Unicorn
2,780

怎么把 AI Agent 接入你的公司数据 — 手把手教程

AI AgentRAG数据接入向量数据库知识库实战
怎么把 AI Agent 接入你的公司数据 — 手把手教程

怎么把 AI Agent 接入你的公司数据 — 手把手教程

开场

我帮三家公司搭过 AI Agent 数据接入层,发现一个规律:90% 的 Agent 项目卡壳不是因为模型不够好,而是数据接不上。你的 Agent 能力上限不是 LLM 的能力上限,而是它能访问到的数据的质量和范围。这篇文章把数据接入的四种方式——数据库直连、API 集成、文件系统爬取、RAG 知识库——全部拆解给你看。

问题背景

一个典型的企业 AI Agent 需要访问以下数据源:

  • 结构化数据:PostgreSQL / MySQL 里的客户表、订单表、产品表
  • 半结构化数据:Notion 文档、Confluence 页面、Google Docs
  • 非结构化数据:PDF 合同、Slack 聊天记录、邮件附件
  • 实时数据:API 返回的库存信息、价格数据、用户行为

大多数教程只讲 RAG(把文档切块 → 向量化 → 检索),但实际生产中,RAG 只是四种数据接入方式之一。盲目把所有数据都塞进 RAG,会导致检索质量下降和成本飙升。

核心架构

设计原则

  1. 按数据类型选方案:结构化数据用 SQL 查询,不要硬塞 RAG
  2. 最小权限:Agent 只能访问它需要的数据,用只读账号 + 行级权限
  3. 缓存优先:频繁查询的数据做本地缓存,降低延迟和成本

四种接入方式

                    ┌─────────────────┐
                    │   AI Agent      │
                    │  (Claude API)   │
                    └───────┬─────────┘
                            │
        ┌───────────────────┼───────────────────┐
        │                   │                   │
   ┌────▼────┐      ┌──────▼──────┐     ┌──────▼──────┐
   │ SQL 查询 │      │  API 集成   │     │  RAG 检索   │
   │ (结构化) │      │ (实时数据)  │     │(非结构化)   │
   └────┬────┘      └──────┬──────┘     └──────┬──────┘
        │                   │                   │
   ┌────▼────┐      ┌──────▼──────┐     ┌──────▼──────┐
   │PostgreSQL│      │ 外部 API    │     │ Vector DB   │
   │  MySQL   │      │ (Stripe等)  │     │ (Qdrant)    │
   └─────────┘      └─────────────┘     └─────────────┘

实现细节

方案一:SQL 直连(结构化数据)

这是最直接也最容易出问题的方式。核心挑战:怎么让 LLM 生成安全且正确的 SQL 查询。

import anthropic
import psycopg2
from typing import Any

# 数据库连接(使用只读账号)
def get_db_connection():
    return psycopg2.connect(
        host="localhost",
        database="company_db",
        user="agent_readonly",  # 只读权限
        password="****"
    )

# 获取数据库 schema 信息(给 Agent 提供上下文)
def get_schema_context() -> str:
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute("""
        SELECT table_name, column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = 'public'
        ORDER BY table_name, ordinal_position
    """)
    schema = cursor.fetchall()
    conn.close()

    # 格式化为 Agent 可理解的文本
    schema_text = "数据库表结构:\n"
    current_table = ""
    for table, column, dtype in schema:
        if table != current_table:
            schema_text += f"\n表: {table}\n"
            current_table = table
        schema_text += f"  - {column} ({dtype})\n"

    return schema_text

# Agent 生成 SQL 并执行
def query_with_agent(question: str) -> dict[str, Any]:
    client = anthropic.Anthropic()
    schema = get_schema_context()

    # 第一步:让 Agent 生成 SQL
    response = client.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=1024,
        system=f"""你是一个 SQL 生成器。
根据用户问题和数据库 schema 生成 PostgreSQL 查询。
只输出 SQL 语句,不要解释。
只能生成 SELECT 语句,禁止 INSERT/UPDATE/DELETE。

{schema}""",
        messages=[{"role": "user", "content": question}]
    )

    sql = response.content[0].text.strip()

    # 安全检查:只允许 SELECT
    if not sql.upper().startswith("SELECT"):
        raise ValueError(f"非法 SQL 操作: {sql[:50]}")

    # 执行查询
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute(sql)
    results = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]
    conn.close()

    return {"sql": sql, "columns": columns, "rows": results}

关键安全措施:

  • 数据库账号只有 SELECT 权限
  • 代码层面再做一次 SQL 关键字白名单检查
  • 设置查询超时(防止生成了全表扫描的慢查询)
  • 限制返回行数(LIMIT 100

方案二:API 集成(实时数据)

API 集成的核心是 Tool Use。Claude API 原生支持 tool calling,你定义好工具的 schema,模型会决定何时调用:

import anthropic
import httpx

client = anthropic.Anthropic()

# 定义 Agent 可用的工具
tools = [
    {
        "name": "get_customer_info",
        "description": "从 CRM 获取客户详细信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "customer_id": {
                    "type": "string",
                    "description": "客户 ID"
                }
            },
            "required": ["customer_id"]
        }
    },
    {
        "name": "get_order_status",
        "description": "查询订单状态",
        "input_schema": {
            "type": "object",
            "properties": {
                "order_id": {
                    "type": "string",
                    "description": "订单号"
                }
            },
            "required": ["order_id"]
        }
    }
]

# 工具执行函数
def execute_tool(name: str, params: dict) -> str:
    if name == "get_customer_info":
        resp = httpx.get(
            f"https://api.crm.com/v1/customers/{params['customer_id']}",
            headers={"Authorization": f"Bearer {CRM_API_KEY}"}
        )
        return resp.text
    elif name == "get_order_status":
        resp = httpx.get(
            f"https://api.orders.com/v1/orders/{params['order_id']}",
            headers={"Authorization": f"Bearer {ORDER_API_KEY}"}
        )
        return resp.text

# Agent 对话循环(含 tool use)
def agent_chat(user_message: str):
    messages = [{"role": "user", "content": user_message}]

    while True:
        response = client.messages.create(
            model="claude-sonnet-4-5-20250514",
            max_tokens=4096,
            tools=tools,
            messages=messages
        )

        # 如果模型决定调用工具
        if response.stop_reason == "tool_use":
            tool_block = next(
                b for b in response.content if b.type == "tool_use"
            )
            result = execute_tool(tool_block.name, tool_block.input)

            # 把工具结果反馈给模型
            messages.append({"role": "assistant", "content": response.content})
            messages.append({
                "role": "user",
                "content": [{
                    "type": "tool_result",
                    "tool_use_id": tool_block.id,
                    "content": result
                }]
            })
        else:
            # 模型直接回复,返回结果
            return response.content[0].text

方案三:RAG 知识库(非结构化数据)

RAG 适用于 PDF、文档、知识库等非结构化数据。2026 年的最佳实践已经和两年前不同,核心改进在于分块策略和混合检索。

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import anthropic
import hashlib

# 初始化 Qdrant 向量数据库
qdrant = QdrantClient(url="http://localhost:6333")
claude = anthropic.Anthropic()

# 创建集合
qdrant.create_collection(
    collection_name="company_docs",
    vectors_config=VectorParams(
        size=1024,      # embedding 维度
        distance=Distance.COSINE
    )
)

# 文档分块(语义分块,不是固定长度)
def semantic_chunk(text: str, max_tokens: int = 400) -> list[str]:
    """按段落和语义边界分块,而非固定字符数"""
    paragraphs = text.split("\n\n")
    chunks = []
    current_chunk = ""

    for para in paragraphs:
        # 估算 token 数(中文约 1.5 字/token)
        estimated_tokens = len(current_chunk) / 1.5
        if estimated_tokens + len(para) / 1.5 > max_tokens:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = para
        else:
            current_chunk += "\n\n" + para

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks

# 生成 embedding(用 Voyage AI 或 OpenAI)
def get_embedding(text: str) -> list[float]:
    """调用 embedding API 获取向量"""
    import httpx
    resp = httpx.post(
        "https://api.voyageai.com/v1/embeddings",
        headers={"Authorization": f"Bearer {VOYAGE_API_KEY}"},
        json={
            "model": "voyage-3-large",
            "input": [text],
            "input_type": "document"
        }
    )
    return resp.json()["data"][0]["embedding"]

# 索引文档
def index_document(doc_path: str, doc_text: str):
    chunks = semantic_chunk(doc_text)
    points = []

    for i, chunk in enumerate(chunks):
        embedding = get_embedding(chunk)
        point_id = hashlib.md5(f"{doc_path}:{i}".encode()).hexdigest()
        points.append(PointStruct(
            id=point_id,
            vector=embedding,
            payload={
                "text": chunk,
                "source": doc_path,
                "chunk_index": i
            }
        ))

    qdrant.upsert(collection_name="company_docs", points=points)

# RAG 检索 + 生成
def rag_query(question: str) -> str:
    # 检索相关文档
    query_embedding = get_embedding(question)
    results = qdrant.search(
        collection_name="company_docs",
        query_vector=query_embedding,
        limit=5  # 返回 top 5
    )

    # 拼接上下文
    context = "\n\n---\n\n".join([
        f"来源: {r.payload['source']}\n{r.payload['text']}"
        for r in results
    ])

    # 调用 Claude 生成回答
    response = claude.messages.create(
        model="claude-sonnet-4-5-20250514",
        max_tokens=2048,
        system="""基于提供的文档内容回答问题。
如果文档中没有相关信息,明确说明"文档中未找到相关信息"。
回答时引用具体来源。""",
        messages=[{
            "role": "user",
            "content": f"文档内容:\n{context}\n\n问题:{question}"
        }]
    )

    return response.content[0].text

2026 年 RAG 最佳实践:

  1. 分块大小:300-500 token 是平衡点。太小丢失上下文,太大引入噪音
  2. Embedding 模型选择:Voyage AI voyage-3-large 在中英文混合场景表现优秀,维度 1024
  3. 混合检索:关键词搜索 (BM25) + 向量搜索结合,召回率提升约 15%
  4. 定期重建索引:源文档更新后必须重新分块和 embedding,否则检索质量会衰减

方案四:文件系统爬取

对于本地文件(产品手册 PDF、合同文档),可以用 LlamaIndex 或自建爬取器:

from pathlib import Path

# 支持的文件格式
SUPPORTED_EXTENSIONS = {".pdf", ".md", ".txt", ".docx", ".csv"}

def crawl_directory(root_dir: str) -> list[dict]:
    """递归爬取目录下所有支持的文件"""
    documents = []
    root = Path(root_dir)

    for file_path in root.rglob("*"):
        if file_path.suffix.lower() in SUPPORTED_EXTENSIONS:
            text = extract_text(file_path)  # 根据格式调用对应解析器
            documents.append({
                "path": str(file_path),
                "text": text,
                "modified": file_path.stat().st_mtime
            })

    return documents

实战经验

生产数据

我在一个电商公司的客服 Agent 项目中,同时使用了 SQL 直连 + API 集成 + RAG 三种方式:

数据源 接入方式 查询延迟 月成本
订单数据库 SQL 直连 120ms $0(自有基础设施)
物流 API Tool Use 350ms $50(API 调用费)
产品说明文档 RAG (Qdrant) 280ms $30(embedding + 存储)
客服话术库 RAG (Qdrant) 250ms 含在上面

总 RAG 知识库:1,200 个文档,42,000 个 chunk,向量维度 1024。

踩过的坑

坑 1:结构化数据不要走 RAG。 我最初把订单数据也做了 embedding 塞进向量库,查询 "用户 A 的最近 5 笔订单" 这种精确查询,RAG 的召回率不到 60%。换成 SQL 直连后,准确率 100%。

坑 2:embedding 模型和 LLM 要分开选。 不要因为用了 Claude 做生成就也用 Claude 做 embedding。专用 embedding 模型(如 Voyage AI)在检索任务上的表现远优于通用 LLM。

坑 3:文档更新后忘记重建索引。 产品价格更新了,但向量库里还是旧数据。Agent 用旧价格回答客户,造成了客诉。解决方案:设置定时任务,每天凌晨重新爬取和索引。

总结

三条核心 takeaway:

  1. 按数据类型选接入方式——结构化数据用 SQL,实时数据用 API Tool Use,非结构化文档用 RAG。不要把什么都塞进 RAG。

  2. 安全第一——数据库只读账号、API 最小权限、查询结果脱敏。Agent 有能力执行的操作范围决定了出事故时的影响范围。

  3. RAG 不是万能药——它解决的是 "在大量文档中找到相关信息" 的问题,但对于精确查询、实时数据、聚合计算,SQL 和 API 更合适。

如果你正在给 Agent 接入数据,从最简单的一种数据源开始——比如用 Tool Use 接一个 API——验证端到端的流程,然后再扩展到多数据源。

你的 Agent 项目中,数据接入遇到过哪些问题?欢迎讨论。