Chapter 38
Building a Custom MCP Server
Chapter 38: Building a Custom MCP Server
Understanding the MCP protocol specification is valuable—but the best way to internalize it is to build a server yourself. This chapter walks through a complete hands-on project: developing a "database query" MCP Server in Python, covering everything from environment setup to registering and using it inside Hermes.
38.1 Project Design: Database Query MCP Server
Capability Design
The db-query-mcp-server we'll build provides:
db-query-mcp-server
├── Tools
│   ├── query_sql — Execute read-only SQL queries
│   ├── list_tables — List all table names
│   └── describe_table — Show table schema
│
├── Resources
│   ├── db://schema — Complete database schema
│   └── db://tables/{name} — Single-table data preview
│
└── Prompts
    └── analyze_table — Optimal prompt for table analysis
Directory Structure
db-query-mcp-server/
├── pyproject.toml
├── README.md
├── src/
│   └── db_query_mcp/
│       ├── __init__.py
│       ├── server.py        # MCP Server entry point
│       ├── tools.py         # Tool implementations
│       ├── resources.py     # Resource implementations
│       ├── db.py            # Database connection layer
│       └── schemas.py       # Input validation schemas
└── tests/
    ├── test_tools.py
    └── test_server.py
38.2 SDK Installation and Initialization
mkdir db-query-mcp-server && cd db-query-mcp-server
python -m venv .venv && source .venv/bin/activate
pip install mcp aiosqlite pydantic structlog
# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "db-query-mcp-server"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = ["mcp>=0.9.0", "aiosqlite>=0.20.0", "pydantic>=2.0"]
[project.scripts]
db-query-mcp = "db_query_mcp.server:main"
38.3 Defining Tool Schemas
# src/db_query_mcp/schemas.py
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List
import re
class QuerySQLInput(BaseModel):
    """Validated arguments for the ``query_sql`` tool.

    ``sql`` is screened by :meth:`validate_readonly`; ``limit`` is bounded
    to 1..10000 by its field constraints.
    """

    sql: str = Field(description="SELECT SQL statement to execute", min_length=1)
    params: Optional[List] = Field(default=None, description="Bind parameters")
    limit: int = Field(default=100, ge=1, le=10000, description="Max rows to return")

    @field_validator("sql")
    @classmethod
    def validate_readonly(cls, v: str) -> str:
        """Reject any statement that is not a plain ``SELECT``.

        The check is purely textual: the statement must start with
        ``SELECT`` (case-insensitive, ignoring surrounding whitespace) and
        must not contain any forbidden keyword as a whole word. Because the
        whole text is scanned, a keyword inside a string literal is
        rejected as well.

        Args:
            v: Raw SQL text supplied by the caller.

        Returns:
            The SQL text unchanged when it passes both checks.

        Raises:
            ValueError: If the statement does not start with ``SELECT`` or
                contains a forbidden keyword.
        """
        normalized = v.strip().upper()
        if not normalized.startswith("SELECT"):
            # Whitespace-only input passes min_length=1 but has no first
            # token; indexing split()[0] would raise IndexError, which is
            # not reported as a validation failure.
            tokens = normalized.split(None, 1)
            detected = tokens[0] if tokens else "<empty statement>"
            raise ValueError(
                "Security: only SELECT statements are allowed. "
                f"Detected: {detected}"
            )
        dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER", "TRUNCATE", "EXEC"]
        for kw in dangerous:
            if re.search(r'\b' + kw + r'\b', normalized):
                raise ValueError(f"Security: forbidden keyword '{kw}' detected in SQL")
        return v
class ListTablesInput(BaseModel):
    """Validated arguments for the ``list_tables`` tool."""

    # When True, views are listed alongside tables; defaults to tables only.
    include_views: bool = Field(default=False, description="Include views in results")
class DescribeTableInput(BaseModel):
    """Validated arguments for the ``describe_table`` tool."""

    # Restricted to a plain identifier (letter or underscore first, then
    # letters, digits, underscores) of 1-255 characters.
    table_name: str = Field(
        description="Table name", min_length=1, max_length=255,
        pattern=r'^[a-zA-Z_][a-zA-Z0-9_]*$'
    )
