第 35 章

MCP 三大原语:Tools / Resources / Prompts 的设计规范与实现模式

第三十五章:开发自己的 MCP Server:从零构建工具服务

35.1 开发前的准备工作

构建一个 MCP Server 并不需要从零实现 JSON-RPC 协议——Anthropic 官方提供了 Python 和 TypeScript 的 SDK,大幅降低了开发门槛。本章将用两种语言分别演示如何构建一个功能完整的 MCP Server,涵盖 Tools、Resources 和 Prompts 三种能力类型。

35.1.1 SDK 选择

Python SDK(推荐用于数据处理、后端集成)

pip install mcp
# 或带完整依赖
pip install "mcp[cli]"

TypeScript/Node.js SDK(推荐用于前端生态、Node.js 工具链)

npm install @modelcontextprotocol/sdk

35.1.2 开发环境检查

# Python
python --version   # 需要 3.10+
pip show mcp       # 确认 SDK 已安装

# Node.js
node --version     # 需要 18+
npx @modelcontextprotocol/inspector --version  # MCP 调试工具

35.2 第一个 MCP Server:Python 实现

我们将构建一个"项目助手"Server,提供以下功能:

35.2.1 基础结构

# server.py
import asyncio
import os
import subprocess
from pathlib import Path
from typing import Any, Sequence

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp import types

# 创建 Server 实例
app = Server("project-assistant")

# 工作目录(Server 操作的根路径)
WORKSPACE = Path(os.environ.get("WORKSPACE", "/tmp/workspace"))
WORKSPACE.mkdir(parents=True, exist_ok=True)

35.2.2 实现工具(Tools)

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    """声明所有可用工具"""
    return [
        types.Tool(
            name="read_file",
            description="读取工作区内指定文件的内容",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "相对于工作区根目录的文件路径"
                    }
                },
                "required": ["path"]
            }
        ),
        types.Tool(
            name="write_file",
            description="在工作区内写入或创建文件",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "相对于工作区根目录的文件路径"
                    },
                    "content": {
                        "type": "string",
                        "description": "要写入的文件内容"
                    }
                },
                "required": ["path", "content"]
            }
        ),
        types.Tool(
            name="run_command",
            description="在工作区目录中执行 shell 命令",
            inputSchema={
                "type": "object",
                "properties": {
                    "command": {
                        "type": "string",
                        "description": "要执行的 shell 命令"
                    },
                    "timeout": {
                        "type": "integer",
                        "description": "超时时间(秒),默认 30",
                        "default": 30
                    }
                },
                "required": ["command"]
            }
        ),
        types.Tool(
            name="search_files",
            description="在工作区中搜索包含指定关键词的文件",
            inputSchema={
                "type": "object",
                "properties": {
                    "pattern": {
                        "type": "string",
                        "description": "搜索关键词或正则表达式"
                    },
                    "file_pattern": {
                        "type": "string",
                        "description": "文件名匹配模式,如 '*.py'",
                        "default": "*"
                    }
                },
                "required": ["pattern"]
            }
        )
    ]


