Chapter 38

Building a Custom MCP Server

Chapter 38: Building a Custom MCP Server

Understanding the MCP protocol specification is valuable—but the best way to internalize it is to build one yourself. This chapter walks through a complete hands-on project: developing a "database query" MCP Server in Python, covering everything from environment setup to registering and using it inside Hermes.


38.1 Project Design: Database Query MCP Server

Capability Design

The db-query-mcp-server we'll build provides:

db-query-mcp-server
├── Tools
│   ├── query_sql       — Execute read-only SQL queries
│   ├── list_tables     — List all table names
│   └── describe_table  — Show table schema
│
├── Resources
│   ├── db://schema           — Complete database schema
│   └── db://tables/{name}    — Single-table data preview
│
└── Prompts
    └── analyze_table   — Optimal prompt for table analysis

Directory Structure

db-query-mcp-server/
├── pyproject.toml
├── README.md
├── src/
│   └── db_query_mcp/
│       ├── __init__.py
│       ├── server.py       # MCP Server entry point
│       ├── tools.py        # Tool implementations
│       ├── resources.py    # Resource implementations
│       ├── db.py           # Database connection layer
│       └── schemas.py      # Input validation schemas
└── tests/
    ├── test_tools.py
    └── test_server.py

38.2 SDK Installation and Initialization

mkdir db-query-mcp-server && cd db-query-mcp-server
python -m venv .venv && source .venv/bin/activate

pip install mcp aiosqlite pydantic structlog
# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "db-query-mcp-server"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = ["mcp>=0.9.0", "aiosqlite>=0.20.0", "pydantic>=2.0"]

[project.scripts]
db-query-mcp = "db_query_mcp.server:main"

38.3 Defining Tool Schemas

# src/db_query_mcp/schemas.py
from pydantic import BaseModel, Field, field_validator
from typing import Optional, List
import re

class QuerySQLInput(BaseModel):
    sql: str = Field(description="SELECT SQL statement to execute", min_length=1)
    params: Optional[List] = Field(default=None, description="Bind parameters")
    limit: int = Field(default=100, ge=1, le=10000, description="Max rows to return")
    
    @field_validator("sql")
    @classmethod
    def validate_readonly(cls, v: str) -> str:
        normalized = v.strip().upper()
        if not normalized.startswith("SELECT"):
            raise ValueError(
                f"Security: only SELECT statements are allowed. "
                f"Detected: {normalized.split()[0]}"
            )
        dangerous = ["DROP","DELETE","UPDATE","INSERT","ALTER","TRUNCATE","EXEC"]
        for kw in dangerous:
            if re.search(r'\b' + kw + r'\b', normalized):
                raise ValueError(f"Security: forbidden keyword '{kw}' detected in SQL")
        return v

class ListTablesInput(BaseModel):
    include_views: bool = Field(default=False, description="Include views in results")

class DescribeTableInput(BaseModel):
    table_name: str = Field(
        description="Table name", min_length=1, max_length=255,
        pattern=r'^[a-zA-Z_][a-zA-Z0-9_]*$'
    )

38.4 Database Connection Layer

# src/db_query_mcp/db.py
import asyncio
import aiosqlite
from typing import Any, Dict, List, Optional
from contextlib import asynccontextmanager

class DatabaseConnection:
    def __init__(self, connection_string: str, pool_size: int = 5):
        self.connection_string = connection_string
        self.pool_size = pool_size
        self._pool: List[aiosqlite.Connection] = []
        self._semaphore = asyncio.Semaphore(pool_size)
    
    async def initialize(self):
        for _ in range(self.pool_size):
            conn = await aiosqlite.connect(self.connection_string)
            conn.row_factory = aiosqlite.Row
            await conn.execute("PRAGMA journal_mode=WAL")
            self._pool.append(conn)
    
    @asynccontextmanager
    async def acquire(self):
        async with self._semaphore:
            conn = self._pool[0]
            try:
                yield conn
            except Exception:
                raise
    
    async def query(self, sql: str, params=None, limit=100) -> Dict[str, Any]:
        async with self.acquire() as conn:
            # Wrap in LIMIT to guard against huge result sets
            limited_sql = f"SELECT * FROM ({sql}) AS __q LIMIT {limit + 1}"
            async with conn.execute(limited_sql, params or []) as cursor:
                rows = await cursor.fetchall()
                columns = [d[0] for d in cursor.description]
                truncated = len(rows) > limit
                return {
                    "rows": [dict(r) for r in rows[:limit]],
                    "columns": columns,
                    "row_count": min(len(rows), limit),
                    "truncated": truncated
                }
    
    async def get_tables(self, include_views=False) -> List[str]:
        type_filter = "IN ('table','view')" if include_views else "= 'table'"
        async with self.acquire() as conn:
            async with conn.execute(
                f"SELECT name FROM sqlite_master WHERE type {type_filter} ORDER BY name"
            ) as cursor:
                return [r[0] for r in await cursor.fetchall()]
    
    async def get_table_schema(self, table_name: str) -> List[Dict]:
        async with self.acquire() as conn:
            async with conn.execute(f"PRAGMA table_info({table_name})") as cursor:
                return [
                    {"name": c["name"], "type": c["type"],
                     "nullable": not c["notnull"], "default": c["dflt_value"],
                     "primary_key": bool(c["pk"])}
                    for c in await cursor.fetchall()
                ]
    
    async def close(self):
        for conn in self._pool:
            await conn.close()

