n8n / Make / Zapier Zero-Code Workflow Integration: Automation Scenarios and Webhook Trigger Design
Chapter 63: Database Integration: Text-to-SQL, Data Analysis, and BI Enhancement
63.1 The Core Value of Database Integration
Most enterprises store core business data in relational databases. But people who can write SQL queries are far fewer than those who need data insightsโproduct managers, sales, and operations staff need data every day but must rely on data teams, creating a significant efficiency bottleneck.
Claude's Text-to-SQL capability breaks this bottleneck:
- Business users describe their needs in natural language and get query results directly
- Data analysts use Claude to quickly draft complex queries, reducing debugging time
- BI teams use Claude to augment reporting tools with an intelligent Q&A layer
This chapter covers three progressive implementation levels: basic Text-to-SQL, safe SQL generation with validation, and a complete natural language BI system.
63.2 Basic Text-to-SQL Implementation
63.2.1 Schema Injection Strategy
The core of Text-to-SQL is injecting the database schema into Claude's context. Schema quality directly determines the accuracy of generated SQL.
import anthropic
import psycopg2
from typing import Optional
import re
client = anthropic.Anthropic()
def get_schema_description(conn, db_type: str = "postgresql") -> str:
"""Extract schema description from the database (tables, column comments, foreign keys)"""
cursor = conn.cursor()
cursor.execute("""
SELECT
t.table_name,
c.column_name,
c.data_type,
c.is_nullable,
pgd.description as column_comment
FROM information_schema.tables t
JOIN information_schema.columns c ON t.table_name = c.table_name
LEFT JOIN pg_catalog.pg_statio_all_tables st ON st.relname = t.table_name
LEFT JOIN pg_catalog.pg_description pgd ON pgd.objoid = st.relid
AND pgd.objsubid = c.ordinal_position
WHERE t.table_schema = 'public'
ORDER BY t.table_name, c.ordinal_position
""")
columns = cursor.fetchall()
cursor.execute("""
SELECT tc.table_name, kcu.column_name,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
WHERE tc.constraint_type = 'FOREIGN KEY'
""")
fk_rows = cursor.fetchall()
tables = {}
for row in columns:
table_name = row[0]
if table_name not in tables:
tables[table_name] = {"columns": [], "foreign_keys": []}
nullable = "NULL" if row[3] == "YES" else "NOT NULL"
comment = f" -- {row[4]}" if row[4] else ""
tables[table_name]["columns"].append(f" {row[1]} {row[2].upper()} {nullable}{comment}")
for fk in fk_rows:
tables[fk[0]]["foreign_keys"].append(
f" FOREIGN KEY ({fk[1]}) REFERENCES {fk[2]}({fk[3]})"
)
schema_parts = []
for table_name, info in tables.items():
schema_parts.append(f"-- Table: {table_name}")
schema_parts.append(f"CREATE TABLE {table_name} (")
schema_parts.extend(info["columns"])
if info["foreign_keys"]:
schema_parts.extend(info["foreign_keys"])
schema_parts.append(");\n")
return "\n".join(schema_parts)
63.2.2 Text-to-SQL Prompt Template
TEXT_TO_SQL_SYSTEM = """You are a precise SQL query generation expert.
## Your responsibilities
Convert users' natural language questions into correct SQL queries.
## Rules
1. Only generate SELECT queries. Never generate INSERT/UPDATE/DELETE/DROP/CREATE or other modification operations.
2. Use parameterized query placeholders ($1, $2...) for user-supplied values to prevent SQL injection.
3. For queries that might return large datasets, automatically add LIMIT 100.
4. If the question cannot be answered with the current schema, clearly explain why.
5. Briefly explain your query approach before generating SQL.
## Output format
```sql
-- Approach: [brief explanation]
SELECT ...
FROM ...
WHERE ...
If you cannot generate SQL, output:
Cannot generate: [reason]
"""
def text_to_sql( question: str, schema: str, sample_data: Optional[str] = None, dialect: str = "PostgreSQL" ) -> dict: """Convert a natural language question to a SQL query"""
user_content = f"""## Database Schema ({dialect})
{schema}
{f"## Sample Data\n{sample_data}" if sample_data else ""}
User Question
{question}
Please generate the corresponding SQL query."""
response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1024,
system=TEXT_TO_SQL_SYSTEM,
messages=[{"role": "user", "content": user_content}]
)
reply = response.content[0].text
sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)
if sql_match:
sql = sql_match.group(1).strip()
return {"success": True, "sql": sql, "explanation": reply}
elif "Cannot generate" in reply or "cannot generate" in reply:
return {"success": False, "reason": reply}
else:
return {"success": False, "reason": "Could not extract SQL statement", "raw": reply}
## 63.3 SQL Security Validation Layer
Executing Claude-generated SQL directly is risky. Multiple validation layers are required before execution.
### 63.3.1 SQL Validator
```python
from typing import Tuple
class SQLValidator:
"""SQL security validator: ensures only read-only queries are executed"""
DANGEROUS_KEYWORDS = [
r'\bINSERT\b', r'\bUPDATE\b', r'\bDELETE\b', r'\bDROP\b',
r'\bCREATE\b', r'\bALTER\b', r'\bTRUNCATE\b', r'\bEXEC\b',
r'\bEXECUTE\b', r'\bGRANT\b', r'\bREVOKE\b', r'\bXP_\w+',
]
@classmethod
def validate(cls, sql: str) -> Tuple[bool, str]:
"""Validate SQL safety. Returns (is_safe, reason)."""
# Remove comments
sql_no_comments = re.sub(r'--[^\n]*', '', sql)
sql_no_comments = re.sub(r'/\*.*?\*/', '', sql_no_comments, flags=re.DOTALL)
sql_stripped = sql_no_comments.strip()
if not re.match(r'^SELECT\b', sql_stripped, re.IGNORECASE):
return False, f"Query does not start with SELECT (actual: {sql_stripped[:50]})"
for pattern in cls.DANGEROUS_KEYWORDS:
if re.search(pattern, sql, re.IGNORECASE | re.MULTILINE):
return False, f"Contains dangerous keyword: {pattern}"
statements = [s.strip() for s in sql.split(';') if s.strip()]
if len(statements) > 1:
return False, "Contains multiple statements (possible SQL injection)"
return True, "Validation passed"
@classmethod
def extract_table_names(cls, sql: str) -> list[str]:
pattern = r'(?:FROM|JOIN)\s+([a-zA-Z_][a-zA-Z0-9_]*)'
return re.findall(pattern, sql, re.IGNORECASE)
class SafeSQLExecutor:
"""Safe SQL executor"""
def __init__(self, conn, allowed_tables: Optional[list] = None, max_rows: int = 1000):
self.conn = conn
self.allowed_tables = allowed_tables
self.max_rows = max_rows
def execute(self, sql: str) -> dict:
"""Execute a validated SQL query"""
is_safe, reason = SQLValidator.validate(sql)
if not is_safe:
return {"success": False, "error": f"SQL security validation failed: {reason}"}
if self.allowed_tables:
referenced_tables = SQLValidator.extract_table_names(sql)
unauthorized = [t for t in referenced_tables if t not in self.allowed_tables]
if unauthorized:
return {"success": False, "error": f"Unauthorized table access: {unauthorized}"}
sql_with_limit = self._enforce_limit(sql)
try:
cursor = self.conn.cursor()
cursor.execute(sql_with_limit)
rows = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return {
"success": True,
"columns": columns,
"rows": rows,
"row_count": len(rows),
"truncated": len(rows) == self.max_rows
}
except Exception as e:
return {"success": False, "error": str(e)}
def _enforce_limit(self, sql: str) -> str:
if re.search(r'\bLIMIT\b', sql, re.IGNORECASE):
return sql
return f"{sql.rstrip(';')} LIMIT {self.max_rows}"
63.4 Complete Natural Language BI System
Integrating Text-to-SQL, safe execution, and result interpretation into a complete BI Q&A system.
import pandas as pd
class NaturalLanguageBISystem:
"""Natural language BI system: question โ SQL โ execute โ interpret"""
def __init__(self, conn, schema: str, allowed_tables: Optional[list] = None):
self.conn = conn
self.schema = schema
self.executor = SafeSQLExecutor(conn, allowed_tables)
self.client = anthropic.Anthropic()
def ask(self, question: str, explain_results: bool = True) -> dict:
"""Complete Q&A flow"""
# Step 1: Text-to-SQL
sql_result = text_to_sql(question, self.schema)
if not sql_result["success"]:
return {
"question": question,
"error": sql_result.get("reason", "SQL generation failed"),
"stage": "sql_generation"
}
sql = sql_result["sql"]
# Step 2: Safe execution
exec_result = self.executor.execute(sql)
if not exec_result["success"]:
corrected = self._try_fix_sql(sql, exec_result["error"], question)
if corrected:
sql = corrected
exec_result = self.executor.execute(sql)
if not exec_result["success"]:
return {
"question": question,
"sql": sql,
"error": exec_result["error"],
"stage": "sql_execution"
}
columns = exec_result["columns"]
rows = exec_result["rows"]
if not explain_results or not rows:
return {
"question": question,
"sql": sql,
"columns": columns,
"rows": rows,
"row_count": exec_result["row_count"],
"truncated": exec_result.get("truncated", False)
}
# Step 3: Result interpretation
df = pd.DataFrame(rows, columns=columns)
table_str = df.head(20).to_markdown(index=False)
explanation_response = self.client.messages.create(
model="claude-haiku-4-5",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""User question: {question}
Query results ({exec_result['row_count']} rows{', truncated to first 20' if exec_result.get('truncated') else ''}):
{table_str}
Please interpret these results in 2-3 sentences, directly answering the user's question. Note any significant trends or anomalies."""
}]
)
return {
"question": question,
"sql": sql,
"columns": columns,
"rows": rows,
"row_count": exec_result["row_count"],
"truncated": exec_result.get("truncated", False),
"interpretation": explanation_response.content[0].text
}
def _try_fix_sql(self, sql: str, error: str, original_question: str) -> Optional[str]:
"""Attempt to fix a failed SQL query"""
fix_response = self.client.messages.create(
model="claude-opus-4-5",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""The following SQL failed to execute. Please fix it:
Original question: {original_question}
Incorrect SQL:
```sql
{sql}
Error: {error}
Schema: {self.schema}
Output only the corrected SQL in a ```sql code block:""" }] )
reply = fix_response.content[0].text
sql_match = re.search(r'```sql\n(.*?)\n```', reply, re.DOTALL)
return sql_match.group(1).strip() if sql_match else None
## 63.5 Multi-Step Analysis for Complex Business Questions
```python
def multi_step_analysis(question: str, bi_system: NaturalLanguageBISystem) -> dict:
"""Multi-step analysis: decompose complex questions into sub-queries"""
decompose_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=512,
messages=[{
"role": "user",
"content": f"""This business analysis question may require multiple SQL queries.
Break it into 2-4 sub-questions, each answerable by a single SQL query.
Analysis question: {question}
Available tables: orders, customers, products, order_items
Output as JSON array:
[
{{"step": 1, "sub_question": "Sub-question 1"}},
{{"step": 2, "sub_question": "Sub-question 2", "depends_on": 1}}
]"""
}]
)
import json
steps_text = decompose_response.content[0].text
json_match = re.search(r'\[[\s\S]*\]', steps_text)
if not json_match:
return bi_system.ask(question)
steps = json.loads(json_match.group(0))
step_results = {}
for step in steps:
result = bi_system.ask(step["sub_question"], explain_results=False)
step_results[step["step"]] = {"question": step["sub_question"], "result": result}
results_summary = "\n\n".join([
f"Step {k}: {v['question']}\nResult: {v['result'].get('rows', [])[:5]}"
for k, v in step_results.items()
])
final_response = client.messages.create(
model="claude-opus-4-5",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""Original question: {question}
Step-by-step query results:
{results_summary}
Please provide a complete analysis (under 300 words):"""
}]
)
return {
"question": question,
"steps": step_results,
"final_analysis": final_response.content[0].text
}
63.6 Schema Optimization for Better Text-to-SQL Accuracy
63.6.1 Add Comments to Tables and Columns
-- PostgreSQL table and column comments dramatically improve SQL accuracy
COMMENT ON TABLE orders IS 'Order master table recording each transaction';
COMMENT ON COLUMN orders.status IS 'Order status: pending=awaiting payment, paid=paid, shipped=shipped, completed=complete, cancelled=cancelled';
COMMENT ON COLUMN orders.total_amount IS 'Order total in cents (USD)';
63.6.2 Provide Query Examples (Few-Shot)
FEW_SHOT_EXAMPLES = """
## Query Examples
Example 1: "Top 5 products by revenue last month"
```sql
SELECT p.name, SUM(oi.quantity * oi.unit_price) as revenue
FROM order_items oi
JOIN products p ON oi.product_id = p.id
JOIN orders o ON oi.order_id = o.id
WHERE o.created_at >= DATE_TRUNC('month', NOW() - INTERVAL '1 month')
AND o.created_at < DATE_TRUNC('month', NOW())
AND o.status = 'completed'
GROUP BY p.id, p.name
ORDER BY revenue DESC
LIMIT 10
Example 2: "Active user count by city"
SELECT c.city, COUNT(DISTINCT c.id) as active_users
FROM customers c
JOIN orders o ON c.id = o.customer_id
WHERE o.created_at >= NOW() - INTERVAL '30 days'
GROUP BY c.city
ORDER BY active_users DESC
"""
## 63.7 Integration with BI Tools
### 63.7.1 REST API Layer for BI Tool Integration
```python
from flask import Flask, request, jsonify
app = Flask(__name__)
bi_system = None # Initialize BI system at startup
@app.route("/api/nl-query", methods=["POST"])
def nl_query():
"""Natural language query REST API โ callable from BI tools like Metabase or Superset"""
data = request.json
question = data.get("question")
if not question:
return jsonify({"error": "missing question"}), 400
result = bi_system.ask(question)
return jsonify({
"question": question,
"sql": result.get("sql"),
"data": {
"columns": result.get("columns", []),
"rows": [list(row) for row in result.get("rows", [])]
},
"interpretation": result.get("interpretation"),
"error": result.get("error")
})
Summary
The quality of a Text-to-SQL system depends on three core elements: completeness of schema injection (table comments, column comments, foreign key relationships, sample data), a SQL security validation layer (read-only enforcement, parameterization, multi-statement detection), and a result interpretation layer (Claude's natural language interpretation of query results). Production systems must implement a SQL validator to prevent injection risks, use table-level permission control to limit data access scope, and provide auto-correction when SQL execution fails. The complete BI Q&A system chains Text-to-SQL, safe execution, and result interpretation into a pipeline โ an effective path for giving non-technical staff direct access to data insights.