@app.call_tool()
async def call_tool(
    name: str,
    arguments: dict[str, Any]
) -> Sequence[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    """处理所有工具调用"""
    
    if name == "read_file":
        return await _handle_read_file(arguments)
    elif name == "write_file":
        return await _handle_write_file(arguments)
    elif name == "run_command":
        return await _handle_run_command(arguments)
    elif name == "search_files":
        return await _handle_search_files(arguments)
    else:
        raise ValueError(f"未知工具:{name}")


async def _handle_read_file(args: dict) -> list[types.TextContent]:
    """读取文件内容"""
    rel_path = args["path"].lstrip("/")
    full_path = WORKSPACE / rel_path
    
    # 安全检查:防止路径穿越
    try:
        full_path = full_path.resolve()
        full_path.relative_to(WORKSPACE.resolve())
    except ValueError:
        return [types.TextContent(
            type="text",
            text=f"错误:路径 '{args['path']}' 超出工作区范围"
        )]
    
    if not full_path.exists():
        return [types.TextContent(
            type="text",
            text=f"错误:文件 '{args['path']}' 不存在"
        )]
    
    try:
        content = full_path.read_text(encoding="utf-8")
        return [types.TextContent(type="text", text=content)]
    except UnicodeDecodeError:
        # 二进制文件,返回 base64
        import base64
        content_b64 = base64.b64encode(full_path.read_bytes()).decode()
        return [types.TextContent(
            type="text",
            text=f"[二进制文件,base64 编码]\n{content_b64}"
        )]


async def _handle_write_file(args: dict) -> list[types.TextContent]:
    """写入文件内容"""
    rel_path = args["path"].lstrip("/")
    full_path = WORKSPACE / rel_path
    
    # 安全检查
    try:
        full_path = full_path.resolve()
        full_path.relative_to(WORKSPACE.resolve())
    except ValueError:
        return [types.TextContent(
            type="text",
            text=f"错误:路径超出工作区范围"
        )]
    
    full_path.parent.mkdir(parents=True, exist_ok=True)
    full_path.write_text(args["content"], encoding="utf-8")
    
    return [types.TextContent(
        type="text",
        text=f"成功写入文件:{args['path']}({len(args['content'])} 字符)"
    )]


async def _handle_run_command(args: dict) -> list[types.TextContent]:
    """执行 shell 命令"""
    timeout = args.get("timeout", 30)
    
    try:
        result = subprocess.run(
            args["command"],
            shell=True,
            cwd=str(WORKSPACE),
            capture_output=True,
            text=True,
            timeout=timeout
        )
        
        output_parts = []
        if result.stdout:
            output_parts.append(f"标准输出:\n{result.stdout}")
        if result.stderr:
            output_parts.append(f"标准错误:\n{result.stderr}")
        output_parts.append(f"退出码:{result.returncode}")
        
        return [types.TextContent(
            type="text",
            text="\n".join(output_parts)
        )]
    
    except subprocess.TimeoutExpired:
        return [types.TextContent(
            type="text",
            text=f"错误:命令执行超时({timeout}秒)"
        )]


async def _handle_search_files(args: dict) -> list[types.TextContent]:
    """搜索文件内容"""
    pattern = args["pattern"]
    file_pattern = args.get("file_pattern", "*")
    
    matches = []
    for file_path in WORKSPACE.rglob(file_pattern):
        if file_path.is_file():
            try:
                content = file_path.read_text(encoding="utf-8", errors="ignore")
                if pattern.lower() in content.lower():
                    rel_path = file_path.relative_to(WORKSPACE)
                    # 找到匹配行
                    matching_lines = [
                        f"  行 {i+1}: {line.strip()}"
                        for i, line in enumerate(content.splitlines())
                        if pattern.lower() in line.lower()
                    ][:5]  # 最多显示5行
                    matches.append(f"{rel_path}:\n" + "\n".join(matching_lines))
            except Exception:
                pass
    
    if not matches:
        return [types.TextContent(
            type="text",
            text=f"未找到包含 '{pattern}' 的文件"
        )]
    
    return [types.TextContent(
        type="text",
        text=f"找到 {len(matches)} 个匹配文件:\n\n" + "\n\n".join(matches)
    )]

35.2.3 实现资源(Resources)

@app.list_resources()
async def list_resources() -> list[types.Resource]:
    """声明可访问的资源"""
    resources = []
    
    # 遍历工作区,暴露所有文本文件
    for file_path in WORKSPACE.rglob("*"):
        if file_path.is_file() and file_path.suffix in [
            ".py", ".js", ".ts", ".md", ".txt", ".json", ".yaml", ".toml"
        ]:
            rel_path = file_path.relative_to(WORKSPACE)
            resources.append(types.Resource(
                uri=f"file://workspace/{rel_path}",
                name=str(rel_path),
                description=f"工作区文件:{rel_path}",
                mimeType=_get_mime_type(file_path.suffix)
            ))
    
    # 也暴露目录结构作为特殊资源
    resources.append(types.Resource(
        uri="workspace://tree",
        name="工作区目录树",
        description="完整的工作区目录结构",
        mimeType="text/plain"
    ))
    
    return resources


@app.read_resource()
async def read_resource(uri: str) -> str:
    """读取资源内容"""
    if uri == "workspace://tree":
        return _generate_tree(WORKSPACE)
    
    if uri.startswith("file://workspace/"):
        rel_path = uri[len("file://workspace/"):]
        full_path = WORKSPACE / rel_path
        
        if full_path.exists():
            return full_path.read_text(encoding="utf-8")
        else:
            raise FileNotFoundError(f"资源不存在:{uri}")
    
    raise ValueError(f"不支持的 URI 格式:{uri}")


def _generate_tree(path: Path, prefix: str = "", max_depth: int = 4) -> str:
    """生成目录树字符串"""
    if max_depth == 0:
        return prefix + "...\n"
    
    result = []
    items = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name))
    
    for i, item in enumerate(items):
        is_last = (i == len(items) - 1)
        connector = "└── " if is_last else "├── "
        result.append(prefix + connector + item.name)
        
        if item.is_dir() and item.name not in [".git", "__pycache__", "node_modules"]:
            extension = "    " if is_last else "│   "
            result.append(_generate_tree(item, prefix + extension, max_depth - 1))
    
    return "\n".join(result)