38.4 Database Connection Layer
# src/db_query_mcp/db.py
import asyncio
import aiosqlite
from typing import Any, Dict, List, Optional
from contextlib import asynccontextmanager
class DatabaseConnection:
    """Fixed-size pool of aiosqlite connections with read-only helpers.

    ``initialize`` must be awaited before any helper is used, and ``close``
    should be awaited on shutdown.
    """

    def __init__(self, connection_string: str, pool_size: int = 5):
        """Store settings; no connection is opened until ``initialize``.

        Args:
            connection_string: Database path handed to ``aiosqlite.connect``.
            pool_size: Number of connections to open, which is also the
                maximum number of concurrent ``acquire`` holders.

        Raises:
            ValueError: If ``pool_size`` is smaller than 1 (a zero-permit
                semaphore would make every ``acquire`` wait forever).
        """
        if pool_size < 1:
            raise ValueError(f"pool_size must be at least 1, got {pool_size}")
        self.connection_string = connection_string
        self.pool_size = pool_size
        self._pool: List[aiosqlite.Connection] = []
        # id() of every connection currently handed out by acquire().
        self._busy: set = set()
        self._semaphore = asyncio.Semaphore(pool_size)

    async def initialize(self):
        """Open ``pool_size`` connections and add them to the pool."""
        for _ in range(self.pool_size):
            conn = await aiosqlite.connect(self.connection_string)
            conn.row_factory = aiosqlite.Row
            await conn.execute("PRAGMA journal_mode=WAL")
            self._pool.append(conn)

    @asynccontextmanager
    async def acquire(self):
        """Check out one idle connection for the duration of the ``async with``.

        The semaphore caps concurrent holders at ``pool_size``; each holder
        gets a connection no other holder is using, and the connection
        becomes available again on exit, including when the body raises.

        Raises:
            RuntimeError: If no idle connection exists, e.g. because
                ``initialize`` has not been awaited.
        """
        async with self._semaphore:
            # No await between picking and marking, so two tasks cannot
            # select the same connection.
            conn = next((c for c in self._pool if id(c) not in self._busy), None)
            if conn is None:
                raise RuntimeError(
                    "No idle database connection available; "
                    "was initialize() awaited?"
                )
            self._busy.add(id(conn))
            try:
                yield conn
            finally:
                self._busy.discard(id(conn))

    async def query(self, sql: str, params=None, limit=100) -> Dict[str, Any]:
        """Run ``sql`` wrapped in a row cap and return the rows as dicts.

        Args:
            sql: The SELECT statement to run.
            params: Optional bind parameters for ``sql``.
            limit: Maximum number of rows to return.

        Returns:
            A dict with ``rows`` (list of dicts, at most ``limit`` long),
            ``columns`` (column names), ``row_count`` and ``truncated``
            (True when more than ``limit`` rows were available).
        """
        # A trailing ';' inside the subquery wrapper is a syntax error.
        inner_sql = sql.strip().rstrip("; \t\r\n")
        # One extra row is fetched purely to detect truncation.
        fetch_cap = int(limit) + 1
        # Newlines around the inner statement keep a trailing '--' comment
        # from swallowing the closing parenthesis and the LIMIT clause.
        limited_sql = f"SELECT * FROM (\n{inner_sql}\n) AS __q LIMIT {fetch_cap}"
        async with self.acquire() as conn:
            async with conn.execute(limited_sql, params or []) as cursor:
                rows = await cursor.fetchall()
                columns = [d[0] for d in cursor.description]
        truncated = len(rows) > limit
        return {
            "rows": [dict(r) for r in rows[:limit]],
            "columns": columns,
            "row_count": min(len(rows), limit),
            "truncated": truncated
        }

    async def get_tables(self, include_views=False) -> List[str]:
        """Return table names (and view names if requested), sorted by name."""
        # Both alternatives are constants, so the f-string below never
        # carries caller-supplied text.
        type_filter = "IN ('table','view')" if include_views else "= 'table'"
        async with self.acquire() as conn:
            async with conn.execute(
                f"SELECT name FROM sqlite_master WHERE type {type_filter} ORDER BY name"
            ) as cursor:
                return [r[0] for r in await cursor.fetchall()]

    async def get_table_schema(self, table_name: str) -> List[Dict]:
        """Return one dict per column of ``table_name``.

        Each dict has ``name``, ``type``, ``nullable``, ``default`` and
        ``primary_key``. The list is empty when the PRAGMA yields no rows.
        """
        # Quote the identifier so this method is safe even when called
        # without the schema-level name validation.
        quoted = '"' + table_name.replace('"', '""') + '"'
        async with self.acquire() as conn:
            async with conn.execute(f"PRAGMA table_info({quoted})") as cursor:
                return [
                    {"name": c["name"], "type": c["type"],
                     "nullable": not c["notnull"], "default": c["dflt_value"],
                     "primary_key": bool(c["pk"])}
                    for c in await cursor.fetchall()
                ]

    async def close(self):
        """Close every pooled connection and empty the pool."""
        # Popping as we go means a second close() finds nothing to close.
        while self._pool:
            conn = self._pool.pop()
            await conn.close()
        self._busy.clear()
