第 63 章

n8n / Make / Zapier 零代码工作流集成:自动化场景与 Webhook 触发设计

第六十三章:与数据库集成:Text-to-SQL、数据分析与 BI 增强

63.1 数据库集成的核心价值

大多数企业的核心业务数据存储在关系型数据库中。但能够编写 SQL 查询的人员远比能够解读业务数据的人员少——产品经理、销售、运营人员每天都需要数据洞察,却不得不依赖数据团队的协助,形成了显著的效率瓶颈。

Claude 的 Text-to-SQL 能力打破了这一瓶颈:

本章涵盖三个递进的实现层次:基础 Text-to-SQL、带验证的安全 SQL 生成、以及完整的自然语言 BI 系统。

63.2 Text-to-SQL 基础实现

63.2.1 Schema 注入策略

Text-to-SQL 的核心是将数据库 Schema 注入到 Claude 的上下文中。Schema 的质量直接决定生成 SQL 的准确性。

import anthropic
import sqlite3
import psycopg2
from typing import Optional

client = anthropic.Anthropic()

def get_schema_description(conn, db_type: str = "postgresql") -> str:
    """从数据库提取 Schema 描述(包含表结构、字段注释、外键关系)"""
    
    if db_type == "postgresql":
        # 获取所有表的结构信息
        cursor = conn.cursor()
        
        # 获取表和列信息
        cursor.execute("""
            SELECT 
                t.table_name,
                c.column_name,
                c.data_type,
                c.is_nullable,
                c.column_default,
                pgd.description as column_comment
            FROM information_schema.tables t
            JOIN information_schema.columns c ON t.table_name = c.table_name
            LEFT JOIN pg_catalog.pg_statio_all_tables st ON st.relname = t.table_name
            LEFT JOIN pg_catalog.pg_description pgd ON pgd.objoid = st.relid 
                AND pgd.objsubid = c.ordinal_position
            WHERE t.table_schema = 'public'
            ORDER BY t.table_name, c.ordinal_position
        """)
        
        columns = cursor.fetchall()
        
        # 获取外键关系
        cursor.execute("""
            SELECT
                tc.table_name,
                kcu.column_name,
                ccu.table_name AS foreign_table_name,
                ccu.column_name AS foreign_column_name
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
                ON tc.constraint_name = kcu.constraint_name
            JOIN information_schema.constraint_column_usage AS ccu
                ON ccu.constraint_name = tc.constraint_name
            WHERE tc.constraint_type = 'FOREIGN KEY'
        """)
        
        fk_rows = cursor.fetchall()
        
    # 构建 Schema 描述
    tables = {}
    for row in columns:
        table_name = row[0]
        if table_name not in tables:
            tables[table_name] = {"columns": [], "foreign_keys": []}
        nullable = "NULL" if row[3] == "YES" else "NOT NULL"
        comment = f"  -- {row[5]}" if row[5] else ""
        tables[table_name]["columns"].append(
            f"  {row[1]} {row[2].upper()} {nullable}{comment}"
        )
    
    for fk in fk_rows:
        tables[fk[0]]["foreign_keys"].append(
            f"  FOREIGN KEY ({fk[1]}) REFERENCES {fk[2]}({fk[3]})"
        )
    
    schema_parts = []
    for table_name, info in tables.items():
        schema_parts.append(f"-- 表:{table_name}")
        schema_parts.append(f"CREATE TABLE {table_name} (")
        schema_parts.extend(info["columns"])
        if info["foreign_keys"]:
            schema_parts.extend(info["foreign_keys"])
        schema_parts.append(");\n")
    
    return "\n".join(schema_parts)


def get_sample_data(conn, table_name: str, limit: int = 3) -> str:
    """获取表的样例数据,帮助 Claude 理解数据格式"""
    cursor = conn.cursor()
    cursor.execute(f"SELECT * FROM {table_name} LIMIT {limit}")
    rows = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]
    
    if not rows:
        return f"-- {table_name} 表为空"
    
    result = f"-- {table_name} 样例数据:\n"
    result += "-- " + " | ".join(columns) + "\n"
    for row in rows:
        result += "-- " + " | ".join(str(v) for v in row) + "\n"
    return result