def _get_mime_type(suffix: str) -> str:
    """根据文件扩展名返回 MIME 类型"""
    mime_map = {
        ".py": "text/x-python",
        ".js": "text/javascript",
        ".ts": "text/typescript",
        ".md": "text/markdown",
        ".json": "application/json",
        ".yaml": "text/yaml",
        ".toml": "text/toml",
        ".txt": "text/plain"
    }
    return mime_map.get(suffix, "text/plain")

35.2.4 实现提示模板(Prompts)

@app.list_prompts()
async def list_prompts() -> list[types.Prompt]:
    """声明预定义的提示模板"""
    return [
        types.Prompt(
            name="code_review",
            description="对工作区中的代码文件进行全面审查",
            arguments=[
                types.PromptArgument(
                    name="file_path",
                    description="要审查的文件路径",
                    required=True
                ),
                types.PromptArgument(
                    name="focus",
                    description="审查重点:performance/security/readability/all",
                    required=False
                )
            ]
        ),
        types.Prompt(
            name="explain_code",
            description="解释代码的功能和工作原理",
            arguments=[
                types.PromptArgument(
                    name="file_path",
                    description="要解释的代码文件路径",
                    required=True
                )
            ]
        ),
        types.Prompt(
            name="generate_tests",
            description="为指定代码生成单元测试",
            arguments=[
                types.PromptArgument(
                    name="file_path",
                    description="需要生成测试的源文件路径",
                    required=True
                ),
                types.PromptArgument(
                    name="test_framework",
                    description="测试框架:pytest/unittest/jest",
                    required=False
                )
            ]
        )
    ]


@app.get_prompt()
async def get_prompt(
    name: str,
    arguments: dict[str, str] | None
) -> types.GetPromptResult:
    """返回填充参数后的完整提示"""
    arguments = arguments or {}
    
    if name == "code_review":
        file_path = arguments.get("file_path", "")
        focus = arguments.get("focus", "all")
        
        # 尝试读取文件内容
        try:
            full_path = WORKSPACE / file_path.lstrip("/")
            code_content = full_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            code_content = f"[文件 {file_path} 不存在,请先创建]"
        
        focus_instruction = {
            "performance": "重点关注时间和空间复杂度、潜在的性能瓶颈",
            "security": "重点关注安全漏洞、输入验证、权限检查",
            "readability": "重点关注代码可读性、命名规范、注释质量",
            "all": "全面审查:性能、安全性、可读性、可维护性"
        }.get(focus, "全面审查")
        
        return types.GetPromptResult(
            description=f"代码审查:{file_path}",
            messages=[
                types.PromptMessage(
                    role="user",
                    content=types.TextContent(
                        type="text",
                        text=f"""请对以下代码进行专业的代码审查。

文件路径:{file_path}
审查重点:{focus_instruction}

{code_content}


请按以下格式输出审查结果:
1. **总体评估**(1-2句话)
2. **发现的问题**(分级:严重/警告/建议)
3. **优化建议**(具体可操作的改进点)
4. **示例改进代码**(针对最重要的问题)
"""
                    )
                )
            ]
        )
    
    elif name == "explain_code":
        file_path = arguments.get("file_path", "")
        try:
            full_path = WORKSPACE / file_path.lstrip("/")
            code_content = full_path.read_text(encoding="utf-8")
        except FileNotFoundError:
            code_content = f"[文件 {file_path} 不存在]"
        
        return types.GetPromptResult(
            description=f"代码解释:{file_path}",
            messages=[
                types.PromptMessage(
                    role="user",
                    content=types.TextContent(
                        type="text",
                        text=f"""请详细解释以下代码的功能和工作原理:

文件:{file_path}

{code_content}


解释应包括:
- 代码的整体目的
- 关键算法和数据结构
- 重要函数/方法的作用
- 潜在的使用场景和限制
"""
                    )
                )
            ]
        )
    
    raise ValueError(f"未知提示模板:{name}")