38.5 Implementing handle_call_tool
# src/db_query_mcp/tools.py
import json
from typing import Any, Dict, List
from mcp.types import Tool, TextContent, CallToolResult
from .schemas import QuerySQLInput, ListTablesInput, DescribeTableInput
def get_tool_definitions() -> List[Tool]:
    """Return the tool descriptors advertised to MCP clients.

    The JSON Schemas mirror the constraints the pydantic input models
    enforce (``sql`` non-empty, ``limit`` range, ``table_name`` identifier
    pattern and length), so a client can see them up front instead of
    discovering them through a validation error.
    """
    return [
        Tool(
            name="query_sql",
            description=(
                "Execute a read-only SQL query against the database. "
                "Only SELECT statements are permitted. Results are automatically "
                "capped at the specified limit."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "minLength": 1,
                        "description": "SELECT statement to execute",
                    },
                    "params": {"type": "array", "items": {}, "description": "Bind parameters"},
                    "limit": {
                        "type": "integer",
                        "default": 100,
                        "minimum": 1,
                        "maximum": 10000,
                        "description": "Max rows to return",
                    },
                },
                "required": ["sql"],
            },
        ),
        Tool(
            name="list_tables",
            description="List all table names in the database",
            inputSchema={
                "type": "object",
                "properties": {
                    "include_views": {
                        "type": "boolean",
                        "default": False,
                        "description": "Include views in results",
                    },
                },
            },
        ),
        Tool(
            name="describe_table",
            description="Show the schema (columns, types, nullability, keys) of a table",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {
                        "type": "string",
                        "minLength": 1,
                        "maxLength": 255,
                        "pattern": "^[a-zA-Z_][a-zA-Z0-9_]*$",
                        "description": "Table name to describe",
                    },
                },
                "required": ["table_name"],
            },
        ),
    ]
async def handle_call_tool(name: str, arguments: Dict[str, Any], db) -> CallToolResult:
    """Route a tool call to its implementation and convert failures to results.

    Args:
        name: Tool name requested by the client.
        arguments: Raw argument mapping for that tool.
        db: Database object passed through to the tool implementation.

    Returns:
        The implementation's result, or an ``isError=True`` result whose
        text starts with ``Unknown tool``, ``Validation error`` or
        ``Execution error``.
    """

    def _failure(message: str) -> CallToolResult:
        # Every error path shares the same single-text-block shape.
        return CallToolResult(
            content=[TextContent(type="text", text=message)],
            isError=True
        )

    try:
        handlers = {
            "query_sql": _query_sql,
            "list_tables": _list_tables,
            "describe_table": _describe_table,
        }
        handler = handlers.get(name)
        if handler is None:
            return _failure(f"Unknown tool: {name}")
        return await handler(arguments, db)
    except ValueError as e:
        return _failure(f"Validation error: {e}")
    except Exception as e:
        return _failure(f"Execution error: {e}")
