Chapter 35

MCP Three Core Primitives: Design Specifications and Implementation Patterns for Tools / Resources / Prompts

Chapter 35: Building Your Own MCP Server: Creating Tool Services from Scratch

35.1 Prerequisites

Building an MCP Server doesn't require implementing JSON-RPC from scratch โ€” Anthropic provides official Python and TypeScript SDKs that dramatically lower the development barrier. This chapter demonstrates building a fully featured MCP Server in both languages, covering all three capability types: Tools, Resources, and Prompts.

35.1.1 SDK Installation

Python SDK (recommended for data processing, backend integrations):

pip install "mcp[cli]"

TypeScript/Node.js SDK (recommended for frontend ecosystems, Node.js toolchains):

npm install @modelcontextprotocol/sdk

35.1.2 Environment Check

# Python
python --version    # Requires 3.10+
pip show mcp        # Confirm SDK is installed

# Node.js
node --version      # Requires 18+
npx @modelcontextprotocol/inspector --version  # MCP debugging tool

35.2 A Complete Python MCP Server

We'll build a "project assistant" Server providing:

35.2.1 Basic Structure

# server.py
import asyncio
import os
import subprocess
from pathlib import Path
from typing import Any, Sequence

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types

app = Server("project-assistant")

WORKSPACE = Path(os.environ.get("WORKSPACE", "/tmp/workspace"))
WORKSPACE.mkdir(parents=True, exist_ok=True)

35.2.2 Implementing Tools

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="read_file",
            description="Read the contents of a file within the workspace",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "File path relative to the workspace root"
                    }
                },
                "required": ["path"]
            }
        ),
        types.Tool(
            name="write_file",
            description="Write or create a file within the workspace",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "content": {"type": "string"}
                },
                "required": ["path", "content"]
            }
        ),
        types.Tool(
            name="run_command",
            description="Execute a shell command in the workspace directory",
            inputSchema={
                "type": "object",
                "properties": {
                    "command": {"type": "string"},
                    "timeout": {"type": "integer", "default": 30}
                },
                "required": ["command"]
            }
        )
    ]


@app.call_tool()
async def call_tool(
    name: str,
    arguments: dict[str, Any]
) -> Sequence[types.TextContent]:
    
    if name == "read_file":
        rel_path = arguments["path"].lstrip("/")
        full_path = WORKSPACE / rel_path
        
        # Security check: prevent path traversal
        try:
            full_path = full_path.resolve()
            full_path.relative_to(WORKSPACE.resolve())
        except ValueError:
            return [types.TextContent(type="text", text=f"Error: Path outside workspace")]
        
        if not full_path.exists():
            return [types.TextContent(type="text", text=f"Error: File not found")]
        
        return [types.TextContent(type="text", text=full_path.read_text(encoding="utf-8"))]
    
    elif name == "write_file":
        rel_path = arguments["path"].lstrip("/")
        full_path = (WORKSPACE / rel_path).resolve()
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_text(arguments["content"], encoding="utf-8")
        return [types.TextContent(type="text", text=f"Successfully wrote {arguments['path']}")]
    
    elif name == "run_command":
        timeout = arguments.get("timeout", 30)
        try:
            result = subprocess.run(
                arguments["command"], shell=True, cwd=str(WORKSPACE),
                capture_output=True, text=True, timeout=timeout
            )
            parts = []
            if result.stdout:
                parts.append(f"stdout:\n{result.stdout}")
            if result.stderr:
                parts.append(f"stderr:\n{result.stderr}")
            parts.append(f"exit code: {result.returncode}")
            return [types.TextContent(type="text", text="\n".join(parts))]
        except subprocess.TimeoutExpired:
            return [types.TextContent(type="text", text=f"Error: Command timed out after {timeout}s")]
    
    raise ValueError(f"Unknown tool: {name}")

35.2.3 Implementing Resources

@app.list_resources()
async def list_resources() -> list[types.Resource]:
    resources = []
    
    for file_path in WORKSPACE.rglob("*"):
        if file_path.is_file() and file_path.suffix in [
            ".py", ".js", ".ts", ".md", ".txt", ".json", ".yaml"
        ]:
            rel_path = file_path.relative_to(WORKSPACE)
            resources.append(types.Resource(
                uri=f"file://workspace/{rel_path}",
                name=str(rel_path),
                description=f"Workspace file: {rel_path}",
                mimeType="text/plain"
            ))
    
    resources.append(types.Resource(
        uri="workspace://tree",
        name="Directory Tree",
        description="Complete workspace directory structure",
        mimeType="text/plain"
    ))
    return resources