35.2.5 启动 Server

async def main():
    """启动 MCP Server(stdio 模式)"""
    async with stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

35.3 TypeScript 实现完整 MCP Server

35.3.1 项目初始化

mkdir mcp-todo-server && cd mcp-todo-server
npm init -y
npm install @modelcontextprotocol/sdk
npm install -D typescript @types/node ts-node
npx tsc --init

package.json 关键配置:

{
  "name": "mcp-todo-server",
  "version": "1.0.0",
  "type": "module",
  "bin": {
    "mcp-todo-server": "./dist/index.js"
  },
  "scripts": {
    "build": "tsc",
    "start": "node dist/index.js",
    "dev": "ts-node --esm src/index.ts"
  }
}

35.3.2 完整 Todo 管理 Server

// src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  ListResourcesRequestSchema,
  ReadResourceRequestSchema,
  ListPromptsRequestSchema,
  GetPromptRequestSchema,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs";
import * as path from "path";

// 数据存储(实际项目中应替换为数据库)
interface Todo {
  id: string;
  title: string;
  description?: string;
  completed: boolean;
  priority: "low" | "medium" | "high";
  tags: string[];
  createdAt: string;
  completedAt?: string;
}

const DATA_FILE = path.join(process.cwd(), "todos.json");

function loadTodos(): Todo[] {
  if (!fs.existsSync(DATA_FILE)) {
    return [];
  }
  return JSON.parse(fs.readFileSync(DATA_FILE, "utf-8"));
}

function saveTodos(todos: Todo[]): void {
  fs.writeFileSync(DATA_FILE, JSON.stringify(todos, null, 2));
}

function generateId(): string {
  return Date.now().toString(36) + Math.random().toString(36).substr(2);
}

// 创建 Server
const server = new Server(
  {
    name: "todo-manager",
    version: "1.0.0",
  },
  {
    capabilities: {
      tools: {},
      resources: {},
      prompts: {},
    },
  }
);

// 工具列表
server.setRequestHandler(ListToolsRequestSchema, async () => ({
  tools: [
    {
      name: "create_todo",
      description: "创建一个新的待办事项",
      inputSchema: {
        type: "object",
        properties: {
          title: { type: "string", description: "待办事项标题" },
          description: { type: "string", description: "详细描述(可选)" },
          priority: {
            type: "string",
            enum: ["low", "medium", "high"],
            default: "medium",
            description: "优先级",
          },
          tags: {
            type: "array",
            items: { type: "string" },
            description: "标签列表",
            default: [],
          },
        },
        required: ["title"],
      },
    },
    {
      name: "list_todos",
      description: "列出所有待办事项,支持过滤",
      inputSchema: {
        type: "object",
        properties: {
          completed: {
            type: "boolean",
            description: "过滤完成状态(不设置则显示全部)",
          },
          priority: {
            type: "string",
            enum: ["low", "medium", "high"],
            description: "按优先级过滤",
          },
          tag: { type: "string", description: "按标签过滤" },
        },
      },
    },
    {
      name: "complete_todo",
      description: "将待办事项标记为已完成",
      inputSchema: {
        type: "object",
        properties: {
          id: { type: "string", description: "待办事项 ID" },
        },
        required: ["id"],
      },
    },
    {
      name: "delete_todo",
      description: "删除一个待办事项",
      inputSchema: {
        type: "object",
        properties: {
          id: { type: "string", description: "待办事项 ID" },
        },
        required: ["id"],
      },
    },
    {
      name: "search_todos",
      description: "搜索包含关键词的待办事项",
      inputSchema: {
        type: "object",
        properties: {
          query: { type: "string", description: "搜索关键词" },
        },
        required: ["query"],
      },
    },
  ],
}));