def _md_cell(value: Any) -> str:
    """Render ``value`` as text that is safe inside one Markdown table cell."""
    text = str(value)
    # A raw pipe would open a new column; a raw line break would end the row.
    for raw, safe in (("|", "\\|"), ("\r\n", " "), ("\n", " "), ("\r", " ")):
        text = text.replace(raw, safe)
    return text


def _json_default(value: Any) -> str:
    """``json.dumps`` fallback: hex-encode binary values, reject the rest."""
    if isinstance(value, (bytes, bytearray, memoryview)):
        return bytes(value).hex()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


async def _query_sql(arguments: Dict, db) -> CallToolResult:
    """Validate the arguments, run the query, and format the result.

    Args:
        arguments: Raw tool arguments; validated through ``QuerySQLInput``.
        db: Object exposing ``query(sql, params, limit)``.

    Returns:
        A single text block when no rows match. Otherwise two text blocks:
        a Markdown table of all returned rows, then a JSON preview of at
        most the first 10 rows.

    Raises:
        ValueError: If ``QuerySQLInput`` rejects the arguments.
    """
    params = QuerySQLInput(**arguments)
    result = await db.query(params.sql, params.params, params.limit)
    rows, columns = result["rows"], result["columns"]
    if not rows:
        return CallToolResult(
            content=[TextContent(type="text", text="Query returned 0 rows.")],
            isError=False
        )

    # Build the Markdown table; headers and cells are escaped alike.
    md = ["| " + " | ".join(_md_cell(c) for c in columns) + " |",
          "| " + " | ".join("---" for _ in columns) + " |"]
    for row in rows:
        md.append("| " + " | ".join(_md_cell(row.get(c, "")) for c in columns) + " |")
    note = " (truncated)" if result["truncated"] else ""
    summary = f"Query returned {result['row_count']} rows{note}:\n\n" + "\n".join(md)

    preview = rows[:10]
    # Say so when the JSON block is only a prefix of the table above.
    if len(rows) > len(preview):
        json_heading = f"**Raw JSON (first {len(preview)} of {len(rows)} rows):**"
    else:
        json_heading = "**Raw JSON:**"
    # BLOB columns arrive as bytes, which json cannot encode on its own.
    json_body = json.dumps(preview, indent=2, default=_json_default)
    return CallToolResult(
        content=[
            TextContent(type="text", text=summary),
            TextContent(type="text",
                        text=json_heading + "\n```json\n" + json_body + "\n```")
        ],
        isError=False
    )
async def _list_tables(arguments: Dict, db) -> CallToolResult:
    """Validate the arguments and return the table names as a bullet list.

    Args:
        arguments: Raw tool arguments; validated through ``ListTablesInput``.
        db: Object exposing ``get_tables(include_views)``.

    Returns:
        A non-error result holding either the bullet list or a
        "no tables" message.
    """
    request = ListTablesInput(**arguments)
    names = await db.get_tables(request.include_views)
    if names:
        bullets = "\n".join(f"- `{entry}`" for entry in names)
        message = f"Database has {len(names)} table(s):\n\n{bullets}"
    else:
        message = "No tables found in the database."
    return CallToolResult(
        content=[TextContent(type="text", text=message)],
        isError=False
    )
async def _describe_table(arguments: Dict, db) -> CallToolResult:
    """Validate the arguments and render a table's columns as Markdown.

    Args:
        arguments: Raw tool arguments; validated through
            ``DescribeTableInput``.
        db: Object exposing ``get_table_schema(table_name)``.

    Returns:
        An error result when the schema lookup returns nothing, otherwise
        a Markdown table with one row per column.

    Raises:
        ValueError: If ``DescribeTableInput`` rejects the arguments.
    """
    params = DescribeTableInput(**arguments)
    schema = await db.get_table_schema(params.table_name)
    if not schema:
        return CallToolResult(
            content=[TextContent(type="text", text=f"Table `{params.table_name}` not found.")],
            isError=True
        )
    lines = [
        f"## Schema: `{params.table_name}`\n",
        "| Column | Type | Nullable | Default | PK |",
        "| ------ | ---- | -------- | ------- | -- |",
    ]
    for col in schema:
        # Only a missing default becomes '-'; a falsy but real default
        # (such as 0) must still be shown.
        default_text = "-" if col["default"] is None else col["default"]
        lines.append(
            f"| `{col['name']}` | {col['type']} | "
            f"{'Yes' if col['nullable'] else 'No'} | "
            f"{default_text} | "
            f"{'✓' if col['primary_key'] else ''} |"
        )
    return CallToolResult(
        content=[TextContent(type="text", text="\n".join(lines))],
        isError=False
    )