63.2.2 Text-to-SQL Prompt 模板

TEXT_TO_SQL_SYSTEM = """你是一个精确的 SQL 查询生成专家。

## 你的职责
将用户的自然语言问题转化为正确的 SQL 查询。

## 规则
1. 只生成 SELECT 查询,严禁生成 INSERT/UPDATE/DELETE/DROP/CREATE 等修改操作
2. 使用参数化查询占位符($1, $2...)处理用户输入的值,防止 SQL 注入
3. 对于可能返回大量数据的查询,自动添加 LIMIT 100
4. 如果问题无法用当前 Schema 回答,明确说明原因
5. 在生成 SQL 前,简要说明你的查询思路

## 输出格式
```sql
-- 查询思路:[简要说明]
SELECT ...
FROM ...
WHERE ...

如果无法生成 SQL,输出:

无法生成:[原因]

"""

def text_to_sql( question: str, schema: str, sample_data: Optional[str] = None, dialect: str = "PostgreSQL" ) -> dict: """将自然语言问题转化为 SQL 查询"""

user_content = f"""## 数据库 Schema({dialect})

{schema}

{f"## 样例数据\n{sample_data}" if sample_data else ""}

用户问题

{question}

请生成对应的 SQL 查询。"""

response = client.messages.create(
    model="claude-opus-4-5",
    max_tokens=1024,
    system=TEXT_TO_SQL_SYSTEM,
    messages=[{"role": "user", "content": user_content}]
)

reply = response.content[0].text

# 提取 SQL 代码块
import re
sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)

if sql_match:
    sql = sql_match.group(1).strip()
    return {"success": True, "sql": sql, "explanation": reply}
elif "无法生成" in reply:
    return {"success": False, "reason": reply}
else:
    return {"success": False, "reason": "未能提取 SQL 语句", "raw": reply}

## 63.3 SQL 安全验证层

直接执行 Claude 生成的 SQL 存在风险。必须在执行前进行多层验证。

### 63.3.1 SQL 验证器

