n8n / Make / Zapier 零代码工作流集成:自动化场景与 Webhook 触发设计
第六十三章:与数据库集成:Text-to-SQL、数据分析与 BI 增强
63.1 数据库集成的核心价值
大多数企业的核心业务数据存储在关系型数据库中。但能够编写 SQL 查询的人员远比能够解读业务数据的人员少——产品经理、销售、运营人员每天都需要数据洞察,却不得不依赖数据团队的协助,形成了显著的效率瓶颈。
Claude 的 Text-to-SQL 能力打破了这一瓶颈:
- 业务人员可以用自然语言描述需求,直接获得查询结果
- 数据分析师可以用 Claude 快速起草复杂查询,减少调试时间
- BI 团队可以用 Claude 增强报表工具,实现智能问答层
本章涵盖三个递进的实现层次:基础 Text-to-SQL、带验证的安全 SQL 生成、以及完整的自然语言 BI 系统。
63.2 Text-to-SQL 基础实现
63.2.1 Schema 注入策略
Text-to-SQL 的核心是将数据库 Schema 注入到 Claude 的上下文中。Schema 的质量直接决定生成 SQL 的准确性。
import anthropic
import sqlite3
import psycopg2
from typing import Optional
client = anthropic.Anthropic()
def get_schema_description(conn, db_type: str = "postgresql") -> str:
"""从数据库提取 Schema 描述(包含表结构、字段注释、外键关系)"""
if db_type == "postgresql":
# 获取所有表的结构信息
cursor = conn.cursor()
# 获取表和列信息
cursor.execute("""
SELECT
t.table_name,
c.column_name,
c.data_type,
c.is_nullable,
c.column_default,
pgd.description as column_comment
FROM information_schema.tables t
JOIN information_schema.columns c ON t.table_name = c.table_name
LEFT JOIN pg_catalog.pg_statio_all_tables st ON st.relname = t.table_name
LEFT JOIN pg_catalog.pg_description pgd ON pgd.objoid = st.relid
AND pgd.objsubid = c.ordinal_position
WHERE t.table_schema = 'public'
ORDER BY t.table_name, c.ordinal_position
""")
columns = cursor.fetchall()
# 获取外键关系
cursor.execute("""
SELECT
tc.table_name,
kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
""")
fk_rows = cursor.fetchall()
# 构建 Schema 描述
tables = {}
for row in columns:
table_name = row[0]
if table_name not in tables:
tables[table_name] = {"columns": [], "foreign_keys": []}
nullable = "NULL" if row[3] == "YES" else "NOT NULL"
comment = f" -- {row[5]}" if row[5] else ""
tables[table_name]["columns"].append(
f" {row[1]} {row[2].upper()} {nullable}{comment}"
)
for fk in fk_rows:
tables[fk[0]]["foreign_keys"].append(
f" FOREIGN KEY ({fk[1]}) REFERENCES {fk[2]}({fk[3]})"
)
schema_parts = []
for table_name, info in tables.items():
schema_parts.append(f"-- 表:{table_name}")
schema_parts.append(f"CREATE TABLE {table_name} (")
schema_parts.extend(info["columns"])
if info["foreign_keys"]:
schema_parts.extend(info["foreign_keys"])
schema_parts.append(");\n")
return "\n".join(schema_parts)
def get_sample_data(conn, table_name: str, limit: int = 3) -> str:
"""获取表的样例数据,帮助 Claude 理解数据格式"""
cursor = conn.cursor()
cursor.execute(f"SELECT * FROM {table_name} LIMIT {limit}")
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
if not rows:
return f"-- {table_name} 表为空"
result = f"-- {table_name} 样例数据:\n"
result += "-- " + " | ".join(columns) + "\n"
for row in rows:
result += "-- " + " | ".join(str(v) for v in row) + "\n"
return result
63.2.2 Text-to-SQL Prompt 模板
TEXT_TO_SQL_SYSTEM = """你是一个精确的 SQL 查询生成专家。
## 你的职责
将用户的自然语言问题转化为正确的 SQL 查询。
## 规则
1. 只生成 SELECT 查询,严禁生成 INSERT/UPDATE/DELETE/DROP/CREATE 等修改操作
2. 使用参数化查询占位符($1, $2...)处理用户输入的值,防止 SQL 注入
3. 对于可能返回大量数据的查询,自动添加 LIMIT 100
4. 如果问题无法用当前 Schema 回答,明确说明原因
5. 在生成 SQL 前,简要说明你的查询思路
## 输出格式
```sql
-- 查询思路:[简要说明]
SELECT ...
FROM ...
WHERE ...
如果无法生成 SQL,输出:
无法生成:[原因]
"""
def text_to_sql( question: str, schema: str, sample_data: Optional[str] = None, dialect: str = "PostgreSQL" ) -> dict: """将自然语言问题转化为 SQL 查询"""
user_content = f"""## 数据库 Schema({dialect})
{schema}
{f"## 样例数据\n{sample_data}" if sample_data else ""}
用户问题
{question}
请生成对应的 SQL 查询。"""
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1024,
system=TEXT_TO_SQL_SYSTEM,
messages=[{"role": "user", "content": user_content}]
)
reply = response.content[0].text
# 提取 SQL 代码块
import re
sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)
if sql_match:
sql = sql_match.group(1).strip()
return {"success": True, "sql": sql, "explanation": reply}
elif "无法生成" in reply:
return {"success": False, "reason": reply}
else:
return {"success": False, "reason": "未能提取 SQL 语句", "raw": reply}
## 63.3 SQL 安全验证层
直接执行 Claude 生成的 SQL 存在风险。必须在执行前进行多层验证。
### 63.3.1 SQL 验证器
```python
import re
from typing import Tuple
class SQLValidator:
"""SQL 安全验证器:确保只执行只读查询"""
# 危险关键字黑名单
DANGEROUS_KEYWORDS = [
r'\bINSERT\b', r'\bUPDATE\b', r'\bDELETE\b', r'\bDROP\b',
r'\bCREATE\b', r'\bALTER\b', r'\bTRUNCATE\b', r'\bEXEC\b',
r'\bEXECUTE\b', r'\bGRANT\b', r'\bREVOKE\b', r'\bXP_\w+',
r'--.*$', # 行内注释(可能用于注释掉 WHERE 子句)
]
# 允许的 SELECT 模式
ALLOWED_PATTERN = re.compile(
r'^\s*(--[^\n]*\n\s*)*SELECT\s',
re.IGNORECASE | re.MULTILINE
)
@classmethod
def validate(cls, sql: str) -> Tuple[bool, str]:
"""
验证 SQL 是否安全
返回 (is_safe, reason)
"""
# 检查是否以 SELECT 开头(忽略注释)
sql_no_comments = re.sub(r'--[^\n]*', '', sql) # 移除单行注释
sql_no_comments = re.sub(r'/\*.*?\*/', '', sql_no_comments, flags=re.DOTALL) # 移除多行注释
sql_stripped = sql_no_comments.strip()
if not re.match(r'^SELECT\b', sql_stripped, re.IGNORECASE):
return False, f"查询不以 SELECT 开头(实际:{sql_stripped[:50]})"
# 检查危险关键字
for pattern in cls.DANGEROUS_KEYWORDS:
if re.search(pattern, sql, re.IGNORECASE | re.MULTILINE):
return False, f"包含危险关键字:{pattern}"
# 检查分号(可能存在多语句注入)
# 允许字符串内的分号,但不允许语句分隔符
statements = [s.strip() for s in sql.split(';') if s.strip()]
if len(statements) > 1:
return False, "包含多条语句(可能是 SQL 注入)"
return True, "验证通过"
@classmethod
def extract_table_names(cls, sql: str) -> list[str]:
"""提取 SQL 中引用的表名"""
# 匹配 FROM 和 JOIN 后面的表名
pattern = r'(?:FROM|JOIN)\s+([a-zA-Z_][a-zA-Z0-9_]*)'
return re.findall(pattern, sql, re.IGNORECASE)
class SafeSQLExecutor:
"""安全的 SQL 执行器"""
def __init__(self, conn, allowed_tables: Optional[list] = None, max_rows: int = 1000):
self.conn = conn
self.allowed_tables = allowed_tables # None = 允许所有表
self.max_rows = max_rows
def execute(self, sql: str) -> dict:
"""执行经过验证的 SQL"""
# 安全验证
is_safe, reason = SQLValidator.validate(sql)
if not is_safe:
return {"success": False, "error": f"SQL 安全验证失败:{reason}"}
# 表权限验证
if self.allowed_tables:
referenced_tables = SQLValidator.extract_table_names(sql)
unauthorized = [t for t in referenced_tables if t not in self.allowed_tables]
if unauthorized:
return {"success": False, "error": f"无权访问表:{unauthorized}"}
# 强制 LIMIT(防止意外的全表扫描)
sql_with_limit = self._enforce_limit(sql)
try:
cursor = self.conn.cursor()
cursor.execute(sql_with_limit)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows),
"truncated": len(rows) == self.max_rows
}
except Exception as e:
return {"success": False, "error": str(e)}
def _enforce_limit(self, sql: str) -> str:
"""确保查询有 LIMIT 子句"""
if re.search(r'\bLIMIT\b', sql, re.IGNORECASE):
return sql
return f"{sql.rstrip(';')} LIMIT {self.max_rows}"
63.4 完整的自然语言 BI 系统
将 Text-to-SQL、安全执行、结果解读集成为完整的 BI 问答系统。
63.4.1 BI 问答系统
import pandas as pd
from io import StringIO
class NaturalLanguageBISystem:
"""自然语言 BI 系统:问题 → SQL → 执行 → 解读"""
def __init__(self, conn, schema: str, allowed_tables: Optional[list] = None):
self.conn = conn
self.schema = schema
self.executor = SafeSQLExecutor(conn, allowed_tables)
self.client = anthropic.Anthropic()
def ask(self, question: str, explain_results: bool = True) -> dict:
"""完整的问答流程"""
# Step 1: Text-to-SQL
sql_result = text_to_sql(question, self.schema)
if not sql_result["success"]:
return {
"question": question,
"error": sql_result.get("reason", "SQL 生成失败"),
"stage": "sql_generation"
}
sql = sql_result["sql"]
# Step 2: 安全执行
exec_result = self.executor.execute(sql)
if not exec_result["success"]:
# SQL 执行失败,尝试让 Claude 修正
corrected = self._try_fix_sql(sql, exec_result["error"], question)
if corrected:
sql = corrected
exec_result = self.executor.execute(sql)
if not exec_result["success"]:
return {
"question": question,
"sql": sql,
"error": exec_result["error"],
"stage": "sql_execution"
}
# Step 3: 结果解读(可选)
columns = exec_result["columns"]
rows = exec_result["rows"]
if not explain_results or not rows:
return {
"question": question,
"sql": sql,
"columns": columns,
"rows": rows,
"row_count": exec_result["row_count"],
"truncated": exec_result.get("truncated", False)
}
# 将结果转为 Markdown 表格供 Claude 解读
df = pd.DataFrame(rows, columns=columns)
table_str = df.head(20).to_markdown(index=False) # 最多 20 行
explanation_response = self.client.messages.create(
model="claude-haiku-4-5", # 解读任务用便宜模型
max_tokens=512,
messages=[{
"role": "user",
"content": f"""用户问题:{question}
查询结果(共 {exec_result['row_count']} 行{', 已截断至前20行' if exec_result.get('truncated') else ''}):
{table_str}
请用 2-3 句话解读这个结果,直接回答用户的问题。如果数据有值得注意的趋势或异常,也请指出。"""
}]
)
return {
"question": question,
"sql": sql,
"columns": columns,
"rows": rows,
"row_count": exec_result["row_count"],
"truncated": exec_result.get("truncated", False),
"interpretation": explanation_response.content[0].text
}
def _try_fix_sql(self, sql: str, error: str, original_question: str) -> Optional[str]:
"""尝试修正执行失败的 SQL"""
fix_response = self.client.messages.create(
model="claude-opus-4-5",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""以下 SQL 执行时出错,请修正:
原始问题:{original_question}
错误的 SQL:
```sql
{sql}
错误信息:{error}
Schema: {self.schema}
请只输出修正后的 SQL(```sql 代码块格式),不要解释:""" }] )
reply = fix_response.content[0].text
sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)
return sql_match.group(1).strip() if sql_match else None
使用示例
def demo_bi_system(): conn = psycopg2.connect("postgresql://user:pass@localhost/salesdb") schema = get_schema_description(conn)
bi = NaturalLanguageBISystem(
conn=conn,
schema=schema,
allowed_tables=["orders", "customers", "products", "order_items"]
)
questions = [
"上个月销售额最高的前 5 个产品是哪些?",
"各省份的客户数量分布如何?",
"平均订单金额超过 500 元的客户有多少?",
"最近 7 天每天的订单数量趋势是怎样的?"
]
for question in questions:
print(f"\n问题:{question}")
result = bi.ask(question)
if result.get("error"):
print(f"错误:{result['error']}")
else:
print(f"SQL:{result['sql']}")
print(f"行数:{result['row_count']}")
if result.get("interpretation"):
print(f"解读:{result['interpretation']}")
## 63.5 多步分析:复杂商业智能问题
对于需要多步骤查询的复杂分析问题,可以让 Claude 分解任务。
```python
def multi_step_analysis(question: str, bi_system: NaturalLanguageBISystem) -> dict:
"""
多步骤分析:让 Claude 将复杂问题分解为多个子查询
"""
# Step 1:分解问题
decompose_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""以下业务分析问题可能需要多个 SQL 查询步骤来回答。
请将其分解为 2-4 个子问题,每个子问题都可以用单一 SQL 查询回答。
分析问题:{question}
数据库包含的表:orders(订单)、customers(客户)、products(产品)
输出格式(JSON 数组):
[
{{"step": 1, "sub_question": "子问题1"}},
{{"step": 2, "sub_question": "子问题2", "depends_on": 1}}
]"""
}]
)
import json
steps_text = decompose_response.content[0].text
# 提取 JSON
json_match = re.search(r'\[[\s\S]*\]', steps_text)
if not json_match:
# 无法分解,直接尝试单步查询
return bi_system.ask(question)
steps = json.loads(json_match.group(0))
# Step 2:逐步执行
step_results = {}
for step in steps:
step_num = step["step"]
sub_question = step["sub_question"]
result = bi_system.ask(sub_question, explain_results=False)
step_results[step_num] = {
"question": sub_question,
"result": result
}
# Step 3:综合分析
results_summary = "\n\n".join([
f"步骤 {k}: {v['question']}\n结果:{v['result'].get('rows', [])[:5]}"
for k, v in step_results.items()
])
final_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""原始问题:{question}
各步骤查询结果:
{results_summary}
请综合以上数据,给出完整的分析报告(300字以内):"""
}]
)
return {
"question": question,
"steps": step_results,
"final_analysis": final_response.content[0].text
}
63.6 Schema 优化:让 Text-to-SQL 更准确
Schema 的质量对 Text-to-SQL 准确率有决定性影响。以下是优化建议:
63.6.1 为表和字段添加注释
-- 在 PostgreSQL 中为表添加注释
COMMENT ON TABLE orders IS '订单主表,记录每笔交易的基本信息';
COMMENT ON COLUMN orders.status IS '订单状态:pending=待支付, paid=已支付, shipped=已发货, completed=已完成, cancelled=已取消';
COMMENT ON COLUMN orders.total_amount IS '订单总金额(人民币分)';
-- Claude 生成 Schema 时会包含这些注释,大幅提升 SQL 准确率
63.6.2 提供查询示例(Few-Shot)
FEW_SHOT_EXAMPLES = """
## 查询示例
示例1:问"上个月销售额最高的产品"
```sql
SELECT p.name, SUM(oi.quantity * oi.unit_price) as revenue
FROM order_items oi
JOIN products p ON oi.product_id = p.id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= DATE_TRUNC('month', NOW() - INTERVAL '1 month')
AND o.created_at < DATE_TRUNC('month', NOW())
AND o.status = 'completed'
GROUP BY p.id, p.name
ORDER BY revenue DESC
LIMIT 10
示例2:问"每个城市的活跃用户数"
SELECT
c.city,
COUNT(DISTINCT c.id) as active_users
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE o.created_at >= NOW() - INTERVAL '30 days'
GROUP BY c.city
ORDER BY active_users DESC
"""
在 Text-to-SQL prompt 中加入示例
def text_to_sql_with_examples(question: str, schema: str) -> dict: user_content = f"""## Schema\n{schema}\n\n{FEW_SHOT_EXAMPLES}\n\n## 用户问题\n{question}""" # ... 调用 Claude
## 63.7 与 BI 工具集成
### 63.7.1 作为 Metabase/Superset 的 AI 层
```python
from flask import Flask, request, jsonify
app = Flask(__name__)
bi_system = None # 初始化 BI 系统
@app.route("/api/nl-query", methods=["POST"])
def nl_query():
"""自然语言查询 REST API,可被 BI 工具调用"""
data = request.json
question = data.get("question")
if not question:
return jsonify({"error": "missing question"}), 400
result = bi_system.ask(question)
return jsonify({
"question": question,
"sql": result.get("sql"),
"data": {
"columns": result.get("columns", []),
"rows": [list(row) for row in result.get("rows", [])]
},
"interpretation": result.get("interpretation"),
"error": result.get("error")
})
小结
Text-to-SQL 系统的质量取决于三个核心要素:Schema 注入的完整性(表注释、字段注释、外键关系、样例数据)、SQL 安全验证层(只读强制、参数化、多语句检测),以及结果解读层(Claude 对查询结果的自然语言诠释)。生产系统中必须实现 SQL 验证器防止注入风险,通过表级权限控制限制数据访问范围,并在 SQL 执行失败时提供自动修正机制。完整的 BI 问答系统将 Text-to-SQL、安全执行、结果解读串联成流水线,是让非技术人员直接获取数据洞察的高效路径。