@app.read_resource()
async def read_resource(uri: str) -> str:
    if uri == "workspace://tree":
        # Generate a simple directory listing
        lines = []
        for p in sorted(WORKSPACE.rglob("*")):
            if ".git" not in p.parts and "__pycache__" not in p.parts:
                rel = p.relative_to(WORKSPACE)
                indent = "  " * (len(rel.parts) - 1)
                lines.append(f"{indent}{p.name}{'/' if p.is_dir() else ''}")
        return "\n".join(lines)
    
    if uri.startswith("file://workspace/"):
        rel = uri[len("file://workspace/"):]
        return (WORKSPACE / rel).read_text(encoding="utf-8")
    
    raise ValueError(f"Unknown URI: {uri}")

35.2.4 Implementing Prompt Templates

@app.list_prompts()
async def list_prompts() -> list[types.Prompt]:
    return [
        types.Prompt(
            name="code_review",
            description="Perform a comprehensive code review on a workspace file",
            arguments=[
                types.PromptArgument(name="file_path", description="File to review", required=True),
                types.PromptArgument(name="focus", description="performance/security/readability/all", required=False)
            ]
        )
    ]


@app.get_prompt()
async def get_prompt(name: str, arguments: dict[str, str] | None) -> types.GetPromptResult:
    arguments = arguments or {}
    
    if name == "code_review":
        file_path = arguments.get("file_path", "")
        focus = arguments.get("focus", "all")
        
        try:
            code = (WORKSPACE / file_path.lstrip("/")).read_text(encoding="utf-8")
        except FileNotFoundError:
            code = f"[File {file_path} not found]"
        
        return types.GetPromptResult(
            description=f"Code review: {file_path}",
            messages=[
                types.PromptMessage(
                    role="user",
                    content=types.TextContent(
                        type="text",
                        text=f"Please review this code (focus: {focus}):\n\nFile: {file_path}\n\n```\n{code}\n```\n\nProvide: 1) Overall assessment 2) Issues found (Critical/Warning/Suggestion) 3) Actionable improvements 4) Sample improved code"
                    )
                )
            ]
        )
    
    raise ValueError(f"Unknown prompt: {name}")


async def main():
    async with stdio_server() as (read_stream, write_stream):
        await app.run(read_stream, write_stream, app.create_initialization_options())

if __name__ == "__main__":
    asyncio.run(main())

35.3 Complete TypeScript MCP Server

35.3.1 Project Setup

mkdir mcp-todo-server && cd mcp-todo-server
npm init -y
npm install @modelcontextprotocol/sdk
npm install -D typescript @types/node
npx tsc --init

35.3.2 Full Todo Manager Server

// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs";

interface Todo {
  id: string;
  title: string;
  completed: boolean;
  priority: "low" | "medium" | "high";
  tags: string[];
  createdAt: string;
}

const DATA_FILE = "todos.json";
const loadTodos = (): Todo[] =>
  fs.existsSync(DATA_FILE) ? JSON.parse(fs.readFileSync(DATA_FILE, "utf-8")) : [];
const saveTodos = (todos: Todo[]) =>
  fs.writeFileSync(DATA_FILE, JSON.stringify(todos, null, 2));

const server = new Server(
  { name: "todo-manager", version: "1.0.0" },
  { capabilities: { tools: {}, resources: {} } }
);

server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "create_todo",
      description: "Create a new todo item",
      inputSchema: {
        type: "object",
        properties: {
          title: { type: "string" },
          priority: { type: "string", enum: ["low", "medium", "high"], default: "medium" },
          tags: { type: "array", items: { type: "string" }, default: [] },
        },
        required: ["title"],
      },
    },
    {
      name: "list_todos",
      description: "List todos with optional filters",
      inputSchema: {
        type: "object",
        properties: {
          completed: { type: "boolean" },
          priority: { type: "string", enum: ["low", "medium", "high"] },
        },
      },
    },
    {
      name: "complete_todo",
      description: "Mark a todo as completed",
      inputSchema: {
        type: "object",
        properties: { id: { type: "string" } },
        required: ["id"],
      },
    },
  ],
}));

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args = {} } = request.params;

  if (name === "create_todo") {
    const todos = loadTodos();
    const todo: Todo = {
      id: Date.now().toString(36),
      title: args.title as string,
      completed: false,
      priority: (args.priority as Todo["priority"]) || "medium",
      tags: (args.tags as string[]) || [],
      createdAt: new Date().toISOString(),
    };
    todos.push(todo);
    saveTodos(todos);
    return {
      content: [{ type: "text", text: `Created todo: ${todo.title} (ID: ${todo.id})` }],
    };
  }

  if (name === "list_todos") {
    let todos = loadTodos();
    if (typeof args.completed === "boolean") {
      todos = todos.filter((t) => t.completed === args.completed);
    }
    if (args.priority) {
      todos = todos.filter((t) => t.priority === args.priority);
    }
    const text = todos.length === 0
      ? "No todos found"
      : todos.map((t) => `${t.completed ? "โœ…" : "โณ"} [${t.priority}] ${t.title} (${t.id})`).join("\n");
    return { content: [{ type: "text", text }] };
  }

  if (name === "complete_todo") {
    const todos = loadTodos();
    const todo = todos.find((t) => t.id === args.id);
    if (!todo) {
      return { content: [{ type: "text", text: `Todo ${args.id} not found` }], isError: true };
    }
    todo.completed = true;
    saveTodos(todos);
    return { content: [{ type: "text", text: `Completed: ${todo.title}` }] };
  }

  throw new Error(`Unknown tool: ${name}`);
});