38.5 Implementing handle_call_tool

# src/db_query_mcp/tools.py
import json
from typing import Any, Dict, List
from mcp.types import Tool, TextContent, CallToolResult
from .schemas import QuerySQLInput, ListTablesInput, DescribeTableInput

def get_tool_definitions() -> List[Tool]:
    return [
        Tool(
            name="query_sql",
            description=(
                "Execute a read-only SQL query against the database. "
                "Only SELECT statements are permitted. Results are automatically "
                "capped at the specified limit."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {"type": "string", "description": "SELECT statement to execute"},
                    "params": {"type": "array", "items": {}, "description": "Bind parameters"},
                    "limit": {"type": "integer", "default": 100, "minimum": 1, "maximum": 10000}
                },
                "required": ["sql"]
            }
        ),
        Tool(
            name="list_tables",
            description="List all table names in the database",
            inputSchema={
                "type": "object",
                "properties": {
                    "include_views": {"type": "boolean", "default": False}
                }
            }
        ),
        Tool(
            name="describe_table",
            description="Show the schema (columns, types, nullability, keys) of a table",
            inputSchema={
                "type": "object",
                "properties": {
                    "table_name": {"type": "string", "description": "Table name to describe"}
                },
                "required": ["table_name"]
            }
        )
    ]


async def handle_call_tool(name: str, arguments: Dict[str, Any], db) -> CallToolResult:
    try:
        if name == "query_sql":
            return await _query_sql(arguments, db)
        elif name == "list_tables":
            return await _list_tables(arguments, db)
        elif name == "describe_table":
            return await _describe_table(arguments, db)
        else:
            return CallToolResult(
                content=[TextContent(type="text", text=f"Unknown tool: {name}")],
                isError=True
            )
    except ValueError as e:
        return CallToolResult(
            content=[TextContent(type="text", text=f"Validation error: {e}")],
            isError=True
        )
    except Exception as e:
        return CallToolResult(
            content=[TextContent(type="text", text=f"Execution error: {e}")],
            isError=True
        )


async def _query_sql(arguments: Dict, db) -> CallToolResult:
    params = QuerySQLInput(**arguments)
    result = await db.query(params.sql, params.params, params.limit)
    rows, columns = result["rows"], result["columns"]
    
    if not rows:
        return CallToolResult(
            content=[TextContent(type="text", text="Query returned 0 rows.")],
            isError=False
        )
    
    # Build Markdown table
    md = ["| " + " | ".join(columns) + " |",
          "| " + " | ".join("---" for _ in columns) + " |"]
    for row in rows:
        md.append("| " + " | ".join(str(row.get(c, "")) for c in columns) + " |")
    
    note = " (truncated)" if result["truncated"] else ""
    summary = f"Query returned {result['row_count']} rows{note}:\n\n" + "\n".join(md)
    
    return CallToolResult(
        content=[
            TextContent(type="text", text=summary),
            TextContent(type="text",
                text="**Raw JSON:**\n```json\n"
                     + json.dumps(rows[:10], indent=2)
                     + "\n```")
        ],
        isError=False
    )


async def _list_tables(arguments: Dict, db) -> CallToolResult:
    params = ListTablesInput(**arguments)
    tables = await db.get_tables(params.include_views)
    if not tables:
        return CallToolResult(
            content=[TextContent(type="text", text="No tables found in the database.")],
            isError=False
        )
    table_list = "\n".join(f"- `{t}`" for t in tables)
    return CallToolResult(
        content=[TextContent(type="text", text=f"Database has {len(tables)} table(s):\n\n{table_list}")],
        isError=False
    )


async def _describe_table(arguments: Dict, db) -> CallToolResult:
    params = DescribeTableInput(**arguments)
    schema = await db.get_table_schema(params.table_name)
    if not schema:
        return CallToolResult(
            content=[TextContent(type="text", text=f"Table `{params.table_name}` not found.")],
            isError=True
        )
    lines = [
        f"## Schema: `{params.table_name}`\n",
        "| Column | Type | Nullable | Default | PK |",
        "| ------ | ---- | -------- | ------- | -- |",
    ]
    for col in schema:
        lines.append(
            f"| `{col['name']}` | {col['type']} | "
            f"{'Yes' if col['nullable'] else 'No'} | "
            f"{col['default'] or '-'} | "
            f"{'✓' if col['primary_key'] else ''} |"
        )
    return CallToolResult(
        content=[TextContent(type="text", text="\n".join(lines))],
        isError=False
    )