```python
import re
from typing import Tuple

class SQLValidator:
    """SQL 安全验证器:确保只执行只读查询"""
    
    # 危险关键字黑名单
    DANGEROUS_KEYWORDS = [
        r'\bINSERT\b', r'\bUPDATE\b', r'\bDELETE\b', r'\bDROP\b',
        r'\bCREATE\b', r'\bALTER\b', r'\bTRUNCATE\b', r'\bEXEC\b',
        r'\bEXECUTE\b', r'\bGRANT\b', r'\bREVOKE\b', r'\bXP_\w+',
        r'--.*$',  # 行内注释(可能用于注释掉 WHERE 子句)
    ]
    
    # 允许的 SELECT 模式
    ALLOWED_PATTERN = re.compile(
        r'^\s*(--[^\n]*\n\s*)*SELECT\s',
        re.IGNORECASE | re.MULTILINE
    )
    
    @classmethod
    def validate(cls, sql: str) -> Tuple[bool, str]:
        """
        验证 SQL 是否安全
        返回 (is_safe, reason)
        """
        # 检查是否以 SELECT 开头(忽略注释)
        sql_no_comments = re.sub(r'--[^\n]*', '', sql)  # 移除单行注释
        sql_no_comments = re.sub(r'/\*.*?\*/', '', sql_no_comments, flags=re.DOTALL)  # 移除多行注释
        sql_stripped = sql_no_comments.strip()
        
        if not re.match(r'^SELECT\b', sql_stripped, re.IGNORECASE):
            return False, f"查询不以 SELECT 开头(实际:{sql_stripped[:50]})"
        
        # 检查危险关键字
        for pattern in cls.DANGEROUS_KEYWORDS:
            if re.search(pattern, sql, re.IGNORECASE | re.MULTILINE):
                return False, f"包含危险关键字:{pattern}"
        
        # 检查分号(可能存在多语句注入)
        # 允许字符串内的分号,但不允许语句分隔符
        statements = [s.strip() for s in sql.split(';') if s.strip()]
        if len(statements) > 1:
            return False, "包含多条语句(可能是 SQL 注入)"
        
        return True, "验证通过"
    
    @classmethod
    def extract_table_names(cls, sql: str) -> list[str]:
        """提取 SQL 中引用的表名"""
        # 匹配 FROM 和 JOIN 后面的表名
        pattern = r'(?:FROM|JOIN)\s+([a-zA-Z_][a-zA-Z0-9_]*)'
        return re.findall(pattern, sql, re.IGNORECASE)


class SafeSQLExecutor:
    """安全的 SQL 执行器"""
    
    def __init__(self, conn, allowed_tables: Optional[list] = None, max_rows: int = 1000):
        self.conn = conn
        self.allowed_tables = allowed_tables  # None = 允许所有表
        self.max_rows = max_rows
    
    def execute(self, sql: str) -> dict:
        """执行经过验证的 SQL"""
        
        # 安全验证
        is_safe, reason = SQLValidator.validate(sql)
        if not is_safe:
            return {"success": False, "error": f"SQL 安全验证失败:{reason}"}
        
        # 表权限验证
        if self.allowed_tables:
            referenced_tables = SQLValidator.extract_table_names(sql)
            unauthorized = [t for t in referenced_tables if t not in self.allowed_tables]
            if unauthorized:
                return {"success": False, "error": f"无权访问表:{unauthorized}"}
        
        # 强制 LIMIT(防止意外的全表扫描)
        sql_with_limit = self._enforce_limit(sql)
        
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql_with_limit)
            rows = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
            
            return {
                "success": True,
                "columns": columns,
                "rows": rows,
                "row_count": len(rows),
                "truncated": len(rows) == self.max_rows
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    def _enforce_limit(self, sql: str) -> str:
        """确保查询有 LIMIT 子句"""
        if re.search(r'\bLIMIT\b', sql, re.IGNORECASE):
            return sql
        return f"{sql.rstrip(';')} LIMIT {self.max_rows}"

63.4 完整的自然语言 BI 系统

将 Text-to-SQL、安全执行、结果解读集成为完整的 BI 问答系统。

63.4.1 BI 问答系统

import pandas as pd
from io import StringIO

class NaturalLanguageBISystem:
    """自然语言 BI 系统:问题 → SQL → 执行 → 解读"""
    
    def __init__(self, conn, schema: str, allowed_tables: Optional[list] = None):
        self.conn = conn
        self.schema = schema
        self.executor = SafeSQLExecutor(conn, allowed_tables)
        self.client = anthropic.Anthropic()
    
    def ask(self, question: str, explain_results: bool = True) -> dict:
        """完整的问答流程"""
        
        # Step 1: Text-to-SQL
        sql_result = text_to_sql(question, self.schema)
        
        if not sql_result["success"]:
            return {
                "question": question,
                "error": sql_result.get("reason", "SQL 生成失败"),
                "stage": "sql_generation"
            }
        
        sql = sql_result["sql"]
        
        # Step 2: 安全执行
        exec_result = self.executor.execute(sql)
        
        if not exec_result["success"]:
            # SQL 执行失败,尝试让 Claude 修正
            corrected = self._try_fix_sql(sql, exec_result["error"], question)
            if corrected:
                sql = corrected
                exec_result = self.executor.execute(sql)
            
            if not exec_result["success"]:
                return {
                    "question": question,
                    "sql": sql,
                    "error": exec_result["error"],
                    "stage": "sql_execution"
                }
        
        # Step 3: 结果解读(可选)
        columns = exec_result["columns"]
        rows = exec_result["rows"]
        
        if not explain_results or not rows:
            return {
                "question": question,
                "sql": sql,
                "columns": columns,
                "rows": rows,
                "row_count": exec_result["row_count"],
                "truncated": exec_result.get("truncated", False)
            }
        
        # 将结果转为 Markdown 表格供 Claude 解读
        df = pd.DataFrame(rows, columns=columns)
        table_str = df.head(20).to_markdown(index=False)  # 最多 20 行
        
        explanation_response = self.client.messages.create(
            model="claude-haiku-4-5",  # 解读任务用便宜模型
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": f"""用户问题:{question}

查询结果(共 {exec_result['row_count']} 行{', 已截断至前20行' if exec_result.get('truncated') else ''}):
{table_str}

请用 2-3 句话解读这个结果,直接回答用户的问题。如果数据有值得注意的趋势或异常,也请指出。"""
            }]
        )
        
        return {
            "question": question,
            "sql": sql,
            "columns": columns,
            "rows": rows,
            "row_count": exec_result["row_count"],
            "truncated": exec_result.get("truncated", False),
            "interpretation": explanation_response.content[0].text
        }
    
    def _try_fix_sql(self, sql: str, error: str, original_question: str) -> Optional[str]:
        """尝试修正执行失败的 SQL"""
        fix_response = self.client.messages.create(
            model="claude-opus-4-5",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": f"""以下 SQL 执行时出错,请修正:

原始问题:{original_question}

错误的 SQL:
```sql
{sql}