server.setRequestHandler(ListResourcesRequestSchema, async () => ({
  resources: [
    {
      uri: "todos://stats",
      name: "Todo Statistics",
      description: "Completion rates and priority breakdown",
      mimeType: "text/plain",
    },
  ],
}));

server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  if (request.params.uri === "todos://stats") {
    const todos = loadTodos();
    const done = todos.filter((t) => t.completed).length;
    const text = `Total: ${todos.length}\nCompleted: ${done}\nPending: ${todos.length - done}\nCompletion rate: ${todos.length ? Math.round((done / todos.length) * 100) : 0}%`;
    return { contents: [{ uri: "todos://stats", mimeType: "text/plain", text }] };
  }
  throw new Error(`Unknown resource: ${request.params.uri}`);
});

const transport = new StdioServerTransport();
server.connect(transport);

35.4 Debugging and Testing

35.4.1 MCP Inspector

# Install the official debugging tool
npm install -g @modelcontextprotocol/inspector

# Connect to Python server
mcp-inspector python server.py

# Connect to Node.js server
mcp-inspector node dist/index.js

The Inspector provides a web UI for:

35.4.2 Claude Desktop Integration Test

{
  "mcpServers": {
    "project-assistant": {
      "command": "python",
      "args": ["/path/to/server.py"],
      "env": { "WORKSPACE": "/path/to/your/workspace" }
    },
    "todo-manager": {
      "command": "node",
      "args": ["/path/to/dist/index.js"]
    }
  }
}

35.4.3 Automated Testing

# test_server.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_mcp_server():
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"],
        env={"WORKSPACE": "/tmp/test_workspace"}
    )
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            
            tools = await session.list_tools()
            assert len(tools.tools) > 0, "Should have tools"
            
            result = await session.call_tool("write_file", {
                "path": "test.txt", "content": "Hello, MCP!"
            })
            assert not result.isError
            
            result = await session.call_tool("read_file", {"path": "test.txt"})
            assert "Hello, MCP!" in result.content[0].text
            
            print("All tests passed!")

asyncio.run(test_mcp_server())

35.5 Publishing Your MCP Server

35.5.1 Publish to npm (TypeScript/Node.js)

{
  "bin": { "mcp-todo-server": "./dist/index.js" }
}
npm run build
npm publish
# Users can then use: npx -y your-package-name

35.5.2 Publish to PyPI (Python)

# pyproject.toml
[project]
name = "mcp-project-assistant"
version = "1.0.0"
requires-python = ">=3.10"
dependencies = ["mcp>=1.0.0"]

[project.scripts]
mcp-project-assistant = "server:main"

35.6 Production-Ready Considerations

Input validation: Always validate tool arguments before using them. Use JSON Schema validation libraries rather than manual checks.

Path traversal prevention: Any tool that accesses the filesystem must verify that resolved paths stay within the permitted root. The pattern shown above (.resolve() then .relative_to()) is the correct approach.

Error handling: Use isError: true in tool responses to signal failures. This lets the AI model recognize that the tool failed and decide how to recover, rather than treating error text as a successful result.

Logging to stderr: All diagnostic logging should go to stderr (not stdout), since stdout is reserved for the protocol's JSON-RPC messages.

Resource cleanup: For servers that hold open connections (database handles, file watchers), implement proper cleanup on process termination signals.


Summary

This chapter walked through building two complete MCP Servers from scratch โ€” a Python project assistant and a TypeScript todo manager โ€” demonstrating all three capability types (Tools, Resources, Prompts) with production-quality patterns.

Key implementation points:

  1. Use the official SDK; avoid hand-coding the JSON-RPC protocol
  2. Tool inputSchema definitions directly affect how well the AI calls your tools โ€” be precise
  3. Security checks (path traversal prevention, input validation) are non-negotiable for production servers
  4. Use the MCP Inspector for interactive debugging during development
  5. Signal tool failures with isError: true so the AI model can recover gracefully

The next chapter covers integrating MCP Servers into Claude Code and Claude Desktop client applications.

Rate this chapter
4.8  / 5  (3 ratings)

๐Ÿ’ฌ Comments