38.6 Complete Server Entry Point

# src/db_query_mcp/server.py
import asyncio, os, sys
from typing import Any, Dict
from mcp.server import Server
from mcp.server.stdio import stdio_server
from .db import DatabaseConnection
from .tools import get_tool_definitions, handle_call_tool

app = Server("db-query-mcp-server")
db: DatabaseConnection = None

@app.list_tools()
async def list_tools():
    return get_tool_definitions()

@app.call_tool()
async def call_tool(name: str, arguments: Dict[str, Any]):
    return await handle_call_tool(name, arguments, db)

async def run_server():
    global db
    db_path = os.environ.get("DB_PATH", ":memory:")
    db = DatabaseConnection(db_path)
    await db.initialize()
    try:
        async with stdio_server() as (read_stream, write_stream):
            await app.run(read_stream, write_stream, app.create_initialization_options())
    finally:
        await db.close()

def main():
    asyncio.run(run_server())

if __name__ == "__main__":
    main()

38.7 Registering with Hermes

Option 1: Config File

# hermes_config.yaml
mcp_servers:
  - name: "db-query"
    description: "Database query tools"
    command: ["db-query-mcp"]
    env:
      DB_PATH: "/data/production.db"
      DB_POOL_SIZE: "10"
    timeout: 30
    auto_approve_tools:
      - "list_tables"
      - "describe_table"
    require_approval_tools:
      - "query_sql"   # Requires confirmation (potential risk)

Option 2: Programmatic Registration

import asyncio
from hermes import Agent
from hermes.mcp import StdioServerConfig

async def main():
    agent = Agent(model="hermes-pro")
    
    db_server = StdioServerConfig(
        name="db-query",
        command=["db-query-mcp"],
        env={"DB_PATH": "/data/production.db"},
        timeout=30
    )
    await agent.mcp.register(db_server)
    
    tools = await agent.mcp.list_tools("db-query")
    print(f"Registered tools: {[t.name for t in tools]}")
    
    result = await agent.run("What tables exist in the database and how many rows does each have?")
    print(result.text)

asyncio.run(main())

38.8 Debugging Techniques

Technique 1: MCP Inspector (Official Tool)

npm install -g @modelcontextprotocol/inspector
mcp-inspector db-query-mcp
# Opens Web UI at http://localhost:5173
# Allows: browsing Tools/Resources/Prompts,
#          manual tool calls, raw JSON-RPC inspection, server logs

Technique 2: Manual JSON-RPC Testing

echo '{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"0.1"}}}' | db-query-mcp

Technique 3: pytest Integration Tests

# tests/test_server.py
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

@pytest.fixture
async def mcp_session(tmp_path):
    import aiosqlite
    db_path = str(tmp_path / "test.db")
    async with aiosqlite.connect(db_path) as conn:
        await conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
        await conn.execute("INSERT INTO users VALUES (1, 'Alice')")
        await conn.commit()
    
    params = StdioServerParameters(command="db-query-mcp", env={"DB_PATH": db_path})
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            yield session

@pytest.mark.asyncio
async def test_list_tools(mcp_session):
    tools = await mcp_session.list_tools()
    names = [t.name for t in tools.tools]
    assert "query_sql" in names

@pytest.mark.asyncio
async def test_select_query(mcp_session):
    result = await mcp_session.call_tool("query_sql", {"sql": "SELECT * FROM users"})
    assert not result.isError
    assert "Alice" in result.content[0].text

@pytest.mark.asyncio
async def test_write_blocked(mcp_session):
    result = await mcp_session.call_tool("query_sql", {"sql": "DELETE FROM users"})
    assert result.isError
    assert "Security" in result.content[0].text

Chapter Summary

This chapter built a production-grade MCP Server from scratch:

  1. Project design: Clearly scoped Tool/Resource/Prompt capabilities before writing a line of code
  2. SDK usage: MCP Python SDK's Server and stdio_server primitives dramatically reduce boilerplate
  3. Schema definition: Pydantic for input validation + JSON Schema for tool interface description
  4. Secure tool implementation: SELECT-only enforcement, parameterized queries, row limits
  5. Server entry point: Decorator-driven routing (@app.call_tool()), clean separation of concerns
  6. Registration: Config file and programmatic approaches for different deployment scenarios
  7. Debugging: MCP Inspector for visual debugging, manual JSON-RPC for raw protocol, pytest for automation

Review Questions

  1. The current implementation uses SQLite. How would you modify DatabaseConnection to support PostgreSQL? What differences must you handle (connection strings, RETURNING clauses, type mappings)?
  2. How would you add a query execution time limit (e.g., 10 seconds) that automatically cancels the query on timeout?
  3. Is the MCP Server stateless? If a user runs multiple queries in one conversation, does the Server need to maintain a "query history" state?
  4. How would you convert this Server from stdio mode to SSE mode so it can run as a standalone HTTP service?
Rate this chapter
4.6  / 5  (3 ratings)

💬 Comments