# Chapter 63: Database Integration: Text-to-SQL, Data Analysis, and BI Enhancement

## 63.1 The Core Value of Database Integration

Most enterprises store their core business data in relational databases, yet far fewer people can write SQL than need data insights. Product managers, sales, and operations staff need data every day but must rely on data teams to run queries for them, which creates a significant efficiency bottleneck.

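Claude's Text-to-SQL capability removes this bottleneck: users ask questions in natural language, and Claude translates them into SQL queries against the company's own schema.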

This chapter covers three progressive implementation levels: basic Text-to-SQL, safe SQL generation with validation, and a complete natural language BI system.

## 63.2 Basic Text-to-SQL Implementation

### 63.2.1 Schema Injection Strategy

The core of Text-to-SQL is injecting the database schema into Claude's context. Schema quality directly determines the accuracy of generated SQL.

```python
import re
from typing import Optional

import anthropic
import psycopg2  # PostgreSQL driver; connections come from psycopg2.connect()

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

def get_schema_description(conn, db_type: str = "postgresql") -> str:
    """Extract a schema description (tables, column types, column comments, foreign keys).

    The queries are PostgreSQL-specific; db_type is currently unused.
    """
    with conn.cursor() as cursor:
        # Columns of every table in the public schema, with their comments.
        # Joins also match on schema so same-named tables elsewhere don't leak in.
        cursor.execute("""
            SELECT
                t.table_name,
                c.column_name,
                c.data_type,
                c.is_nullable,
                pgd.description AS column_comment
            FROM information_schema.tables t
            JOIN information_schema.columns c
                ON t.table_schema = c.table_schema
               AND t.table_name = c.table_name
            LEFT JOIN pg_catalog.pg_statio_all_tables st
                ON st.schemaname = t.table_schema
               AND st.relname = t.table_name
            LEFT JOIN pg_catalog.pg_description pgd
                ON pgd.objoid = st.relid
               AND pgd.objsubid = c.ordinal_position
            WHERE t.table_schema = 'public'
            ORDER BY t.table_name, c.ordinal_position
        """)
        columns = cursor.fetchall()

        # Foreign-key relationships declared on tables in the public schema
        cursor.execute("""
            SELECT tc.table_name, kcu.column_name,
                   ccu.table_name AS foreign_table_name,
                   ccu.column_name AS foreign_column_name
            FROM information_schema.table_constraints AS tc
            JOIN information_schema.key_column_usage AS kcu
                ON tc.constraint_name = kcu.constraint_name
               AND tc.constraint_schema = kcu.constraint_schema
            JOIN information_schema.constraint_column_usage AS ccu
                ON ccu.constraint_name = tc.constraint_name
               AND ccu.constraint_schema = tc.constraint_schema
            WHERE tc.constraint_type = 'FOREIGN KEY'
              AND tc.table_schema = 'public'
        """)
        fk_rows = cursor.fetchall()

    tables = {}
    for table_name, column_name, data_type, is_nullable, comment in columns:
        info = tables.setdefault(table_name, {"columns": [], "foreign_keys": []})
        nullable = "NULL" if is_nullable == "YES" else "NOT NULL"
        comment_str = f"  -- {comment}" if comment else ""
        info["columns"].append(f"  {column_name} {data_type.upper()} {nullable}{comment_str}")

    for table_name, column_name, foreign_table, foreign_column in fk_rows:
        if table_name in tables:  # skip constraints on tables not listed above
            tables[table_name]["foreign_keys"].append(
                f"  FOREIGN KEY ({column_name}) REFERENCES {foreign_table}({foreign_column})"
            )

    # Render each table as CREATE TABLE-style text, a format the model reads easily
    schema_parts = []
    for table_name, info in tables.items():
        schema_parts.append(f"-- Table: {table_name}")
        schema_parts.append(f"CREATE TABLE {table_name} (")
        schema_parts.extend(info["columns"])
        schema_parts.extend(info["foreign_keys"])
        schema_parts.append(");\n")

    return "\n".join(schema_parts)
```
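`text_to_sql()` below accepts an optional `sample_data` string, but the chapter does not show how to build one. The following is a minimal sketch, assuming a DB-API cursor that uses psycopg2's `%s` parameter style; the helper names `quote_ident`, `format_sample_rows`, and `build_sample_data` are hypothetical. A few real rows per table show the model actual value formats (for example, whether `status` holds `'paid'` or `'PAID'`). Sample rows are sent to the API, so choose tables that are safe to share.

```python
from typing import Iterable, Sequence


def quote_ident(name: str) -> str:
    """Quote a PostgreSQL identifier: wrap in double quotes, double embedded quotes."""
    return '"' + name.replace('"', '""') + '"'


def format_sample_rows(table: str, columns: Sequence[str], rows: Iterable[Sequence]) -> str:
    """Render rows as a compact pipe-separated block under a table header."""
    lines = [f"-- {table}", " | ".join(columns)]
    for row in rows:
        lines.append(" | ".join("NULL" if v is None else str(v) for v in row))
    return "\n".join(lines)


def build_sample_data(cursor, tables: Sequence[str], rows_per_table: int = 3) -> str:
    """Fetch a few rows from each table and join them into one sample_data string.

    Assumes a psycopg2-style cursor (%s placeholders). Table names come from the
    schema listing but are still quoted as identifiers, never interpolated raw.
    """
    blocks = []
    for table in tables:
        cursor.execute(f"SELECT * FROM {quote_ident(table)} LIMIT %s", (rows_per_table,))
        columns = [desc[0] for desc in cursor.description]
        blocks.append(format_sample_rows(table, columns, cursor.fetchall()))
    return "\n\n".join(blocks)
```

The result can be passed straight through, e.g. `text_to_sql(question, schema, sample_data=build_sample_data(cur, ["orders"]))`.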

### 63.2.2 Text-to-SQL Prompt Template

````python
TEXT_TO_SQL_SYSTEM = """You are a precise SQL query generation expert.

## Your responsibilities
Convert users' natural language questions into correct SQL queries.

## Rules
1. Only generate SELECT queries. Never generate INSERT/UPDATE/DELETE/DROP/CREATE or other modification operations.
2. Use parameterized query placeholders ($1, $2...) for user-supplied values to prevent SQL injection.
3. For queries that might return large datasets, automatically add LIMIT 100.
4. If the question cannot be answered with the current schema, clearly explain why.
5. Briefly explain your query approach before generating SQL.

## Output format
```sql
-- Approach: [brief explanation]
SELECT ...
FROM ...
WHERE ...
```

If you cannot generate SQL, output:
```
Cannot generate: [reason]
```
"""


def text_to_sql(
    question: str,
    schema: str,
    sample_data: Optional[str] = None,
    dialect: str = "PostgreSQL",
) -> dict:
    """Convert a natural language question to a SQL query"""

    # Built outside the f-string: a backslash inside an f-string expression
    # is a SyntaxError before Python 3.12
    sample_section = f"## Sample Data\n{sample_data}\n" if sample_data else ""

    user_content = f"""## Database Schema ({dialect})

{schema}

{sample_section}
## User Question

{question}

Please generate the corresponding SQL query."""

    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        system=TEXT_TO_SQL_SYSTEM,
        messages=[{"role": "user", "content": user_content}],
    )

    reply = response.content[0].text

    # Take the body of the first ```sql ... ``` block in the reply
    sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)

    if sql_match:
        sql = sql_match.group(1).strip()
        return {"success": True, "sql": sql, "explanation": reply}
    elif "cannot generate" in reply.lower():
        return {"success": False, "reason": reply}
    else:
        return {"success": False, "reason": "Could not extract SQL statement", "raw": reply}
````
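The extraction regex in `text_to_sql()` requires exactly `` ```sql `` followed by `\n`, so a reply with Windows line endings, an uppercase `SQL` tag, or spaces after the tag falls through to "Could not extract SQL statement". A slightly more tolerant parser, sketched below as a hypothetical `extract_sql()` helper, can also be unit-tested without calling the API.

```python
import re
from typing import Optional

# ```sql in any case, optional trailing spaces/tabs, a \n or \r\n line break,
# then a lazy capture up to the next closing fence
SQL_BLOCK = re.compile(r"```sql[ \t]*\r?\n(.*?)```", re.IGNORECASE | re.DOTALL)


def extract_sql(reply: str) -> Optional[str]:
    """Return the body of the first ```sql block in a model reply, or None."""
    match = SQL_BLOCK.search(reply)
    if not match:
        return None
    sql = match.group(1).strip()
    return sql or None  # an empty block counts as "no SQL"
```

Swapping it in means replacing the `re.search(...)` call with `sql = extract_sql(reply)` and testing `if sql:`.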

## 63.3 SQL Security Validation Layer

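Executing Claude-generated SQL directly is risky: the model can misread a question, or be steered by a crafted one into emitting a statement that modifies data. Multiple validation layers are therefore required before execution.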

### 63.3.1 SQL Validator

```python
import re
from typing import Optional, Tuple

class SQLValidator:
    """SQL security validator: ensures only read-only queries are executed"""
    
    DANGEROUS_KEYWORDS = [
        r'\bINSERT\b', r'\bUPDATE\b', r'\bDELETE\b', r'\bDROP\b',
        r'\bCREATE\b', r'\bALTER\b', r'\bTRUNCATE\b', r'\bEXEC\b',
        r'\bEXECUTE\b', r'\bGRANT\b', r'\bREVOKE\b', r'\bXP_\w+',
    ]
    
    @classmethod
    def validate(cls, sql: str) -> Tuple[bool, str]:
        """Validate SQL safety. Returns (is_safe, reason)."""
        
        # Remove comments
        sql_no_comments = re.sub(r'--[^\n]*', '', sql)
        sql_no_comments = re.sub(r'/\*.*?\*/', '', sql_no_comments, flags=re.DOTALL)
        sql_stripped = sql_no_comments.strip()
        
        if not re.match(r'^SELECT\b', sql_stripped, re.IGNORECASE):
            return False, f"Query does not start with SELECT (actual: {sql_stripped[:50]})"
        
        for pattern in cls.DANGEROUS_KEYWORDS:
            if re.search(pattern, sql, re.IGNORECASE | re.MULTILINE):
                return False, f"Contains dangerous keyword: {pattern}"
        
        statements = [s.strip() for s in sql.split(';') if s.strip()]
        if len(statements) > 1:
            return False, "Contains multiple statements (possible SQL injection)"
        
        return True, "Validation passed"
    
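    @classmethod
    def extract_table_names(cls, sql: str) -> list[str]:
        """Return lowercased table names that follow FROM or JOIN.

        A coarse heuristic: it misses quoted identifiers ("Orders") and the
        second and later tables of a comma-separated FROM list, and it treats
        the column in EXTRACT(YEAR FROM col) as a table. Database privileges
        must remain the real access control.
        """
        pattern = r'(?:FROM|JOIN)\s+([a-zA-Z_][a-zA-Z0-9_]*)'
        return [name.lower() for name in re.findall(pattern, sql, re.IGNORECASE)]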


class SafeSQLExecutor:
    """Safe SQL executor"""

    def __init__(self, conn, allowed_tables: Optional[list] = None, max_rows: int = 1000):
        self.conn = conn
        # Unquoted identifiers are case-insensitive in PostgreSQL, so compare lowercased
        self.allowed_tables = [t.lower() for t in allowed_tables] if allowed_tables else None
        self.max_rows = max_rows

    def execute(self, sql: str) -> dict:
        """Execute a validated SQL query"""

        is_safe, reason = SQLValidator.validate(sql)
        if not is_safe:
            return {"success": False, "error": f"SQL security validation failed: {reason}"}

        if self.allowed_tables:
            referenced_tables = SQLValidator.extract_table_names(sql)
            unauthorized = [t for t in referenced_tables if t not in self.allowed_tables]
            if unauthorized:
                return {"success": False, "error": f"Unauthorized table access: {unauthorized}"}

        sql_with_limit = self._enforce_limit(sql)

        try:
            with self.conn.cursor() as cursor:
                cursor.execute(sql_with_limit)
                # fetchmany caps the result even when the query has its own, larger LIMIT
                rows = cursor.fetchmany(self.max_rows)
                columns = [desc[0] for desc in cursor.description]

            return {
                "success": True,
                "columns": columns,
                "rows": rows,
                "row_count": len(rows),
                "truncated": len(rows) == self.max_rows
            }
        except Exception as e:
            # psycopg2 leaves the transaction aborted after an error; roll back so
            # the automatic retry (and later queries) can run on this connection
            self.conn.rollback()
            return {"success": False, "error": str(e)}

    def _enforce_limit(self, sql: str) -> str:
        if re.search(r'\bLIMIT\b', sql, re.IGNORECASE):
            return sql
        # A new line keeps LIMIT out of any trailing "--" comment on the last line
        return f"{sql.strip().rstrip(';')}\nLIMIT {self.max_rows}"
```
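Regex validation is a heuristic, so it is worth having the database refuse writes as well. The strongest option is a dedicated role granted only `SELECT` on the allowed tables. A lighter complement, sketched below, opens the connection with libpq's `options` connection parameter so every transaction defaults to read-only and long-running queries are cancelled. The helper name and the 5-second default are assumptions; because a session can change these settings itself, they supplement a SELECT-only role rather than replace it.

```python
def readonly_connect_kwargs(dsn: str, statement_timeout_ms: int = 5000) -> dict:
    """Build psycopg2.connect() keyword arguments for a read-only, time-limited session.

    Both settings are sent to the server at connection start through libpq's
    "options" parameter:
      - default_transaction_read_only=on makes each new transaction read-only
      - statement_timeout cancels any statement running longer than the limit (ms)
    """
    if statement_timeout_ms <= 0:
        # 0 would disable the timeout entirely
        raise ValueError("statement_timeout_ms must be positive")
    options = (
        "-c default_transaction_read_only=on "
        f"-c statement_timeout={statement_timeout_ms}"
    )
    return {"dsn": dsn, "options": options}


# Usage (requires psycopg2 and a reachable PostgreSQL server):
#   conn = psycopg2.connect(**readonly_connect_kwargs("dbname=shop user=bi_reader"))
#   executor = SafeSQLExecutor(conn, allowed_tables=["orders", "customers"])
```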

## 63.4 Complete Natural Language BI System

This section integrates Text-to-SQL, safe execution, and result interpretation into a complete BI Q&A system.

````python
import pandas as pd  # DataFrame.to_markdown() also requires the "tabulate" package


class NaturalLanguageBISystem:
    """Natural language BI system: question → SQL → execute → interpret"""

    def __init__(self, conn, schema: str, allowed_tables: Optional[list] = None):
        self.conn = conn
        self.schema = schema
        self.executor = SafeSQLExecutor(conn, allowed_tables)
        self.client = anthropic.Anthropic()

    def ask(self, question: str, explain_results: bool = True) -> dict:
        """Complete Q&A flow"""

        # Step 1: Text-to-SQL
        sql_result = text_to_sql(question, self.schema)

        if not sql_result["success"]:
            return {
                "question": question,
                "error": sql_result.get("reason", "SQL generation failed"),
                "stage": "sql_generation"
            }

        sql = sql_result["sql"]

        # Step 2: Safe execution, with one automatic repair attempt on failure
        exec_result = self.executor.execute(sql)

        if not exec_result["success"]:
            corrected = self._try_fix_sql(sql, exec_result["error"], question)
            if corrected:
                sql = corrected
                exec_result = self.executor.execute(sql)  # the fix is validated again

            if not exec_result["success"]:
                return {
                    "question": question,
                    "sql": sql,
                    "error": exec_result["error"],
                    "stage": "sql_execution"
                }

        columns = exec_result["columns"]
        rows = exec_result["rows"]

        if not explain_results or not rows:
            return {
                "question": question,
                "sql": sql,
                "columns": columns,
                "rows": rows,
                "row_count": exec_result["row_count"],
                "truncated": exec_result.get("truncated", False)
            }

        # Step 3: Result interpretation; only the first 20 rows are sent to the model
        df = pd.DataFrame(rows, columns=columns)
        table_str = df.head(20).to_markdown(index=False)
        shown_note = ", first 20 shown" if exec_result["row_count"] > 20 else ""
        capped_note = ", capped at the row limit" if exec_result.get("truncated") else ""

        explanation_response = self.client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": f"""User question: {question}

Query results ({exec_result['row_count']} rows{shown_note}{capped_note}):
{table_str}

Please interpret these results in 2-3 sentences, directly answering the user's question. Note any significant trends or anomalies."""
            }]
        )

        return {
            "question": question,
            "sql": sql,
            "columns": columns,
            "rows": rows,
            "row_count": exec_result["row_count"],
            "truncated": exec_result.get("truncated", False),
            "interpretation": explanation_response.content[0].text
        }

    def _try_fix_sql(self, sql: str, error: str, original_question: str) -> Optional[str]:
        """Ask Claude to repair a failed SQL query; returns None if no SQL block comes back"""
        fix_response = self.client.messages.create(
            model="claude-opus-4-5",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": f"""The following SQL failed to execute. Please fix it:

Original question: {original_question}

Incorrect SQL:
```sql
{sql}
```

Error: {error}

Schema:
{self.schema}

Output only the corrected SQL in a ```sql code block:"""
            }]
        )

        reply = fix_response.content[0].text
        sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)
        return sql_match.group(1).strip() if sql_match else None
````
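`DataFrame.to_markdown()` depends on the optional `tabulate` package. If adding pandas and tabulate to the service only to format 20 rows is unwanted, a few lines of standard-library code can produce an equivalent Markdown table for the interpretation prompt. The helper `rows_to_markdown` is hypothetical; it escapes `|` and flattens newlines so cell values cannot break the table layout.

```python
from typing import Any, Sequence


def _cell(value: Any) -> str:
    """Render one cell: NULL for None, pipes escaped, newlines flattened."""
    if value is None:
        return "NULL"
    return str(value).replace("|", "\\|").replace("\n", " ")


def rows_to_markdown(columns: Sequence[str], rows: Sequence[Sequence[Any]], max_rows: int = 20) -> str:
    """Format the first max_rows rows as a Markdown table."""
    header = "| " + " | ".join(_cell(c) for c in columns) + " |"
    divider = "| " + " | ".join("---" for _ in columns) + " |"
    body = ["| " + " | ".join(_cell(v) for v in row) + " |" for row in rows[:max_rows]]
    return "\n".join([header, divider, *body])
```

In `ask()`, `table_str = rows_to_markdown(columns, rows)` would then replace the two pandas lines.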

## 63.5 Multi-Step Analysis for Complex Business Questions

```python
import json


def multi_step_analysis(question: str, bi_system: NaturalLanguageBISystem) -> dict:
    """Multi-step analysis: decompose complex questions into sub-queries"""

    # Step 1: ask Claude to split the question into single-query sub-questions
    decompose_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=512,
        messages=[{
            "role": "user",
            "content": f"""This business analysis question may require multiple SQL queries.
Break it into 2-4 sub-questions, each answerable by a single SQL query.

Analysis question: {question}

Database schema:
{bi_system.schema}

Output as JSON array:
[
  {{"step": 1, "sub_question": "Sub-question 1"}},
  {{"step": 2, "sub_question": "Sub-question 2", "depends_on": 1}}
]"""
        }]
    )

    steps_text = decompose_response.content[0].text
    json_match = re.search(r'\[[\s\S]*\]', steps_text)

    # Fall back to answering the question directly if no valid JSON plan comes back
    if not json_match:
        return bi_system.ask(question)
    try:
        steps = json.loads(json_match.group(0))
    except json.JSONDecodeError:
        return bi_system.ask(question)

    # Step 2: run each sub-question through the BI pipeline, without per-step interpretation
    step_results = {}
    for step in steps:
        result = bi_system.ask(step["sub_question"], explain_results=False)
        step_results[step["step"]] = {"question": step["sub_question"], "result": result}

    # Step 3: summarize each step (first 5 rows, or its error) for the final analysis
    summary_parts = []
    for k, v in step_results.items():
        res = v["result"]
        outcome = f"ERROR: {res['error']}" if "error" in res else res.get("rows", [])[:5]
        summary_parts.append(f"Step {k}: {v['question']}\nResult: {outcome}")
    results_summary = "\n\n".join(summary_parts)

    final_response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""Original question: {question}

Step-by-step query results:
{results_summary}

Please provide a complete analysis (under 300 words):"""
        }]
    )

    return {
        "question": question,
        "steps": step_results,
        "final_analysis": final_response.content[0].text
    }
```
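The decomposition prompt asks for a `depends_on` field, but `multi_step_analysis()` runs steps in whatever order the model returned them. A small sketch using the standard library's `graphlib` (Python 3.9+) orders the steps so every dependency runs first and rejects cyclic plans. `order_steps` is a hypothetical helper that accepts `depends_on` as a single step number or a list. It only fixes the execution order; it does not feed one step's results into the next.

```python
from graphlib import CycleError, TopologicalSorter


def order_steps(steps: list[dict]) -> list[dict]:
    """Return steps sorted so each one comes after the steps it depends on.

    Raises ValueError for unknown dependencies or dependency cycles.
    """
    by_id = {step["step"]: step for step in steps}
    sorter = TopologicalSorter()
    for step in steps:
        deps = step.get("depends_on")
        if deps is None:
            deps = []
        elif not isinstance(deps, list):
            deps = [deps]
        unknown = [d for d in deps if d not in by_id]
        if unknown:
            raise ValueError(f"Step {step['step']} depends on unknown steps {unknown}")
        sorter.add(step["step"], *deps)
    try:
        return [by_id[step_id] for step_id in sorter.static_order()]
    except CycleError as exc:
        # args[1] holds the list of nodes forming the cycle
        raise ValueError(f"Cyclic step dependencies: {exc.args[1]}") from exc
```

Inside `multi_step_analysis()`, the loop would then iterate over `order_steps(steps)` instead of `steps`.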

## 63.6 Schema Optimization for Better Text-to-SQL Accuracy

### 63.6.1 Add Comments to Tables and Columns

```sql
-- Comments spell out status codes and units that column names alone cannot convey;
-- column comments are injected into the prompt by get_schema_description()
COMMENT ON TABLE orders IS 'Order master table recording each transaction';
COMMENT ON COLUMN orders.status IS 'Order status: pending=awaiting payment, paid=paid, shipped=shipped, completed=complete, cancelled=cancelled';
COMMENT ON COLUMN orders.total_amount IS 'Order total in cents (USD)';
```
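`get_schema_description()` reads column comments but not table comments, so the `COMMENT ON TABLE` text above never reaches the model. One way to include it is sketched below, assuming a psycopg2-style cursor: PostgreSQL's `obj_description(oid, 'pg_class')` returns a table's comment, and the hypothetical `add_table_comments()` helper inserts it under the matching `-- Table:` header line.

```python
TABLE_COMMENTS_SQL = """
    SELECT c.relname, obj_description(c.oid, 'pg_class')
    FROM pg_catalog.pg_class c
    JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
    WHERE n.nspname = 'public'
      AND c.relkind IN ('r', 'p')  -- ordinary and partitioned tables
"""


def fetch_table_comments(cursor) -> dict[str, str]:
    """Map table name -> comment for public tables that have one."""
    cursor.execute(TABLE_COMMENTS_SQL)
    return {name: comment for name, comment in cursor.fetchall() if comment}


def add_table_comments(schema_text: str, comments: dict[str, str]) -> str:
    """Insert '-- Description: ...' after each '-- Table: <name>' header line."""
    prefix = "-- Table: "
    out = []
    for line in schema_text.split("\n"):
        out.append(line)
        if line.startswith(prefix):
            name = line[len(prefix):]
            if name in comments:
                # Flatten newlines so a multi-line comment stays a single SQL comment
                out.append(f"-- Description: {comments[name].replace(chr(10), ' ')}")
    return "\n".join(out)
```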

### 63.6.2 Provide Query Examples (Few-Shot)

````python
FEW_SHOT_EXAMPLES = """
## Query Examples

Example 1: "Top 5 products by revenue last month"
```sql
SELECT p.name, SUM(oi.quantity * oi.unit_price) as revenue
FROM order_items oi
JOIN products p ON oi.product_id = p.id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= DATE_TRUNC('month', NOW() - INTERVAL '1 month')
  AND o.created_at < DATE_TRUNC('month', NOW())
  AND o.status = 'completed'
GROUP BY p.id, p.name
ORDER BY revenue DESC
LIMIT 5
```

Example 2: "Active user count by city"
```sql
SELECT c.city, COUNT(DISTINCT c.id) as active_users
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE o.created_at >= NOW() - INTERVAL '30 days'
GROUP BY c.city
ORDER BY active_users DESC
```
"""
````
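The chapter defines `FEW_SHOT_EXAMPLES` but does not show where it goes. One plausible arrangement, sketched here under that assumption, keeps the examples as (question, SQL) pairs, renders them in the same layout as the hand-written block, and appends the result to the system prompt. `render_few_shot` and `build_system_prompt` are hypothetical names; storing pairs rather than one string makes it easy to add, drop, or select examples.

```python
FENCE = "`" * 3  # Markdown code fence, built here to avoid literal backticks in this listing


def render_few_shot(examples: list[tuple[str, str]]) -> str:
    """Render (question, sql) pairs as a '## Query Examples' prompt section."""
    parts = ["## Query Examples"]
    for i, (question, sql) in enumerate(examples, start=1):
        parts.append(f'Example {i}: "{question}"\n{FENCE}sql\n{sql.strip()}\n{FENCE}')
    return "\n\n".join(parts)


def build_system_prompt(base_prompt: str, examples: list[tuple[str, str]]) -> str:
    """Append rendered examples to a base system prompt such as TEXT_TO_SQL_SYSTEM."""
    if not examples:
        return base_prompt
    return base_prompt.rstrip() + "\n\n" + render_few_shot(examples) + "\n"
```

In `text_to_sql()`, this would amount to passing `system=build_system_prompt(TEXT_TO_SQL_SYSTEM, examples)` to `client.messages.create`.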


## 63.7 Integration with BI Tools

### 63.7.1 REST API Layer for BI Tool Integration

```python
from flask import Flask, request, jsonify

app = Flask(__name__)
bi_system = None  # Initialize a NaturalLanguageBISystem at startup

@app.route("/api/nl-query", methods=["POST"])
def nl_query():
    """Natural language query REST API, callable from BI tools like Metabase or Superset"""
    # silent=True returns None instead of raising when the body is not valid JSON
    data = request.get_json(silent=True) or {}
    question = data.get("question")

    if not question:
        return jsonify({"error": "missing question"}), 400

    result = bi_system.ask(question)

    return jsonify({
        "question": question,
        "sql": result.get("sql"),
        "data": {
            "columns": result.get("columns", []),
            "rows": [list(row) for row in result.get("rows", [])]
        },
        "interpretation": result.get("interpretation"),
        "error": result.get("error")
    })
```
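psycopg2 returns `NUMERIC` values as `decimal.Decimal`, timestamps as `datetime`, `interval` as `timedelta`, and `bytea` as `memoryview`. Flask's default JSON provider turns `Decimal` into a string but writes dates as HTTP-date strings (such as `Wed, 31 Jan 2024 00:00:00 GMT`), which BI front ends may not parse as dates, and it cannot serialize `timedelta` or `memoryview` at all. A minimal sketch of a hypothetical `rows_to_json()` helper, applied to the rows before `jsonify`, emits ISO 8601 strings and makes each conversion explicit.

```python
import datetime as dt
import decimal
import uuid
from typing import Any


def to_json_safe(value: Any) -> Any:
    """Convert one database value into a JSON-serializable Python value."""
    if isinstance(value, (dt.datetime, dt.date, dt.time)):
        return value.isoformat()  # ISO 8601, e.g. "2024-01-31T12:00:00"
    if isinstance(value, dt.timedelta):
        return value.total_seconds()  # PostgreSQL interval as seconds
    if isinstance(value, decimal.Decimal):
        return str(value)  # a string keeps exact precision (e.g. money)
    if isinstance(value, uuid.UUID):
        return str(value)
    if isinstance(value, (bytes, memoryview)):
        return bytes(value).hex()
    return value


def rows_to_json(rows) -> list[list[Any]]:
    """Apply to_json_safe to every cell of every row."""
    return [[to_json_safe(v) for v in row] for row in rows]
```

In `nl_query()`, the `"rows"` entry would become `rows_to_json(result.get("rows", []))`.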

## Summary

The quality of a Text-to-SQL system rests on three core elements: complete schema injection (table comments, column comments, foreign key relationships, sample data), a SQL security validation layer (read-only enforcement, parameterization, multi-statement detection), and a result interpretation layer in which Claude explains query results in natural language. Production systems must implement a SQL validator to reduce injection risk, use table-level permission control to limit the scope of data access, and attempt automatic correction when SQL execution fails. The complete BI Q&A system chains Text-to-SQL, safe execution, and result interpretation into a single pipeline, an effective way to give non-technical staff direct access to data insights.