错误信息:{error}

Schema: {self.schema}

请只输出修正后的 SQL(```sql 代码块格式),不要解释:""" }] )

    reply = fix_response.content[0].text
    sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)
    return sql_match.group(1).strip() if sql_match else None

使用示例

def demo_bi_system(): conn = psycopg2.connect("postgresql://user:pass@localhost/salesdb") schema = get_schema_description(conn)

bi = NaturalLanguageBISystem(
    conn=conn,
    schema=schema,
    allowed_tables=["orders", "customers", "products", "order_items"]
)

questions = [
    "上个月销售额最高的前 5 个产品是哪些?",
    "各省份的客户数量分布如何?",
    "平均订单金额超过 500 元的客户有多少?",
    "最近 7 天每天的订单数量趋势是怎样的?"
]

for question in questions:
    print(f"\n问题:{question}")
    result = bi.ask(question)
    
    if result.get("error"):
        print(f"错误:{result['error']}")
    else:
        print(f"SQL:{result['sql']}")
        print(f"行数:{result['row_count']}")
        if result.get("interpretation"):
            print(f"解读:{result['interpretation']}")

## 63.5 多步分析:复杂商业智能问题

对于需要多步骤查询的复杂分析问题,可以让 Claude 分解任务。

```python
def multi_step_analysis(question: str, bi_system: NaturalLanguageBISystem) -> dict:
    """
    多步骤分析:让 Claude 将复杂问题分解为多个子查询
    """
    
    # Step 1:分解问题
    decompose_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""以下业务分析问题可能需要多个 SQL 查询步骤来回答。
请将其分解为 2-4 个子问题,每个子问题都可以用单一 SQL 查询回答。

分析问题:{question}

数据库包含的表:orders(订单)、customers(客户)、products(产品)

输出格式(JSON 数组):
[
  {{"step": 1, "sub_question": "子问题1"}},
  {{"step": 2, "sub_question": "子问题2", "depends_on": 1}}
]"""
        }]
    )
    
    import json
    steps_text = decompose_response.content[0].text
    
    # 提取 JSON
    json_match = re.search(r'\[[\s\S]*\]', steps_text)
    if not json_match:
        # 无法分解,直接尝试单步查询
        return bi_system.ask(question)
    
    steps = json.loads(json_match.group(0))
    
    # Step 2:逐步执行
    step_results = {}
    for step in steps:
        step_num = step["step"]
        sub_question = step["sub_question"]
        
        result = bi_system.ask(sub_question, explain_results=False)
        step_results[step_num] = {
            "question": sub_question,
            "result": result
        }
    
    # Step 3:综合分析
    results_summary = "\n\n".join([
        f"步骤 {k}: {v['question']}\n结果:{v['result'].get('rows', [])[:5]}"
        for k, v in step_results.items()
    ])
    
    final_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""原始问题:{question}

各步骤查询结果:
{results_summary}

请综合以上数据,给出完整的分析报告(300字以内):"""
        }]
    )
    
    return {
        "question": question,
        "steps": step_results,
        "final_analysis": final_response.content[0].text
    }

