Chapter 51

Building an MCP Server

L1: Concept Layer (What MCP Is and Why It Matters)

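The Fragmentation Problem in AI Tool Integration

Before MCP (Model Context Protocol), AI tool integration was chaotic. Every AI platform and every assistant application had its own way of integrating tools. To connect a database query tool to Claude, you implemented one specific API. To connect the same tool to GPT, you implemented another, and Gemini required a third.

This fragmentation caused a great deal of duplicated work. Developers wrote the integration code for a single feature three times, and whenever a platform changed its API, every integration had to be updated in step. That wasted development effort and, worse, held back the growth of the AI tool ecosystem.

MCP aims to do for AI tools what HTTP did for the Web: provide a common, open standard so that any tool can connect, in the same way, to any AI system that speaks the protocol.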

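MCP's Core Value Proposition

MCP (Model Context Protocol) is an open standard that Anthropic released in November 2024. It specifies how AI applications (clients) communicate with external tool providers (servers).

Why does MCP matter?

Unified interface: a single MCP Server can be used by Claude Desktop, by any IDE plugin that supports MCP, and by any custom AI application, with no per-client implementation.

Separation of concerns: tool authors focus on the tool's business logic and need not know how the AI application above it is built. AI application developers integrate with the MCP protocol and need not care about each tool's internals.

Security: MCP's design supports running tools in a separate process, isolated from the AI application. Sensitive data such as database passwords can stay inside the MCP Server process rather than being exposed to the model.

Composability: an MCP Proxy can aggregate several MCP Servers into one and give the AI application a single entry point.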

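Choosing Between stdio and HTTP Transports

MCP supports two transports:

stdio (standard input/output) transport: the client launches the server as a child process and exchanges newline-delimited JSON-RPC messages over its stdin and stdout.

HTTP+SSE (Server-Sent Events) transport: the server runs as a standalone HTTP service; the client sends requests with HTTP POST and receives responses over an SSE stream.

This chapter focuses on the stdio transport, the more common choice for local deployment; L4 introduces the HTTP+SSE transport.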


L2: Principles Layer (The MCP Protocol in Detail)

JSON-RPC 2.0 Basics

MCP is built on JSON-RPC 2.0, a lightweight remote procedure call protocol. MCP uses its four message shapes:

// Request
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "query_database",
    "arguments": {
      "sql": "SELECT * FROM users LIMIT 10"
    }
  }
}

// Response
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "[{\"id\": 1, \"name\": \"Alice\"}, ...]"
      }
    ]
  }
}

// Error response (a protocol-level failure, here invalid parameters)
{
  "jsonrpc": "2.0",
  "id": 1,
  "error": {
    "code": -32602,
    "message": "Invalid params: missing required argument \"sql\""
  }
}

// Notification (no id, so no response is expected)
{
  "jsonrpc": "2.0",
  "method": "notifications/tools/list_changed"
}
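
The four message shapes above can be told apart from the fields they carry. The following is a minimal sketch, not part of any MCP library: `classify` and `envelope` are hypothetical names, and the rule it applies (a method with an id is a request, a method without an id is a notification, a result or error is a response) is taken from JSON-RPC 2.0.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope holds only the fields needed to tell the message kinds apart.
type envelope struct {
	ID     json.RawMessage `json:"id"`
	Method string          `json:"method"`
	Result json.RawMessage `json:"result"`
	Error  json.RawMessage `json:"error"`
}

// classify is a hypothetical helper that names the kind of a JSON-RPC message.
func classify(raw []byte) string {
	var e envelope
	if err := json.Unmarshal(raw, &e); err != nil {
		return "invalid"
	}
	// An absent id and an explicit null id both count as "no id" here.
	hasID := len(e.ID) > 0 && string(e.ID) != "null"
	switch {
	case e.Method != "" && hasID:
		return "request"
	case e.Method != "":
		return "notification"
	case len(e.Result) > 0 || len(e.Error) > 0:
		return "response"
	default:
		return "invalid"
	}
}

func main() {
	fmt.Println(classify([]byte(`{"jsonrpc":"2.0","id":1,"method":"tools/list"}`)))
	fmt.Println(classify([]byte(`{"jsonrpc":"2.0","method":"notifications/tools/list_changed"}`)))
	fmt.Println(classify([]byte(`{"jsonrpc":"2.0","id":1,"result":{}}`)))
}
```

A server only ever needs the first two branches for incoming traffic unless it also sends requests of its own, as Sampling does later in this chapter.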

Initialize Handshake

An MCP session begins with an initialization handshake:

// 1. The client sends an initialize request
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "initialize",
  "params": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "roots": {"listChanged": true},
      "sampling": {}
    },
    "clientInfo": {
      "name": "Claude Desktop",
      "version": "0.7.0"
    }
  }
}

// 2. The server responds and declares its capabilities
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "protocolVersion": "2024-11-05",
    "capabilities": {
      "tools": {"listChanged": true},
      "resources": {"subscribe": true, "listChanged": true},
      "prompts": {"listChanged": true}
    },
    "serverInfo": {
      "name": "my-mcp-server",
      "version": "1.0.0"
    }
  }
}

// 3. The client sends the initialized notification to confirm completion
{
  "jsonrpc": "2.0",
  "method": "notifications/initialized"
}
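
The `protocolVersion` exchange in the handshake is a small negotiation: if the server supports the version the client asked for, it answers with that version; otherwise it answers with another version it supports, normally its latest. A minimal sketch of that rule follows. The `supportedVersions` list and the `negotiateVersion` name are assumptions made for illustration.

```go
package main

import "fmt"

// supportedVersions lists the protocol revisions this hypothetical server
// implements, newest first.
var supportedVersions = []string{"2025-03-26", "2024-11-05"}

// negotiateVersion echoes the requested version when the server supports it
// and otherwise falls back to the newest version the server knows.
func negotiateVersion(requested string) string {
	for _, v := range supportedVersions {
		if v == requested {
			return v
		}
	}
	return supportedVersions[0]
}

func main() {
	fmt.Println(negotiateVersion("2024-11-05")) // supported: echoed back
	fmt.Println(negotiateVersion("1999-01-01")) // unsupported: server's newest
}
```

A client that cannot work with the version returned is expected to disconnect, so the server does not need to reject unknown versions itself.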

tools/list and tools/call

Listing tools

// Request
{"jsonrpc": "2.0", "id": 2, "method": "tools/list"}

// Response
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "tools": [
      {
        "name": "query_database",
        "description": "Execute a read-only SQL query against the database",
        "inputSchema": {
          "type": "object",
          "properties": {
            "sql": {
              "type": "string",
              "description": "The SQL query to execute (SELECT only)"
            }
          },
          "required": ["sql"]
        }
      }
    ]
  }
}

Calling a tool

// Request
{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tools/call",
  "params": {
    "name": "query_database",
    "arguments": {"sql": "SELECT count(*) FROM orders WHERE status = 'pending'"}
  }
}

// Response
{
  "jsonrpc": "2.0",
  "id": 3,
  "result": {
    "content": [
      {
        "type": "text",
        "text": "Result: 42 pending orders"
      }
    ],
    "isError": false
  }
}
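
Because `inputSchema` is ordinary JSON Schema, a server can check a `tools/call` request against it before running the tool. The sketch below covers only the `required` list, which is far less than a full schema validator would do; `missingRequired` is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// inputSchema captures only the part of the JSON Schema this sketch checks.
type inputSchema struct {
	Required []string `json:"required"`
}

// missingRequired returns the names listed under "required" in the schema
// that are absent from the arguments object.
func missingRequired(schema, args json.RawMessage) ([]string, error) {
	var s inputSchema
	if err := json.Unmarshal(schema, &s); err != nil {
		return nil, fmt.Errorf("parse schema: %w", err)
	}
	provided := map[string]json.RawMessage{}
	if len(args) > 0 {
		if err := json.Unmarshal(args, &provided); err != nil {
			return nil, fmt.Errorf("parse arguments: %w", err)
		}
	}
	var missing []string
	for _, name := range s.Required {
		if _, ok := provided[name]; !ok {
			missing = append(missing, name)
		}
	}
	return missing, nil
}

func main() {
	schema := json.RawMessage(`{"type":"object","properties":{"sql":{"type":"string"}},"required":["sql"]}`)
	missing, err := missingRequired(schema, json.RawMessage(`{"max_rows": 10}`))
	fmt.Println(missing, err)
}
```

A missing argument is a malformed call rather than a tool failure, so reporting it as a JSON-RPC error with code -32602 is a reasonable choice.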

Resources (Resource Access)

Resources let an MCP Server expose files, URLs, or other data for the AI to read:

// List resources
{"jsonrpc": "2.0", "id": 4, "method": "resources/list"}
// The response contains the resource list: { "resources": [{"uri": "file:///logs/app.log", "name": "App Log", "mimeType": "text/plain"}] }

// Read a resource
{"jsonrpc": "2.0", "id": 5, "method": "resources/read", "params": {"uri": "file:///logs/app.log"}}
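
A server that exposes `file://` resources has to turn the URI from `resources/read` back into a local path. The following is a minimal sketch using `net/url`; `fileURIToPath` is a hypothetical helper, and rejecting every other scheme and every remote host is a policy assumed here, not something the protocol requires.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// fileURIToPath converts a file:// URI into a local path and rejects
// anything that is not a local file URI.
func fileURIToPath(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", fmt.Errorf("parse uri: %w", err)
	}
	if u.Scheme != "file" {
		return "", errors.New("unsupported scheme: " + u.Scheme)
	}
	if u.Host != "" && u.Host != "localhost" {
		return "", errors.New("remote file hosts are not supported")
	}
	return u.Path, nil
}

func main() {
	path, err := fileURIToPath("file:///logs/app.log")
	fmt.Println(path, err)
}
```

The resulting path should still pass the same directory allow-list as any other file access before it is opened.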

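Prompts (Reusable Prompt Templates)

Prompts are reusable prompt templates defined on the server so that users can quickly invoke common AI workflows:

// List prompts
{"jsonrpc": "2.0", "id": 6, "method": "prompts/list"}

// Get a specific prompt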
{
  "jsonrpc": "2.0",
  "id": 7,
  "method": "prompts/get",
  "params": {
    "name": "analyze_slow_queries",
    "arguments": {"threshold_ms": "1000"}
  }
}
// Response: a list of messages with the arguments filled in, ready to send to the LLM
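
On the server side, answering `prompts/get` amounts to filling the arguments into a stored template and wrapping the text in a message list. The sketch below assumes a `{{name}}` placeholder syntax and a single user message; both are choices made for this example, and `renderPrompt` is a hypothetical name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type textContent struct {
	Type string `json:"type"`
	Text string `json:"text"`
}

type promptMessage struct {
	Role    string      `json:"role"`
	Content textContent `json:"content"`
}

// getPromptResult mirrors the shape of a prompts/get result.
type getPromptResult struct {
	Description string          `json:"description,omitempty"`
	Messages    []promptMessage `json:"messages"`
}

// renderPrompt replaces each {{name}} placeholder with its argument value
// and returns the text as one user message.
func renderPrompt(template string, args map[string]string) getPromptResult {
	text := template
	for name, value := range args {
		text = strings.ReplaceAll(text, "{{"+name+"}}", value)
	}
	return getPromptResult{
		Messages: []promptMessage{
			{Role: "user", Content: textContent{Type: "text", Text: text}},
		},
	}
}

func main() {
	res := renderPrompt(
		"List queries slower than {{threshold_ms}} ms and suggest indexes.",
		map[string]string{"threshold_ms": "1000"},
	)
	out, _ := json.MarshalIndent(res, "", "  ")
	fmt.Println(string(out))
}
```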

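Sampling (Asking the Client's LLM to Generate)

Sampling is an advanced MCP feature: an MCP Server can ask the client to generate text with the client's LLM. The server thereby gains AI capabilities without managing an API key of its own:

// Sampling request sent from the server to the client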
{
  "jsonrpc": "2.0",
  "id": 8,
  "method": "sampling/createMessage",
  "params": {
    "messages": [
      {"role": "user", "content": {"type": "text", "text": "Summarize this log: ..."}}
    ],
    "maxTokens": 1000
  }
}
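
Sampling reverses the usual direction: the server writes a request and the client answers it. A minimal sketch of building the message shown above in Go follows. The type and function names are hypothetical, and matching the client's reply to the request by id is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type samplingContent struct {
	Type string `json:"type"`
	Text string `json:"text"`
}

type samplingMessage struct {
	Role    string          `json:"role"`
	Content samplingContent `json:"content"`
}

type createMessageParams struct {
	Messages  []samplingMessage `json:"messages"`
	MaxTokens int               `json:"maxTokens"`
}

// samplingRequest is a server-to-client JSON-RPC request.
type samplingRequest struct {
	JSONRPC string              `json:"jsonrpc"`
	ID      int64               `json:"id"`
	Method  string              `json:"method"`
	Params  createMessageParams `json:"params"`
}

// newSamplingRequest builds a sampling/createMessage request that carries a
// single user message.
func newSamplingRequest(id int64, prompt string, maxTokens int) samplingRequest {
	return samplingRequest{
		JSONRPC: "2.0",
		ID:      id,
		Method:  "sampling/createMessage",
		Params: createMessageParams{
			Messages: []samplingMessage{
				{Role: "user", Content: samplingContent{Type: "text", Text: prompt}},
			},
			MaxTokens: maxTokens,
		},
	}
}

func main() {
	req := newSamplingRequest(8, "Summarize this log: ...", 1000)
	out, _ := json.Marshal(req)
	fmt.Println(string(out))
}
```

The ids a server uses for its own requests are independent of the ids the client uses, so a simple counter on the server side is enough.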

L3: Code Practice (Building a Complete MCP Server in Go)

Core Framework: the JSON-RPC Dispatcher

// mcp/server.go
package mcp

import (
    "bufio"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log/slog"
    "os"
    "sync"
    "sync/atomic"
)

// JSONRPCRequest represents a JSON-RPC request.
type JSONRPCRequest struct {
    JSONRPC string           `json:"jsonrpc"`
    ID      *json.RawMessage `json:"id,omitempty"` // nil means the message is a notification
    Method  string           `json:"method"`
    Params  json.RawMessage  `json:"params,omitempty"`
}

// JSONRPCResponse represents a JSON-RPC response.
type JSONRPCResponse struct {
    JSONRPC string           `json:"jsonrpc"`
    ID      *json.RawMessage `json:"id,omitempty"`
    Result  interface{}      `json:"result,omitempty"`
    Error   *JSONRPCError    `json:"error,omitempty"`
}

// JSONRPCError represents a JSON-RPC error object.
type JSONRPCError struct {
    Code    int    `json:"code"`
    Message string `json:"message"`
}

// Standard JSON-RPC error codes.
const (
    ErrParseError     = -32700
    ErrInvalidRequest = -32600
    ErrMethodNotFound = -32601
    ErrInvalidParams  = -32602
    ErrInternalError  = -32603
)

// Handler is the function type for method handlers.
type Handler func(ctx context.Context, params json.RawMessage) (interface{}, error)

// Server is the core of the MCP server.
type Server struct {
    name     string
    version  string
    handlers map[string]Handler
    tools    *ToolRegistry
    mu       sync.RWMutex

    reader *bufio.Reader
    writer io.Writer
    wrMu   sync.Mutex

    nextID atomic.Int64
    logger *slog.Logger
}

func NewServer(name, version string) *Server {
    s := &Server{
        name:     name,
        version:  version,
        handlers: make(map[string]Handler),
        tools:    NewToolRegistry(),
        reader:   bufio.NewReader(os.Stdin),
        writer:   os.Stdout,
        logger:   slog.New(slog.NewTextHandler(os.Stderr, nil)),
    }

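    // Register the built-in methods.
    s.handlers["initialize"] = s.handleInitialize
    s.handlers["notifications/initialized"] = s.handleInitialized
    s.handlers["tools/list"] = s.handleToolsList
    s.handlers["tools/call"] = s.handleToolsCall
    s.handlers["resources/list"] = s.handleResourcesList
    s.handlers["resources/read"] = s.handleResourcesRead
    s.handlers["prompts/list"] = s.handlePromptsList
    s.handlers["prompts/get"] = s.handlePromptsGet

    return s
}

// RegisterTool adds a tool to the server's registry. Call it before Run:
// the registry is not safe for concurrent registration.
func (s *Server) RegisterTool(def ToolDefinition, handler MCPToolHandler) {
    s.tools.Register(def, handler)
}

// Run starts the MCP server: it reads requests from stdin and writes responses to stdout.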
func (s *Server) Run(ctx context.Context) error {
    s.logger.Info("MCP server started", "name", s.name, "version", s.version)

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        line, err := s.reader.ReadBytes('\n')
        if err != nil {
            if err == io.EOF {
                s.logger.Info("Client disconnected")
                return nil
            }
            return fmt.Errorf("read error: %w", err)
        }

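        // ReadBytes keeps the trailing newline, so skip lines that contain nothing else.
        if text := string(line); text == "\n" || text == "\r\n" {
            continue
        }

        // Handle each request in its own goroutine. Responses may therefore be
        // written out of order; clients match them to requests by id.
        go s.handleRequest(ctx, line)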
    }
}

func (s *Server) handleRequest(ctx context.Context, data []byte) {
    var req JSONRPCRequest
    if err := json.Unmarshal(data, &req); err != nil {
        s.sendError(nil, ErrParseError, "Parse error: "+err.Error())
        return
    }

    if req.JSONRPC != "2.0" {
        s.sendError(req.ID, ErrInvalidRequest, "Invalid JSON-RPC version")
        return
    }

    s.logger.Debug("Handling request", "method", req.Method, "id", string(jsonOrNull(req.ID)))

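    handler, ok := s.handlers[req.Method]
    if !ok {
        // Notifications never get a response, even for unknown methods.
        if req.ID == nil {
            return
        }
        s.sendError(req.ID, ErrMethodNotFound, fmt.Sprintf("Method not found: %s", req.Method))
        return
    }

    result, err := handler(ctx, req.Params)

    // A notification (no id) must not be answered, whether or not the handler failed.
    if req.ID == nil {
        if err != nil {
            s.logger.Error("Notification handler failed", "method", req.Method, "error", err)
        }
        return
    }

    if err != nil {
        s.sendError(req.ID, ErrInternalError, err.Error())
        return
    }

    s.sendResult(req.ID, result)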
}

func (s *Server) sendResult(id *json.RawMessage, result interface{}) {
    resp := JSONRPCResponse{
        JSONRPC: "2.0",
        ID:      id,
        Result:  result,
    }
    s.writeResponse(resp)
}

func (s *Server) sendError(id *json.RawMessage, code int, message string) {
    resp := JSONRPCResponse{
        JSONRPC: "2.0",
        ID:      id,
        Error:   &JSONRPCError{Code: code, Message: message},
    }
    s.writeResponse(resp)
}

func (s *Server) writeResponse(resp JSONRPCResponse) {
    data, err := json.Marshal(resp)
    if err != nil {
        s.logger.Error("Failed to marshal response", "error", err)
        return
    }

    s.wrMu.Lock()
    defer s.wrMu.Unlock()

    _, err = fmt.Fprintf(s.writer, "%s\n", data)
    if err != nil {
        s.logger.Error("Failed to write response", "error", err)
    }
}

func jsonOrNull(v *json.RawMessage) []byte {
    if v == nil {
        return []byte("null")
    }
    return *v
}
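
Because the read loop only needs an `io.Reader` and an `io.Writer`, the same newline-delimited framing can be exercised in memory without a real stdin. The sketch below is a stripped-down stand-in for the `Server` above, not a drop-in replacement: it is synchronous, it knows only the `ping` method, and `serve` and `serveString` are hypothetical names.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

type request struct {
	ID     *json.RawMessage `json:"id,omitempty"`
	Method string           `json:"method"`
}

type rpcError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

type response struct {
	JSONRPC string           `json:"jsonrpc"`
	ID      *json.RawMessage `json:"id"`
	Result  any              `json:"result,omitempty"`
	Error   *rpcError        `json:"error,omitempty"`
}

// serve reads newline-delimited JSON-RPC messages from r, answers requests
// on w, and returns how many responses it wrote.
func serve(r io.Reader, w io.Writer) (int, error) {
	sc := bufio.NewScanner(r)
	sc.Buffer(make([]byte, 0, 64*1024), 1<<20) // allow lines up to 1 MiB
	enc := json.NewEncoder(w)                  // Encode appends a newline
	written := 0
	for sc.Scan() {
		line := bytes.TrimSpace(sc.Bytes())
		if len(line) == 0 {
			continue
		}
		var req request
		resp := response{JSONRPC: "2.0"}
		if err := json.Unmarshal(line, &req); err != nil {
			resp.Error = &rpcError{Code: -32700, Message: "parse error"}
		} else if req.ID == nil {
			continue // a notification: never answered
		} else if req.Method == "ping" {
			resp.ID, resp.Result = req.ID, map[string]any{}
		} else {
			resp.ID = req.ID
			resp.Error = &rpcError{Code: -32601, Message: "method not found: " + req.Method}
		}
		if err := enc.Encode(resp); err != nil {
			return written, err
		}
		written++
	}
	return written, sc.Err()
}

// serveString runs serve over an in-memory input and returns the output.
func serveString(input string) (string, int, error) {
	var out bytes.Buffer
	n, err := serve(strings.NewReader(input), &out)
	return out.String(), n, err
}

func main() {
	out, n, err := serveString(`{"jsonrpc":"2.0","id":1,"method":"ping"}` + "\n")
	fmt.Print(out)
	fmt.Println(n, err)
}
```

Passing the reader and writer in, instead of fixing them to `os.Stdin` and `os.Stdout` inside the constructor, is the design choice that makes this kind of test possible.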

Built-in Method Handlers

// mcp/handlers.go
package mcp

import (
    "context"
    "encoding/json"
    "fmt"
)

type ServerCapabilities struct {
    Tools     *ToolsCapability     `json:"tools,omitempty"`
    Resources *ResourcesCapability `json:"resources,omitempty"`
    Prompts   *PromptsCapability   `json:"prompts,omitempty"`
}

type ToolsCapability     struct{ ListChanged bool `json:"listChanged"` }
type ResourcesCapability struct {
    Subscribe   bool `json:"subscribe"`
    ListChanged bool `json:"listChanged"`
}
type PromptsCapability struct{ ListChanged bool `json:"listChanged"` }

func (s *Server) handleInitialize(ctx context.Context, params json.RawMessage) (interface{}, error) {
    return map[string]interface{}{
        "protocolVersion": "2024-11-05",
        "capabilities": ServerCapabilities{
            Tools:     &ToolsCapability{ListChanged: false},
            Resources: &ResourcesCapability{Subscribe: false, ListChanged: false},
            Prompts:   &PromptsCapability{ListChanged: false},
        },
        "serverInfo": map[string]string{
            "name":    s.name,
            "version": s.version,
        },
    }, nil
}

func (s *Server) handleInitialized(ctx context.Context, params json.RawMessage) (interface{}, error) {
    s.logger.Info("Client initialized successfully")
    return nil, nil
}

func (s *Server) handleToolsList(ctx context.Context, params json.RawMessage) (interface{}, error) {
    return map[string]interface{}{
        "tools": s.tools.List(),
    }, nil
}

func (s *Server) handleToolsCall(ctx context.Context, params json.RawMessage) (interface{}, error) {
    var req struct {
        Name      string          `json:"name"`
        Arguments json.RawMessage `json:"arguments"`
    }
    if err := json.Unmarshal(params, &req); err != nil {
        return nil, err
    }

    result, isError, err := s.tools.Call(ctx, req.Name, req.Arguments)
    if err != nil {
        // Tool execution failures are returned with isError=true, not as JSON-RPC errors.
        return map[string]interface{}{
            "content": []map[string]string{
                {"type": "text", "text": "Error: " + err.Error()},
            },
            "isError": true,
        }, nil
    }

    return map[string]interface{}{
        "content": []map[string]string{
            {"type": "text", "text": result},
        },
        "isError": isError,
    }, nil
}

func (s *Server) handleResourcesList(ctx context.Context, params json.RawMessage) (interface{}, error) {
    return map[string]interface{}{"resources": []interface{}{}}, nil
}

func (s *Server) handleResourcesRead(ctx context.Context, params json.RawMessage) (interface{}, error) {
    return nil, fmt.Errorf("resource not found")
}

func (s *Server) handlePromptsList(ctx context.Context, params json.RawMessage) (interface{}, error) {
    return map[string]interface{}{"prompts": []interface{}{}}, nil
}

func (s *Server) handlePromptsGet(ctx context.Context, params json.RawMessage) (interface{}, error) {
    return nil, fmt.Errorf("prompt not found")
}

The Tool Registry and Three Practical Tools

// mcp/tools.go
package mcp

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "os"
    "strings"
    "time"

    _ "github.com/lib/pq"
)

// ToolDefinition is an MCP tool definition.
type ToolDefinition struct {
    Name        string          `json:"name"`
    Description string          `json:"description"`
    InputSchema json.RawMessage `json:"inputSchema"`
}

// MCPToolHandler is a tool handler. It returns the result text and whether
// that result represents an error.
type MCPToolHandler func(ctx context.Context, args json.RawMessage) (string, bool, error)

// ToolRegistry manages the registered MCP tools.
type ToolRegistry struct {
    defs     []ToolDefinition
    handlers map[string]MCPToolHandler
}

func NewToolRegistry() *ToolRegistry {
    return &ToolRegistry{handlers: make(map[string]MCPToolHandler)}
}

func (r *ToolRegistry) Register(def ToolDefinition, handler MCPToolHandler) {
    r.defs = append(r.defs, def)
    r.handlers[def.Name] = handler
}

func (r *ToolRegistry) List() []ToolDefinition {
    return r.defs
}

func (r *ToolRegistry) Call(ctx context.Context, name string, args json.RawMessage) (string, bool, error) {
    handler, ok := r.handlers[name]
    if !ok {
        return "", true, fmt.Errorf("tool not found: %s", name)
    }
    return handler(ctx, args)
}

// --- Tool 1: database query ---

type DatabaseTool struct {
    db *sql.DB
}

func NewDatabaseTool(dsn string) (*DatabaseTool, error) {
    db, err := sql.Open("postgres", dsn)
    if err != nil {
        return nil, fmt.Errorf("open database: %w", err)
    }
    if err := db.Ping(); err != nil {
        return nil, fmt.Errorf("ping database: %w", err)
    }
    db.SetMaxOpenConns(10)
    db.SetMaxIdleConns(5)
    return &DatabaseTool{db: db}, nil
}

func (t *DatabaseTool) Definition() ToolDefinition {
    return ToolDefinition{
        Name:        "query_database",
        Description: "Execute a read-only SQL query against the PostgreSQL database. Returns results as formatted text. Only SELECT statements are allowed for security.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "sql": {
                    "type": "string",
                    "description": "The SQL SELECT query to execute"
                },
                "max_rows": {
                    "type": "integer",
                    "description": "Maximum number of rows to return (default: 100)",
                    "default": 100
                }
            },
            "required": ["sql"]
        }`),
    }
}

func (t *DatabaseTool) Handle(ctx context.Context, args json.RawMessage) (string, bool, error) {
    var req struct {
        SQL     string `json:"sql"`
        MaxRows int    `json:"max_rows"`
    }
    if err := json.Unmarshal(args, &req); err != nil {
        return "", true, fmt.Errorf("invalid arguments: %w", err)
    }
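    if req.MaxRows <= 0 {
        req.MaxRows = 100
    }

    // Safety check: accept a single SELECT statement only. The prefix test alone
    // is easy to bypass (for example with "SELECT 1) AS q; DROP TABLE ..."),
    // so stacked statements are rejected too. This crude check also rejects
    // semicolons inside string literals.
    trimmed := strings.TrimSuffix(strings.TrimSpace(req.SQL), ";")
    if !strings.HasPrefix(strings.ToUpper(trimmed), "SELECT") || strings.Contains(trimmed, ";") {
        return "", true, fmt.Errorf("only a single SELECT statement is allowed")
    }

    // Defence in depth: run the query in a read-only transaction. A database
    // role with read-only privileges remains the more reliable safeguard.
    tx, err := t.db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
    if err != nil {
        return "", true, fmt.Errorf("begin read-only transaction: %w", err)
    }
    defer tx.Rollback()

    // Wrap the query in an outer LIMIT so it cannot return too many rows.
    limitedSQL := fmt.Sprintf("SELECT * FROM (%s) AS q LIMIT %d", trimmed, req.MaxRows)

    rows, err := tx.QueryContext(ctx, limitedSQL)
    if err != nil {
        return "", true, fmt.Errorf("query error: %w", err)
    }
    defer rows.Close()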

    columns, err := rows.Columns()
    if err != nil {
        return "", true, err
    }

    var result strings.Builder
    result.WriteString(strings.Join(columns, "\t") + "\n")
    result.WriteString(strings.Repeat("-", 40) + "\n")

    rowCount := 0
    values := make([]interface{}, len(columns))
    valuePtrs := make([]interface{}, len(columns))
    for i := range values {
        valuePtrs[i] = &values[i]
    }

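    for rows.Next() {
        if err := rows.Scan(valuePtrs...); err != nil {
            return "", true, err
        }
        var parts []string
        for _, v := range values {
            // Drivers often return text columns as []byte; print them as
            // strings rather than as lists of byte values.
            if b, ok := v.([]byte); ok {
                parts = append(parts, string(b))
            } else {
                parts = append(parts, fmt.Sprintf("%v", v))
            }
        }
        result.WriteString(strings.Join(parts, "\t") + "\n")
        rowCount++
    }
    // rows.Next also returns false on failure, so check for an iteration error.
    if err := rows.Err(); err != nil {
        return "", true, fmt.Errorf("read rows: %w", err)
    }

    result.WriteString(fmt.Sprintf("\n%d row(s) returned", rowCount))
    return result.String(), false, nil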
}

// --- Tool 2: file reader ---

type FileReaderTool struct {
    allowedDirs []string
}

func NewFileReaderTool(allowedDirs []string) *FileReaderTool {
    return &FileReaderTool{allowedDirs: allowedDirs}
}

func (t *FileReaderTool) Definition() ToolDefinition {
    return ToolDefinition{
        Name:        "read_file",
        Description: "Read the contents of a file from the allowed directories. Use this to access logs, configuration files, or source code.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Absolute path to the file"
                }
            },
            "required": ["path"]
        }`),
    }
}

func (t *FileReaderTool) Handle(ctx context.Context, args json.RawMessage) (string, bool, error) {
    var req struct {
        Path string `json:"path"`
    }
    if err := json.Unmarshal(args, &req); err != nil {
        return "", true, err
    }

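    // Safety check: only allow paths inside the configured directories.
    // Matching on "dir/" rather than "dir" stops /var/logs from also admitting
    // /var/logs-private. Symlinks are not resolved here.
    allowed := false
    for _, dir := range t.allowedDirs {
        if strings.HasPrefix(req.Path, strings.TrimSuffix(dir, "/")+"/") {
            allowed = true
            break
        }
    }
    if !allowed || strings.Contains(req.Path, "..") {
        return "", true, fmt.Errorf("access denied: path not in allowed directories")
    }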

    f, err := os.Open(req.Path)
    if err != nil {
        return "", true, fmt.Errorf("open file: %w", err)
    }
    defer f.Close()

    content, err := io.ReadAll(io.LimitReader(f, 512*1024)) // read at most 512 KB
    if err != nil {
        return "", true, err
    }

    return string(content), false, nil
}
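
Prefix checks on raw path strings are easy to get subtly wrong. One alternative is a lexical containment test built on `path/filepath`, sketched below. `isWithin` is a hypothetical helper; it works on the cleaned path text only, so a symlink inside an allowed directory can still point outside it.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// isWithin reports whether target, once cleaned, is base itself or lies
// inside base. It is purely lexical and does not resolve symlinks.
func isWithin(base, target string) bool {
	rel, err := filepath.Rel(filepath.Clean(base), filepath.Clean(target))
	if err != nil {
		return false
	}
	// A relative path that climbs out of base starts with "..".
	return rel != ".." && !strings.HasPrefix(rel, ".."+string(filepath.Separator))
}

func main() {
	fmt.Println(isWithin("/var/logs", "/var/logs/app.log"))
	fmt.Println(isWithin("/var/logs", "/var/logs-private/app.log"))
	fmt.Println(isWithin("/var/logs", "/var/logs/../../etc/passwd"))
}
```

Where symlinks matter, resolving the path with `filepath.EvalSymlinks` before the check is one option, at the cost of an extra file system lookup.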

// --- Tool 3: HTTP fetcher ---

type HTTPFetcherTool struct {
    client *http.Client
}

func NewHTTPFetcherTool() *HTTPFetcherTool {
    return &HTTPFetcherTool{
        client: &http.Client{Timeout: 30 * time.Second},
    }
}

func (t *HTTPFetcherTool) Definition() ToolDefinition {
    return ToolDefinition{
        Name:        "fetch_url",
        Description: "Fetch the content of a URL via HTTP GET. Returns the response body as text. Useful for accessing REST APIs or reading web pages.",
        InputSchema: json.RawMessage(`{
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "The URL to fetch"
                },
                "headers": {
                    "type": "object",
                    "description": "Optional HTTP headers to include",
                    "additionalProperties": {"type": "string"}
                }
            },
            "required": ["url"]
        }`),
    }
}

func (t *HTTPFetcherTool) Handle(ctx context.Context, args json.RawMessage) (string, bool, error) {
    var req struct {
        URL     string            `json:"url"`
        Headers map[string]string `json:"headers"`
    }
    if err := json.Unmarshal(args, &req); err != nil {
        return "", true, err
    }

    httpReq, err := http.NewRequestWithContext(ctx, "GET", req.URL, nil)
    if err != nil {
        return "", true, fmt.Errorf("create request: %w", err)
    }

    for k, v := range req.Headers {
        httpReq.Header.Set(k, v)
    }

    resp, err := t.client.Do(httpReq)
    if err != nil {
        return "", true, fmt.Errorf("fetch failed: %w", err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // read at most 1 MB
    if err != nil {
        return "", true, err
    }

    result := fmt.Sprintf("HTTP %d\n\n%s", resp.StatusCode, string(body))
    return result, resp.StatusCode >= 400, nil
}
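
A tool that fetches arbitrary URLs on behalf of a model can be steered at internal services, so it is worth filtering targets before the request is made. The following is a minimal sketch of such a filter. `checkFetchURL` is a hypothetical helper, and it inspects only literal IP addresses: a hostname that resolves to a private address would still pass, so treat it as a first line of defence only.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/url"
)

// checkFetchURL rejects URLs that are not plain http(s) or that point at
// loopback, private, link-local, or unspecified IP literals.
func checkFetchURL(raw string) error {
	u, err := url.Parse(raw)
	if err != nil {
		return fmt.Errorf("parse url: %w", err)
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return errors.New("only http and https URLs are allowed")
	}
	host := u.Hostname()
	if host == "" {
		return errors.New("missing host")
	}
	if host == "localhost" {
		return errors.New("localhost is not allowed")
	}
	if ip := net.ParseIP(host); ip != nil {
		if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsUnspecified() {
			return errors.New("address is not allowed: " + host)
		}
	}
	return nil
}

func main() {
	fmt.Println(checkFetchURL("https://example.com/api"))
	fmt.Println(checkFetchURL("http://127.0.0.1:8080/admin"))
	fmt.Println(checkFetchURL("file:///etc/passwd"))
}
```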

The Main Program: Packaging as a Standalone Executable

// cmd/my-mcp-server/main.go
package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/yourorg/mcp"
)

func main() {
    // Read configuration from environment variables.
    dbDSN := os.Getenv("DATABASE_URL")
    allowedDirs := []string{"/var/logs", "/etc/myapp"}

    // Create the MCP server.
    server := mcp.NewServer("my-tools-server", "1.0.0")

    // Register the tools.
    if dbDSN != "" {
        dbTool, err := mcp.NewDatabaseTool(dbDSN)
        if err != nil {
            fmt.Fprintf(os.Stderr, "Failed to connect to database: %v\n", err)
        } else {
            server.RegisterTool(dbTool.Definition(), dbTool.Handle)
        }
    }

    fileTool := mcp.NewFileReaderTool(allowedDirs)
    server.RegisterTool(fileTool.Definition(), fileTool.Handle)

    httpTool := mcp.NewHTTPFetcherTool()
    server.RegisterTool(httpTool.Definition(), httpTool.Handle)

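    // Graceful shutdown. Run only checks ctx between reads, so cancellation
    // takes effect once the next line arrives or stdin is closed.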
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        cancel()
    }()

    if err := server.Run(ctx); err != nil && err != context.Canceled {
        fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
        os.Exit(1)
    }
}

Integrating with Claude Desktop

Add the following to ~/Library/Application Support/Claude/claude_desktop_config.json (the macOS location):

{
  "mcpServers": {
    "my-tools": {
      "command": "/usr/local/bin/my-mcp-server",
      "env": {
        "DATABASE_URL": "postgres://user:pass@localhost/mydb"
      }
    }
  }
}

After you restart Claude Desktop, your tools are available in conversations.


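L4: Advanced Topics (HTTP+SSE, Authentication, MCP Proxy, and Dynamic Discovery)

MCP over the HTTP+SSE Transport

With the HTTP+SSE transport, an MCP Server can run as a long-lived network service. Later revisions of the MCP specification replace this transport with Streamable HTTP; the code here follows the 2024-11-05 revision used throughout this chapter: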

// mcp/http_server.go
package mcp

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync"
)

type HTTPTransport struct {
    server  *Server
    clients sync.Map // sessionID -> chan []byte
}

func NewHTTPTransport(server *Server) *HTTPTransport {
    return &HTTPTransport{server: server}
}

func (t *HTTPTransport) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    switch r.URL.Path {
    case "/sse":
        t.handleSSE(w, r)
    case "/message":
        t.handleMessage(w, r)
    default:
        http.NotFound(w, r)
    }
}

// handleSSE opens the SSE connection over which the server pushes responses.
func (t *HTTPTransport) handleSSE(w http.ResponseWriter, r *http.Request) {
    sessionID := generateSessionID()

    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    ch := make(chan []byte, 100)
    t.clients.Store(sessionID, ch)
    defer t.clients.Delete(sessionID)

    // Tell the client which endpoint to POST its messages to.
    fmt.Fprintf(w, "event: endpoint\ndata: /message?sessionId=%s\n\n", sessionID)
    w.(http.Flusher).Flush()

    for {
        select {
        case <-r.Context().Done():
            return
        case data := <-ch:
            fmt.Fprintf(w, "data: %s\n\n", data)
            w.(http.Flusher).Flush()
        }
    }
}

// handleMessage receives JSON-RPC requests from the client.
func (t *HTTPTransport) handleMessage(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
        return
    }

    sessionID := r.URL.Query().Get("sessionId")
    ch, ok := t.clients.Load(sessionID)
    if !ok {
        http.Error(w, "Session not found", http.StatusNotFound)
        return
    }

    var req JSONRPCRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "Invalid JSON", http.StatusBadRequest)
        return
    }

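    // Handle the request and deliver the response on the SSE channel.
    // r.Context() is cancelled as soon as this HTTP handler returns, so detach
    // from it (context.WithoutCancel, Go 1.21+) before going asynchronous.
    // handleRequestInternal is not shown in this chapter; it is assumed to be a
    // variant of handleRequest that returns the result instead of writing it.
    ctx := context.WithoutCancel(r.Context())
    go func() {
        result, handlerErr := t.server.handleRequestInternal(ctx, &req)
        if req.ID == nil {
            return // notifications get no response
        }

        var resp JSONRPCResponse
        if handlerErr != nil {
            resp = JSONRPCResponse{
                JSONRPC: "2.0",
                ID:      req.ID,
                Error:   &JSONRPCError{Code: ErrInternalError, Message: handlerErr.Error()},
            }
        } else {
            resp = JSONRPCResponse{JSONRPC: "2.0", ID: req.ID, Result: result}
        }

        data, _ := json.Marshal(resp)
        ch.(chan []byte) <- data
    }()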

    w.WriteHeader(http.StatusAccepted)
}
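
The transport above calls `generateSessionID` without defining it. Since the session ID is the only thing tying a POST to an SSE stream, it should be hard to guess. The following is one possible sketch using `crypto/rand`; the 16-byte length and hex encoding are choices made here, not requirements of the protocol.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// generateSessionID returns 16 random bytes as a 32-character hex string.
func generateSessionID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		// The system random source failing is not recoverable here.
		panic(fmt.Sprintf("crypto/rand failed: %v", err))
	}
	return hex.EncodeToString(b)
}

func main() {
	fmt.Println(generateSessionID())
}
```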

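MCP Server Authentication

Production deployments need an authentication mechanism:

// clientKey is the context key under which the authenticated client name is stored.
type clientKey struct{}

// AuthMiddleware is API-key-based authentication middleware.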
func AuthMiddleware(validKeys map[string]string, next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        apiKey := r.Header.Get("X-API-Key")
        if apiKey == "" {
            // Fall back to a Bearer token.
            auth := r.Header.Get("Authorization")
            if strings.HasPrefix(auth, "Bearer ") {
                apiKey = strings.TrimPrefix(auth, "Bearer ")
            }
        }

        clientName, ok := validKeys[apiKey]
        if !ok {
            http.Error(w, "Unauthorized", http.StatusUnauthorized)
            return
        }

        // Put the client's identity into the request context.
        ctx := context.WithValue(r.Context(), clientKey{}, clientName)
        next.ServeHTTP(w, r.WithContext(ctx))
    })
}
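
The map lookup in the middleware compares keys with ordinary string equality, whose running time can depend on how many leading bytes match. Where that matters, a constant-time comparison from `crypto/subtle` is one option. The sketch below hashes both sides first so the inputs always have equal length; `lookupClient` is a hypothetical helper, and scanning every key on each request is only reasonable for a small key set.

```go
package main

import (
	"crypto/sha256"
	"crypto/subtle"
	"fmt"
)

// lookupClient returns the client name registered for the presented key.
// It compares SHA-256 digests in constant time and visits every key so the
// work done does not depend on which key, if any, matches.
func lookupClient(validKeys map[string]string, presented string) (string, bool) {
	got := sha256.Sum256([]byte(presented))
	name, found := "", false
	for key, client := range validKeys {
		want := sha256.Sum256([]byte(key))
		if subtle.ConstantTimeCompare(got[:], want[:]) == 1 {
			name, found = client, true
		}
	}
	return name, found
}

func main() {
	keys := map[string]string{"s3cr3t-key": "ide-plugin"}
	fmt.Println(lookupClient(keys, "s3cr3t-key"))
	fmt.Println(lookupClient(keys, "wrong-key"))
}
```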

MCP Proxy: Aggregating Multiple MCP Servers

An MCP Proxy aggregates several MCP Servers into one, transparently to the client:

// mcp/proxy.go
type MCPProxy struct {
    upstreams map[string]*UpstreamServer
    mu        sync.RWMutex
}

type UpstreamServer struct {
    name   string
    prefix string // tool name prefix, such as "db_" or "file_"
    // communication channels to the upstream server
    stdin  io.WriteCloser
    stdout *bufio.Reader
    mu     sync.Mutex
}

// AggregateTools merges the tool lists of all upstream servers.
func (p *MCPProxy) AggregateTools(ctx context.Context) ([]ToolDefinition, error) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    var allTools []ToolDefinition
    for _, upstream := range p.upstreams {
        tools, err := upstream.ListTools(ctx)
        if err != nil {
            continue // degrade gracefully: skip upstreams that fail
        }
        // Add the prefix to avoid name collisions.
        for _, t := range tools {
            t.Name = upstream.prefix + t.Name
            allTools = append(allTools, t)
        }
    }
    return allTools, nil
}

// RouteToolCall forwards a tool call to the upstream server that owns it.
func (p *MCPProxy) RouteToolCall(ctx context.Context, name string, args json.RawMessage) (string, error) {
    p.mu.RLock()
    defer p.mu.RUnlock()

    for _, upstream := range p.upstreams {
        if strings.HasPrefix(name, upstream.prefix) {
            originalName := strings.TrimPrefix(name, upstream.prefix)
            return upstream.CallTool(ctx, originalName, args)
        }
    }
    return "", fmt.Errorf("no upstream found for tool: %s", name)
}
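
RouteToolCall iterates over a map, and Go does not specify map iteration order. With overlapping prefixes such as `db_` and `db_admin_`, the same tool name could therefore reach different upstreams on different calls. One way to make routing deterministic is to pick the longest matching prefix, sketched below with a hypothetical `routeByPrefix` helper that works on prefixes alone.

```go
package main

import (
	"fmt"
	"strings"
)

// routeByPrefix picks the longest prefix that matches toolName and returns
// that prefix together with the tool's original, unprefixed name.
func routeByPrefix(prefixes []string, toolName string) (prefix, original string, ok bool) {
	for _, p := range prefixes {
		if p != "" && strings.HasPrefix(toolName, p) && len(p) > len(prefix) {
			prefix = p
		}
	}
	if prefix == "" {
		return "", "", false
	}
	return prefix, strings.TrimPrefix(toolName, prefix), true
}

func main() {
	prefixes := []string{"db_", "db_admin_", "file_"}
	fmt.Println(routeByPrefix(prefixes, "db_admin_vacuum"))
	fmt.Println(routeByPrefix(prefixes, "db_query"))
	fmt.Println(routeByPrefix(prefixes, "http_fetch"))
}
```

Rejecting overlapping prefixes when upstreams are registered would avoid the ambiguity altogether and may be the simpler policy.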

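A Closer Look at the mark3labs/mcp-go Library

mark3labs/mcp-go is a widely used community Go library for MCP that wraps much of the boilerplate shown above:

// An equivalent implementation using the mcp-go library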
import "github.com/mark3labs/mcp-go/mcp"
import "github.com/mark3labs/mcp-go/server"

func buildServerWithLibrary() *server.MCPServer {
    s := server.NewMCPServer(
        "my-tools-server",
        "1.0.0",
        server.WithToolCapabilities(false),
    )

    // Add a tool using the library's more concise builder DSL.
    s.AddTool(mcp.NewTool("query_database",
        mcp.WithDescription("Execute a read-only SQL query"),
        mcp.WithString("sql",
            mcp.Required(),
            mcp.Description("The SQL SELECT query"),
        ),
        mcp.WithNumber("max_rows",
            mcp.Description("Maximum rows to return"),
        ),
    ), handleDatabaseQuery)

    return s
}

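func handleDatabaseQuery(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
    // Note: how arguments are accessed has changed between mcp-go releases;
    // check the version you depend on.
    sql, _ := req.Params.Arguments["sql"].(string)
    // ... run the query ...
    return mcp.NewToolResultText(result), nil
}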

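Compared with the hand-written JSON-RPC handlers, the mcp-go library takes over the boilerplate: message framing and dispatch, the initialization handshake, and building input schemas through the builder calls shown above.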

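Summary

Building a production-grade MCP Server requires a grasp of:

  1. JSON-RPC 2.0 basics: the difference between requests, responses, and notifications, and what the error codes mean
  2. The MCP lifecycle: initialization handshake → capability negotiation → tool calls → shutdown
  3. Tool design: clear descriptions, precise schemas, robust error handling
  4. Security: path restrictions, SQL injection protection, authentication
  5. Extensibility: the HTTP+SSE transport, the Proxy pattern, dynamic tool registration

MCP's real value lies in its ecosystem effect: as more tools implement the protocol, the capabilities of AI assistants keep expanding, while each tool only has to be implemented once.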