38.6 Complete Server Entry Point
# src/db_query_mcp/server.py
import asyncio, os, sys
from typing import Any, Dict
from mcp.server import Server
from mcp.server.stdio import stdio_server
from .db import DatabaseConnection
from .tools import get_tool_definitions, handle_call_tool
# MCP server instance; the decorated handlers below register on it.
app = Server("db-query-mcp-server")
# Shared database handle; stays None until run_server() assigns it.
db: DatabaseConnection = None
@app.list_tools()
async def list_tools():
    """Return the tool definitions built by ``get_tool_definitions``."""
    return get_tool_definitions()
@app.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]):
    """Forward a tool call to ``handle_call_tool`` with the shared ``db``."""
    # NOTE(review): whether this decorator accepts a CallToolResult return
    # value (rather than a list of content blocks) presumably depends on
    # the installed mcp SDK version; confirm against the pinned release.
    return await handle_call_tool(name, arguments, db)
async def run_server():
    """Open the database pool, serve MCP over stdio, then close the pool.

    Environment:
        DB_PATH: Database path; defaults to ``:memory:``.
        DB_POOL_SIZE: Optional positive integer pool size. When unset,
            ``DatabaseConnection``'s own default is used.

    Raises:
        ValueError: If ``DB_POOL_SIZE`` is set but is not a positive integer.
    """
    global db
    db_path = os.environ.get("DB_PATH", ":memory:")
    raw_pool_size = os.environ.get("DB_POOL_SIZE")
    if raw_pool_size is None or not raw_pool_size.strip():
        db = DatabaseConnection(db_path)
    else:
        try:
            pool_size = int(raw_pool_size)
        except ValueError:
            raise ValueError(
                f"DB_POOL_SIZE must be a positive integer, got {raw_pool_size!r}"
            ) from None
        if pool_size < 1:
            raise ValueError(
                f"DB_POOL_SIZE must be a positive integer, got {raw_pool_size!r}"
            )
        db = DatabaseConnection(db_path, pool_size)
    try:
        # Inside the try so connections opened before a mid-way failure
        # are still closed by the finally clause.
        await db.initialize()
        async with stdio_server() as (read_stream, write_stream):
            await app.run(read_stream, write_stream, app.create_initialization_options())
    finally:
        await db.close()
def main():
    """Console-script entry point: run the server until it exits."""
    asyncio.run(run_server())


# Allow running the module directly as well as via the console script.
if __name__ == "__main__":
    main()
38.7 Registering with Hermes
Option 1: Config File
# hermes_config.yaml
mcp_servers:
- name: "db-query"
description: "Database query tools"
command: ["db-query-mcp"]
env:
DB_PATH: "/data/production.db"
DB_POOL_SIZE: "10"
timeout: 30
auto_approve_tools:
- "list_tables"
- "describe_table"
require_approval_tools:
- "query_sql" # Requires confirmation (potential risk)
Option 2: Programmatic Registration
import asyncio
from hermes import Agent
from hermes.mcp import StdioServerConfig
async def main():
    """Register the db-query server with an agent and run one prompt.

    NOTE(review): the ``hermes`` API used here (``Agent``,
    ``StdioServerConfig``, ``agent.mcp``) is not shown in this chapter;
    confirm names and signatures against the Hermes documentation.
    """
    agent = Agent(model="hermes-pro")
    # Same command and DB_PATH as the YAML option above, built in code.
    db_server = StdioServerConfig(
        name="db-query",
        command=["db-query-mcp"],
        env={"DB_PATH": "/data/production.db"},
        timeout=30
    )
    await agent.mcp.register(db_server)
    # Listing the tools confirms that registration succeeded.
    tools = await agent.mcp.list_tools("db-query")
    print(f"Registered tools: {[t.name for t in tools]}")
    result = await agent.run("What tables exist in the database and how many rows does each have?")
    print(result.text)