63.6 Schema 优化:让 Text-to-SQL 更准确

Schema 的质量对 Text-to-SQL 准确率有决定性影响。以下是优化建议:

63.6.1 为表和字段添加注释

-- 在 PostgreSQL 中为表添加注释
COMMENT ON TABLE orders IS '订单主表,记录每笔交易的基本信息';
COMMENT ON COLUMN orders.status IS '订单状态:pending=待支付, paid=已支付, shipped=已发货, completed=已完成, cancelled=已取消';
COMMENT ON COLUMN orders.total_amount IS '订单总金额(人民币分)';

-- Claude 生成 Schema 时会包含这些注释,大幅提升 SQL 准确率

63.6.2 提供查询示例(Few-Shot)

FEW_SHOT_EXAMPLES = """
## 查询示例

示例1:问"上个月销售额最高的产品"
```sql
SELECT p.name, SUM(oi.quantity * oi.unit_price) as revenue
FROM order_items oi
JOIN products p ON oi.product_id = p.id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= DATE_TRUNC('month', NOW() - INTERVAL '1 month')
  AND o.created_at < DATE_TRUNC('month', NOW())
  AND o.status = 'completed'
GROUP BY p.id, p.name
ORDER BY revenue DESC
LIMIT 10

示例2:问"每个城市的活跃用户数"

SELECT 
    c.city,
    COUNT(DISTINCT c.id) as active_users
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE o.created_at >= NOW() - INTERVAL '30 days'
GROUP BY c.city
ORDER BY active_users DESC

"""

在 Text-to-SQL prompt 中加入示例

def text_to_sql_with_examples(question: str, schema: str) -> dict: user_content = f"""## Schema\n{schema}\n\n{FEW_SHOT_EXAMPLES}\n\n## 用户问题\n{question}""" # ... 调用 Claude


## 63.7 与 BI 工具集成

### 63.7.1 作为 Metabase/Superset 的 AI 层

```python
from flask import Flask, request, jsonify

app = Flask(__name__)
bi_system = None  # 初始化 BI 系统

@app.route("/api/nl-query", methods=["POST"])
def nl_query():
    """自然语言查询 REST API,可被 BI 工具调用"""
    data = request.json
    question = data.get("question")
    
    if not question:
        return jsonify({"error": "missing question"}), 400
    
    result = bi_system.ask(question)
    
    return jsonify({
        "question": question,
        "sql": result.get("sql"),
        "data": {
            "columns": result.get("columns", []),
            "rows": [list(row) for row in result.get("rows", [])]
        },
        "interpretation": result.get("interpretation"),
        "error": result.get("error")
    })

小结

Text-to-SQL 系统的质量取决于三个核心要素:Schema 注入的完整性(表注释、字段注释、外键关系、样例数据)、SQL 安全验证层(只读强制、参数化、多语句检测),以及结果解读层(Claude 对查询结果的自然语言诠释)。生产系统中必须实现 SQL 验证器防止注入风险,通过表级权限控制限制数据访问范围,并在 SQL 执行失败时提供自动修正机制。完整的 BI 问答系统将 Text-to-SQL、安全执行、结果解读串联成流水线,是让非技术人员直接获取数据洞察的高效路径。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论