// 工具调用处理
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args = {} } = request.params;

  switch (name) {
    case "create_todo": {
      const todos = loadTodos();
      const newTodo: Todo = {
        id: generateId(),
        title: args.title as string,
        description: args.description as string | undefined,
        completed: false,
        priority: (args.priority as Todo["priority"]) || "medium",
        tags: (args.tags as string[]) || [],
        createdAt: new Date().toISOString(),
      };
      todos.push(newTodo);
      saveTodos(todos);
      return {
        content: [
          {
            type: "text",
            text: `✅ 已创建待办事项:\nID: ${newTodo.id}\n标题: ${newTodo.title}\n优先级: ${newTodo.priority}`,
          },
        ],
      };
    }

    case "list_todos": {
      let todos = loadTodos();
      if (typeof args.completed === "boolean") {
        todos = todos.filter((t) => t.completed === args.completed);
      }
      if (args.priority) {
        todos = todos.filter((t) => t.priority === args.priority);
      }
      if (args.tag) {
        todos = todos.filter((t) => t.tags.includes(args.tag as string));
      }

      if (todos.length === 0) {
        return { content: [{ type: "text", text: "暂无匹配的待办事项" }] };
      }

      const text = todos
        .map((t) => {
          const status = t.completed ? "✅" : "⏳";
          const tags = t.tags.length > 0 ? ` [${t.tags.join(", ")}]` : "";
          return `${status} [${t.priority.toUpperCase()}] ${t.title}${tags}\n   ID: ${t.id}`;
        })
        .join("\n\n");

      return { content: [{ type: "text", text }] };
    }

    case "complete_todo": {
      const todos = loadTodos();
      const todo = todos.find((t) => t.id === args.id);
      if (!todo) {
        return {
          content: [{ type: "text", text: `错误:未找到 ID 为 ${args.id} 的待办事项` }],
          isError: true,
        };
      }
      todo.completed = true;
      todo.completedAt = new Date().toISOString();
      saveTodos(todos);
      return {
        content: [{ type: "text", text: `✅ 已完成:${todo.title}` }],
      };
    }

    case "delete_todo": {
      const todos = loadTodos();
      const index = todos.findIndex((t) => t.id === args.id);
      if (index === -1) {
        return {
          content: [{ type: "text", text: `错误:未找到 ID 为 ${args.id} 的待办事项` }],
          isError: true,
        };
      }
      const deleted = todos.splice(index, 1)[0];
      saveTodos(todos);
      return {
        content: [{ type: "text", text: `🗑️ 已删除:${deleted.title}` }],
      };
    }

    case "search_todos": {
      const todos = loadTodos();
      const query = (args.query as string).toLowerCase();
      const results = todos.filter(
        (t) =>
          t.title.toLowerCase().includes(query) ||
          t.description?.toLowerCase().includes(query) ||
          t.tags.some((tag) => tag.toLowerCase().includes(query))
      );

      if (results.length === 0) {
        return {
          content: [{ type: "text", text: `未找到包含 "${args.query}" 的待办事项` }],
        };
      }

      const text = results
        .map((t) => `[${t.id}] ${t.completed ? "✅" : "⏳"} ${t.title}`)
        .join("\n");
      return { content: [{ type: "text", text: `找到 ${results.length} 项:\n${text}` }] };
    }

    default:
      throw new Error(`未知工具:${name}`);
  }
});

// 资源列表
server.setRequestHandler(ListResourcesRequestSchema, async () => ({
  resources: [
    {
      uri: "todos://all",
      name: "所有待办事项",
      description: "完整的待办事项数据(JSON 格式)",
      mimeType: "application/json",
    },
    {
      uri: "todos://stats",
      name: "统计信息",
      description: "待办事项的完成率、优先级分布等统计",
      mimeType: "text/plain",
    },
  ],
}));

// 资源读取
server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
  const { uri } = request.params;

  if (uri === "todos://all") {
    const todos = loadTodos();
    return {
      contents: [
        {
          uri,
          mimeType: "application/json",
          text: JSON.stringify(todos, null, 2),
        },
      ],
    };
  }

  if (uri === "todos://stats") {
    const todos = loadTodos();
    const completed = todos.filter((t) => t.completed).length;
    const byPriority = {
      high: todos.filter((t) => t.priority === "high").length,
      medium: todos.filter((t) => t.priority === "medium").length,
      low: todos.filter((t) => t.priority === "low").length,
    };
    const stats = `待办事项统计
总计:${todos.length}
已完成:${completed}(${todos.length ? Math.round((completed / todos.length) * 100) : 0}%)
未完成:${todos.length - completed}

优先级分布:
- 高优先级:${byPriority.high}
- 中优先级:${byPriority.medium}  
- 低优先级:${byPriority.low}`;

    return { contents: [{ uri, mimeType: "text/plain", text: stats }] };
  }

  throw new Error(`未知资源 URI:${uri}`);
});