asyncio.run(main())
38.8 Debugging Techniques
Technique 1: MCP Inspector (Official Tool)
npm install -g @modelcontextprotocol/inspector
mcp-inspector db-query-mcp
# Opens Web UI at http://localhost:5173
# Allows: browsing Tools/Resources/Prompts,
# manual tool calls, raw JSON-RPC inspection, server logs
Technique 2: Manual JSON-RPC Testing
echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"0.1"}}}' | db-query-mcp
Technique 3: pytest Integration Tests
# tests/test_server.py
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
@pytest.fixture
async def mcp_session(tmp_path):
    """Yield an initialized MCP client session backed by a temporary database.

    Creates ``test.db`` under ``tmp_path`` with a one-row ``users`` table,
    launches ``db-query-mcp`` over stdio with ``DB_PATH`` pointing at that
    file, and yields the session after ``initialize()`` has completed.

    NOTE(review): an ``async def`` fixture under plain ``@pytest.fixture``
    presumably relies on pytest-asyncio running in auto mode; confirm the
    project's pytest configuration.
    """
    import aiosqlite
    db_path = str(tmp_path / "test.db")
    # Seed the database before the server process opens it.
    async with aiosqlite.connect(db_path) as conn:
        await conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
        await conn.execute("INSERT INTO users VALUES (1, 'Alice')")
        await conn.commit()
    params = StdioServerParameters(command="db-query-mcp", env={"DB_PATH": db_path})
    # The session must be yielded inside both context managers so the
    # server process and its streams stay open for the test body.
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            yield session
@pytest.mark.asyncio
async def test_list_tools(mcp_session):
    """The server advertises the ``query_sql`` tool."""
    listing = await mcp_session.list_tools()
    advertised = [tool.name for tool in listing.tools]
    assert "query_sql" in advertised
@pytest.mark.asyncio
async def test_select_query(mcp_session):
    """A plain SELECT succeeds and the seeded row appears in the output."""
    request = {"sql": "SELECT * FROM users"}
    outcome = await mcp_session.call_tool("query_sql", request)
    assert not outcome.isError
    first_block = outcome.content[0]
    assert "Alice" in first_block.text
@pytest.mark.asyncio
async def test_write_blocked(mcp_session):
    """A DELETE statement is rejected with a security message."""
    request = {"sql": "DELETE FROM users"}
    outcome = await mcp_session.call_tool("query_sql", request)
    assert outcome.isError
    first_block = outcome.content[0]
    assert "Security" in first_block.text
Chapter Summary
This chapter built a production-grade MCP Server from scratch:
- Project design: Clearly scoped Tool/Resource/Prompt capabilities before writing a line of code
- SDK usage: MCP Python SDK's `Server` and `stdio_server` primitives dramatically reduce boilerplate
- Schema definition: Pydantic for input validation + JSON Schema for tool interface description
- Secure tool implementation: SELECT-only enforcement, parameterized queries, row limits
- Server entry point: Decorator-driven routing (`@app.call_tool()`), clean separation of concerns
- Registration: Config file and programmatic approaches for different deployment scenarios
- Debugging: MCP Inspector for visual debugging, manual JSON-RPC for raw protocol, pytest for automation
Review Questions
- The current implementation uses SQLite. How would you modify `DatabaseConnection` to support PostgreSQL? What differences must you handle (connection strings, RETURNING clauses, type mappings)?
- How would you add a query execution time limit (e.g., 10 seconds) that automatically cancels the query on timeout?
- Is the MCP Server stateless? If a user runs multiple queries in one conversation, does the Server need to maintain a "query history" state?
- How would you convert this Server from stdio mode to SSE mode so it can run as a standalone HTTP service?