怎么把 AI Agent 接入你的公司数据 — 手把手教程

怎么把 AI Agent 接入你的公司数据 — 手把手教程
开场
我帮三家公司搭过 AI Agent 数据接入层,发现一个规律:90% 的 Agent 项目卡壳不是因为模型不够好,而是数据接不上。你的 Agent 能力上限不是 LLM 的能力上限,而是它能访问到的数据的质量和范围。这篇文章把数据接入的四种方式——数据库直连、API 集成、文件系统爬取、RAG 知识库——全部拆解给你看。
问题背景
一个典型的企业 AI Agent 需要访问以下数据源:
- 结构化数据:PostgreSQL / MySQL 里的客户表、订单表、产品表
- 半结构化数据:Notion 文档、Confluence 页面、Google Docs
- 非结构化数据:PDF 合同、Slack 聊天记录、邮件附件
- 实时数据:API 返回的库存信息、价格数据、用户行为
大多数教程只讲 RAG(把文档切块 → 向量化 → 检索),但实际生产中,RAG 只是四种数据接入方式之一。盲目把所有数据都塞进 RAG,会导致检索质量下降和成本飙升。
核心架构
设计原则
- 按数据类型选方案:结构化数据用 SQL 查询,不要硬塞 RAG
- 最小权限:Agent 只能访问它需要的数据,用只读账号 + 行级权限
- 缓存优先:频繁查询的数据做本地缓存,降低延迟和成本
四种接入方式
┌─────────────────┐
│ AI Agent │
│ (Claude API) │
└───────┬─────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌────▼────┐ ┌──────▼──────┐ ┌──────▼──────┐
│ SQL 查询 │ │ API 集成 │ │ RAG 检索 │
│ (结构化) │ │ (实时数据) │ │(非结构化) │
└────┬────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
┌────▼────┐ ┌──────▼──────┐ ┌──────▼──────┐
│PostgreSQL│ │ 外部 API │ │ Vector DB │
│ MySQL │ │ (Stripe等) │ │ (Qdrant) │
└─────────┘ └─────────────┘ └─────────────┘
实现细节
方案一:SQL 直连(结构化数据)
这是最直接也最容易出问题的方式。核心挑战:怎么让 LLM 生成安全且正确的 SQL 查询。
import anthropic
import psycopg2
from typing import Any
# 数据库连接(使用只读账号)
def get_db_connection():
return psycopg2.connect(
host="localhost",
database="company_db",
user="agent_readonly", # 只读权限
password="****"
)
# 获取数据库 schema 信息(给 Agent 提供上下文)
def get_schema_context() -> str:
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute("""
SELECT table_name, column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'public'
ORDER BY table_name, ordinal_position
""")
schema = cursor.fetchall()
conn.close()
# 格式化为 Agent 可理解的文本
schema_text = "数据库表结构:\n"
current_table = ""
for table, column, dtype in schema:
if table != current_table:
schema_text += f"\n表: {table}\n"
current_table = table
schema_text += f" - {column} ({dtype})\n"
return schema_text
# Agent 生成 SQL 并执行
def query_with_agent(question: str) -> dict[str, Any]:
client = anthropic.Anthropic()
schema = get_schema_context()
# 第一步:让 Agent 生成 SQL
response = client.messages.create(
model="claude-sonnet-4-5-20250514",
max_tokens=1024,
system=f"""你是一个 SQL 生成器。
根据用户问题和数据库 schema 生成 PostgreSQL 查询。
只输出 SQL 语句,不要解释。
只能生成 SELECT 语句,禁止 INSERT/UPDATE/DELETE。
{schema}""",
messages=[{"role": "user", "content": question}]
)
sql = response.content[0].text.strip()
# 安全检查:只允许 SELECT
if not sql.upper().startswith("SELECT"):
raise ValueError(f"非法 SQL 操作: {sql[:50]}")
# 执行查询
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute(sql)
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
conn.close()
return {"sql": sql, "columns": columns, "rows": results}
关键安全措施:
- 数据库账号只有 SELECT 权限
- 代码层面再做一次 SQL 关键字白名单检查
- 设置查询超时(防止生成了全表扫描的慢查询)
- 限制返回行数(
LIMIT 100)
方案二:API 集成(实时数据)
API 集成的核心是 Tool Use。Claude API 原生支持 tool calling,你定义好工具的 schema,模型会决定何时调用:
import anthropic
import httpx
client = anthropic.Anthropic()
# 定义 Agent 可用的工具
tools = [
{
"name": "get_customer_info",
"description": "从 CRM 获取客户详细信息",
"input_schema": {
"type": "object",
"properties": {
"customer_id": {
"type": "string",
"description": "客户 ID"
}
},
"required": ["customer_id"]
}
},
{
"name": "get_order_status",
"description": "查询订单状态",
"input_schema": {
"type": "object",
"properties": {
"order_id": {
"type": "string",
"description": "订单号"
}
},
"required": ["order_id"]
}
}
]
# 工具执行函数
def execute_tool(name: str, params: dict) -> str:
if name == "get_customer_info":
resp = httpx.get(
f"https://api.crm.com/v1/customers/{params['customer_id']}",
headers={"Authorization": f"Bearer {CRM_API_KEY}"}
)
return resp.text
elif name == "get_order_status":
resp = httpx.get(
f"https://api.orders.com/v1/orders/{params['order_id']}",
headers={"Authorization": f"Bearer {ORDER_API_KEY}"}
)
return resp.text
# Agent 对话循环(含 tool use)
def agent_chat(user_message: str):
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-sonnet-4-5-20250514",
max_tokens=4096,
tools=tools,
messages=messages
)
# 如果模型决定调用工具
if response.stop_reason == "tool_use":
tool_block = next(
b for b in response.content if b.type == "tool_use"
)
result = execute_tool(tool_block.name, tool_block.input)
# 把工具结果反馈给模型
messages.append({"role": "assistant", "content": response.content})
messages.append({
"role": "user",
"content": [{
"type": "tool_result",
"tool_use_id": tool_block.id,
"content": result
}]
})
else:
# 模型直接回复,返回结果
return response.content[0].text
方案三:RAG 知识库(非结构化数据)
RAG 适用于 PDF、文档、知识库等非结构化数据。2026 年的最佳实践已经和两年前不同,核心改进在于分块策略和混合检索。
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import anthropic
import hashlib
# 初始化 Qdrant 向量数据库
qdrant = QdrantClient(url="http://localhost:6333")
claude = anthropic.Anthropic()
# 创建集合
qdrant.create_collection(
collection_name="company_docs",
vectors_config=VectorParams(
size=1024, # embedding 维度
distance=Distance.COSINE
)
)
# 文档分块(语义分块,不是固定长度)
def semantic_chunk(text: str, max_tokens: int = 400) -> list[str]:
"""按段落和语义边界分块,而非固定字符数"""
paragraphs = text.split("\n\n")
chunks = []
current_chunk = ""
for para in paragraphs:
# 估算 token 数(中文约 1.5 字/token)
estimated_tokens = len(current_chunk) / 1.5
if estimated_tokens + len(para) / 1.5 > max_tokens:
if current_chunk:
chunks.append(current_chunk.strip())
current_chunk = para
else:
current_chunk += "\n\n" + para
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
# 生成 embedding(用 Voyage AI 或 OpenAI)
def get_embedding(text: str) -> list[float]:
"""调用 embedding API 获取向量"""
import httpx
resp = httpx.post(
"https://api.voyageai.com/v1/embeddings",
headers={"Authorization": f"Bearer {VOYAGE_API_KEY}"},
json={
"model": "voyage-3-large",
"input": [text],
"input_type": "document"
}
)
return resp.json()["data"][0]["embedding"]
# 索引文档
def index_document(doc_path: str, doc_text: str):
chunks = semantic_chunk(doc_text)
points = []
for i, chunk in enumerate(chunks):
embedding = get_embedding(chunk)
point_id = hashlib.md5(f"{doc_path}:{i}".encode()).hexdigest()
points.append(PointStruct(
id=point_id,
vector=embedding,
payload={
"text": chunk,
"source": doc_path,
"chunk_index": i
}
))
qdrant.upsert(collection_name="company_docs", points=points)
# RAG 检索 + 生成
def rag_query(question: str) -> str:
# 检索相关文档
query_embedding = get_embedding(question)
results = qdrant.search(
collection_name="company_docs",
query_vector=query_embedding,
limit=5 # 返回 top 5
)
# 拼接上下文
context = "\n\n---\n\n".join([
f"来源: {r.payload['source']}\n{r.payload['text']}"
for r in results
])
# 调用 Claude 生成回答
response = claude.messages.create(
model="claude-sonnet-4-5-20250514",
max_tokens=2048,
system="""基于提供的文档内容回答问题。
如果文档中没有相关信息,明确说明"文档中未找到相关信息"。
回答时引用具体来源。""",
messages=[{
"role": "user",
"content": f"文档内容:\n{context}\n\n问题:{question}"
}]
)
return response.content[0].text
2026 年 RAG 最佳实践:
- 分块大小:300-500 token 是平衡点。太小丢失上下文,太大引入噪音
- Embedding 模型选择:Voyage AI voyage-3-large 在中英文混合场景表现优秀,维度 1024
- 混合检索:关键词搜索 (BM25) + 向量搜索结合,召回率提升约 15%
- 定期重建索引:源文档更新后必须重新分块和 embedding,否则检索质量会衰减
方案四:文件系统爬取
对于本地文件(产品手册 PDF、合同文档),可以用 LlamaIndex 或自建爬取器:
from pathlib import Path
# 支持的文件格式
SUPPORTED_EXTENSIONS = {".pdf", ".md", ".txt", ".docx", ".csv"}
def crawl_directory(root_dir: str) -> list[dict]:
"""递归爬取目录下所有支持的文件"""
documents = []
root = Path(root_dir)
for file_path in root.rglob("*"):
if file_path.suffix.lower() in SUPPORTED_EXTENSIONS:
text = extract_text(file_path) # 根据格式调用对应解析器
documents.append({
"path": str(file_path),
"text": text,
"modified": file_path.stat().st_mtime
})
return documents
实战经验
生产数据
我在一个电商公司的客服 Agent 项目中,同时使用了 SQL 直连 + API 集成 + RAG 三种方式:
| 数据源 | 接入方式 | 查询延迟 | 月成本 |
|---|---|---|---|
| 订单数据库 | SQL 直连 | 120ms | $0(自有基础设施) |
| 物流 API | Tool Use | 350ms | $50(API 调用费) |
| 产品说明文档 | RAG (Qdrant) | 280ms | $30(embedding + 存储) |
| 客服话术库 | RAG (Qdrant) | 250ms | 含在上面 |
总 RAG 知识库:1,200 个文档,42,000 个 chunk,向量维度 1024。
踩过的坑
坑 1:结构化数据不要走 RAG。 我最初把订单数据也做了 embedding 塞进向量库,查询 "用户 A 的最近 5 笔订单" 这种精确查询,RAG 的召回率不到 60%。换成 SQL 直连后,准确率 100%。
坑 2:embedding 模型和 LLM 要分开选。 不要因为用了 Claude 做生成就也用 Claude 做 embedding。专用 embedding 模型(如 Voyage AI)在检索任务上的表现远优于通用 LLM。
坑 3:文档更新后忘记重建索引。 产品价格更新了,但向量库里还是旧数据。Agent 用旧价格回答客户,造成了客诉。解决方案:设置定时任务,每天凌晨重新爬取和索引。
总结
三条核心 takeaway:
-
按数据类型选接入方式——结构化数据用 SQL,实时数据用 API Tool Use,非结构化文档用 RAG。不要把什么都塞进 RAG。
-
安全第一——数据库只读账号、API 最小权限、查询结果脱敏。Agent 有能力执行的操作范围决定了出事故时的影响范围。
-
RAG 不是万能药——它解决的是 "在大量文档中找到相关信息" 的问题,但对于精确查询、实时数据、聚合计算,SQL 和 API 更合适。
如果你正在给 Agent 接入数据,从最简单的一种数据源开始——比如用 Tool Use 接一个 API——验证端到端的流程,然后再扩展到多数据源。
你的 Agent 项目中,数据接入遇到过哪些问题?欢迎讨论。