// 提示模板
server.setRequestHandler(ListPromptsRequestSchema, async () => ({
  prompts: [
    {
      name: "daily_review",
      description: "生成今日待办事项的日报总结",
      arguments: [],
    },
    {
      name: "prioritize",
      description: "分析待办事项并给出优先级建议",
      arguments: [],
    },
  ],
}));

server.setRequestHandler(GetPromptRequestSchema, async (request) => {
  const { name } = request.params;
  const todos = loadTodos();
  const pending = todos.filter((t) => !t.completed);

  if (name === "daily_review") {
    return {
      description: "日报生成",
      messages: [
        {
          role: "user",
          content: {
            type: "text",
            text: `请基于以下待办事项数据,生成一份简洁的日报:

当前未完成事项(${pending.length} 项):
${pending.map((t) => `- [${t.priority}] ${t.title}`).join("\n")}

请分析:1)今日应优先处理哪些事项?2)有哪些可以批量处理?3)是否有已超期风险的事项?`,
          },
        },
      ],
    };
  }

  throw new Error(`未知提示:${name}`);
});

// 启动 Server
const transport = new StdioServerTransport();
server.connect(transport).then(() => {
  process.stderr.write("MCP Todo Server 已启动\n");
});

35.4 调试与测试

35.4.1 使用 MCP Inspector

# 安装 MCP Inspector(官方调试工具)
npm install -g @modelcontextprotocol/inspector

# 启动 Inspector 连接 Python Server
mcp-inspector python server.py

# 启动 Inspector 连接 Node.js Server
mcp-inspector node dist/index.js

Inspector 提供了 Web UI,可以:

35.4.2 使用 Claude Desktop 测试

将 Server 添加到 Claude Desktop 配置:

{
  "mcpServers": {
    "project-assistant": {
      "command": "python",
      "args": ["/path/to/server.py"],
      "env": {
        "WORKSPACE": "/path/to/your/workspace"
      }
    },
    "todo-manager": {
      "command": "node",
      "args": ["/path/to/dist/index.js"]
    }
  }
}

35.4.3 自动化测试

# test_server.py
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def test_mcp_server():
    server_params = StdioServerParameters(
        command="python",
        args=["server.py"],
        env={"WORKSPACE": "/tmp/test_workspace"}
    )
    
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 初始化
            await session.initialize()
            
            # 测试工具列表
            tools = await session.list_tools()
            assert len(tools.tools) > 0, "应该有工具"
            print(f"工具数量:{len(tools.tools)}")
            
            # 测试写文件
            result = await session.call_tool("write_file", {
                "path": "test.txt",
                "content": "Hello, MCP!"
            })
            assert not result.isError, "写文件不应该报错"
            
            # 测试读文件
            result = await session.call_tool("read_file", {"path": "test.txt"})
            assert "Hello, MCP!" in result.content[0].text
            
            # 测试资源列表
            resources = await session.list_resources()
            print(f"资源数量:{len(resources.resources)}")
            
            print("所有测试通过!")

asyncio.run(test_mcp_server())

35.5 发布 MCP Server

35.5.1 发布到 npm(TypeScript/Node.js)

# 构建
npm run build

# 测试打包后的版本
npx . 

# 发布到 npm
npm publish

# 用户可以通过 npx 使用
# Claude Desktop 配置:
# {"command": "npx", "args": ["-y", "your-package-name"]}

35.5.2 发布到 PyPI(Python)

# pyproject.toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "mcp-project-assistant"
version = "1.0.0"
description = "MCP Server for project file management"
requires-python = ">=3.10"
dependencies = ["mcp>=1.0.0"]

[project.scripts]
mcp-project-assistant = "server:main_sync"
pip build
pip publish

小结

本章通过两个完整的 MCP Server 实现(Python 项目助手和 TypeScript Todo 管理器),演示了如何从零构建具备 Tools、Resources 和 Prompts 三种能力的 MCP Server。

关键实现要点:

  1. 使用官方 SDK,避免手动实现 JSON-RPC 协议
  2. 工具的 inputSchema 要清晰、精确,直接影响 AI 的调用质量
  3. 安全检查(路径穿越防护、输入验证)是生产级 Server 的必备要素
  4. 使用 MCP Inspector 进行交互式调试
  5. 错误响应应设置 isError: true,让 AI 能识别工具失败

下一章将介绍如何在 Claude Code 和 Claude Desktop 等客户端中集成 MCP Server。

本章评分
4.8  / 5  (3 评分)

💬 留言讨论