构建 MCP Server
构建 MCP Server
L1:概念层——MCP 是什么,为什么重要
AI 工具集成的碎片化困境
在 MCP(Model Context Protocol)出现之前,AI 工具集成是一片混乱的战场。每个 AI 平台、每个助手应用,都有自己独特的工具集成方式。想把一个数据库查询工具接入 Claude?你需要实现一套特定的 API。同样的工具接入 GPT?又是另一套。接入 Gemini?再另一套。
这种碎片化造成了巨大的重复劳动。开发者为同一个功能编写三遍不同的集成代码,而且每当 AI 平台更新 API,所有集成都需要同步更新。这不仅浪费开发资源,更严重阻碍了 AI 工具生态的繁荣。
MCP 的出现就像 HTTP 协议对于 Web 的意义:一个通用的、开放的标准,让任何工具都能以统一的方式接入任何 AI 系统。
MCP 的核心价值主张
MCP(Model Context Protocol)是 Anthropic 于 2024 年 11 月发布的开放标准协议。它定义了 AI 模型(client)与外部工具提供者(server)之间的通信规范。
为什么 MCP 重要?
统一接口:一个 MCP Server 可以同时被 Claude Desktop、任何支持 MCP 的 IDE 插件、以及任何自定义的 AI 应用使用,无需为每个客户端单独实现。
关注点分离:工具实现者只需关注工具的业务逻辑,不需要了解上层 AI 应用的实现细节。AI 应用开发者只需对接 MCP 协议,不需要关心各个工具的内部实现。
安全性:MCP 的设计天然支持将工具运行在独立进程中,与 AI 应用隔离。敏感数据(比如数据库密码)只存在于 MCP Server 进程内,不会暴露给 AI 应用。
可组合性:MCP Proxy 可以将多个 MCP Server 聚合成一个,为 AI 应用提供统一的入口。
stdio 与 HTTP 传输的选择
MCP 支持两种传输层:
stdio(标准输入/输出)传输:
- MCP Server 作为子进程启动,通过 stdin/stdout 与客户端通信
- 优点:简单、天然隔离、无需网络配置、进程结束时自动清理
- 适用场景:本地开发工具、IDE 插件、命令行工具
HTTP+SSE(Server-Sent Events)传输:
- MCP Server 作为独立的 HTTP 服务运行
- 优点:可部署在远程服务器、支持多客户端并发连接、可通过负载均衡器扩展
- 适用场景:云端共享工具、企业内部服务、需要认证的 API
本章主要讲解 stdio 传输(更常见的本地部署场景),在 L4 中会介绍 HTTP+SSE 传输。
L2:原理层——MCP 协议详解
JSON-RPC 2.0 基础
MCP 建立在 JSON-RPC 2.0 协议之上。JSON-RPC 是一个轻量级的远程过程调用协议,非常适合作为 MCP 的基础:
// 请求(Request)
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "query_database",
"arguments": {
"sql": "SELECT * FROM users LIMIT 10"
}
}
}
// 响应(Response)
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [
{
"type": "text",
"text": "[{\"id\": 1, \"name\": \"Alice\"}, ...]"
}
]
}
}
// 错误响应
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Invalid SQL syntax"
}
}
// 通知(Notification,无 id,不需要响应)
{
"jsonrpc": "2.0",
"method": "notifications/tools/list_changed"
}
初始化握手(Initialize Handshake)
MCP 会话始于初始化握手:
// 1. 客户端发送 initialize 请求
{
"jsonrpc": "2.0",
"id": 1,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": {"listChanged": true},
"sampling": {}
},
"clientInfo": {
"name": "Claude Desktop",
"version": "0.7.0"
}
}
}
// 2. Server 响应,宣告自己的能力
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": {"listChanged": true},
"resources": {"subscribe": true, "listChanged": true},
"prompts": {"listChanged": true}
},
"serverInfo": {
"name": "my-mcp-server",
"version": "1.0.0"
}
}
}
// 3. 客户端发送 initialized 通知(确认完成)
{
"jsonrpc": "2.0",
"method": "initialized"
}
tools/list 与 tools/call
列出工具:
// 请求
{"jsonrpc": "2.0", "id": 2, "method": "tools/list"}
// 响应
{
"jsonrpc": "2.0",
"id": 2,
"result": {
"tools": [
{
"name": "query_database",
"description": "Execute a read-only SQL query against the database",
"inputSchema": {
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "The SQL query to execute (SELECT only)"
}
},
"required": ["sql"]
}
}
]
}
}
调用工具:
// 请求
{
"jsonrpc": "2.0",
"id": 3,
"method": "tools/call",
"params": {
"name": "query_database",
"arguments": {"sql": "SELECT count(*) FROM orders WHERE status = 'pending'"}
}
}
// 响应
{
"jsonrpc": "2.0",
"id": 3,
"result": {
"content": [
{
"type": "text",
"text": "Result: 42 pending orders"
}
],
"isError": false
}
}
Resources(资源访问)
Resources 允许 MCP Server 暴露文件、URL 或其他数据资源,供 AI 读取:
// 列出资源
{"jsonrpc": "2.0", "id": 4, "method": "resources/list"}
// 响应包含资源列表:{ "resources": [{"uri": "file:///logs/app.log", "name": "App Log", "mimeType": "text/plain"}] }
// 读取资源
{"jsonrpc": "2.0", "id": 5, "method": "resources/read", "params": {"uri": "file:///logs/app.log"}}
Prompts(可复用提示模板)
Prompts 是服务端定义的可复用提示模板,让用户可以快速调用常见的 AI 工作流:
// 列出 prompts
{"jsonrpc": "2.0", "id": 6, "method": "prompts/list"}
// 获取具体 prompt
{
"jsonrpc": "2.0",
"id": 7,
"method": "prompts/get",
"params": {
"name": "analyze_slow_queries",
"arguments": {"threshold_ms": "1000"}
}
}
// 响应:包含填充了参数的消息列表,可直接发送给 LLM
Sampling(请求客户端 LLM 生成)
Sampling 是 MCP 的高级特性:MCP Server 可以请求客户端使用其内置的 LLM 生成文本。这使得 MCP Server 本身也可以拥有 AI 能力,而不需要自己维护 API key:
// Server 发送给 Client 的 sampling 请求
{
"jsonrpc": "2.0",
"id": 8,
"method": "sampling/createMessage",
"params": {
"messages": [
{"role": "user", "content": {"type": "text", "text": "Summarize this log: ..."}}
],
"maxTokens": 1000
}
}
L3:代码实践——用 Go 构建完整 MCP Server
核心框架:JSON-RPC 调度器
// mcp/server.go
package mcp
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"sync"
"sync/atomic"
)
// JSONRPCRequest 代表一个 JSON-RPC 请求
type JSONRPCRequest struct {
JSONRPC string `json:"jsonrpc"`
ID *json.RawMessage `json:"id,omitempty"` // nil 表示通知
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
}
// JSONRPCResponse 代表一个 JSON-RPC 响应
type JSONRPCResponse struct {
JSONRPC string `json:"jsonrpc"`
ID *json.RawMessage `json:"id,omitempty"`
Result interface{} `json:"result,omitempty"`
Error *JSONRPCError `json:"error,omitempty"`
}
// JSONRPCError 代表 JSON-RPC 错误
type JSONRPCError struct {
Code int `json:"code"`
Message string `json:"message"`
}
// 标准 JSON-RPC 错误码
const (
ErrParseError = -32700
ErrInvalidRequest = -32600
ErrMethodNotFound = -32601
ErrInvalidParams = -32602
ErrInternalError = -32603
)
// Handler 是方法处理器的类型
type Handler func(ctx context.Context, params json.RawMessage) (interface{}, error)
// Server 是 MCP 服务器的核心
type Server struct {
name string
version string
handlers map[string]Handler
tools *ToolRegistry
mu sync.RWMutex
reader *bufio.Reader
writer io.Writer
wrMu sync.Mutex
nextID atomic.Int64
logger *slog.Logger
}
func NewServer(name, version string) *Server {
s := &Server{
name: name,
version: version,
handlers: make(map[string]Handler),
tools: NewToolRegistry(),
reader: bufio.NewReader(os.Stdin),
writer: os.Stdout,
logger: slog.New(slog.NewTextHandler(os.Stderr, nil)),
}
// 注册内置方法
s.handlers["initialize"] = s.handleInitialize
s.handlers["initialized"] = s.handleInitialized
s.handlers["tools/list"] = s.handleToolsList
s.handlers["tools/call"] = s.handleToolsCall
s.handlers["resources/list"] = s.handleResourcesList
s.handlers["resources/read"] = s.handleResourcesRead
s.handlers["prompts/list"] = s.handlePromptsList
s.handlers["prompts/get"] = s.handlePromptsGet
return s
}
// Run 启动 MCP 服务器,从 stdin 读取请求,向 stdout 写入响应
func (s *Server) Run(ctx context.Context) error {
s.logger.Info("MCP server started", "name", s.name, "version", s.version)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
line, err := s.reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
s.logger.Info("Client disconnected")
return nil
}
return fmt.Errorf("read error: %w", err)
}
if len(line) == 0 {
continue
}
// 异步处理请求,保持顺序响应
go s.handleRequest(ctx, line)
}
}
func (s *Server) handleRequest(ctx context.Context, data []byte) {
var req JSONRPCRequest
if err := json.Unmarshal(data, &req); err != nil {
s.sendError(nil, ErrParseError, "Parse error: "+err.Error())
return
}
if req.JSONRPC != "2.0" {
s.sendError(req.ID, ErrInvalidRequest, "Invalid JSON-RPC version")
return
}
s.logger.Debug("Handling request", "method", req.Method, "id", string(jsonOrNull(req.ID)))
handler, ok := s.handlers[req.Method]
if !ok {
// 通知不需要响应
if req.ID == nil {
return
}
s.sendError(req.ID, ErrMethodNotFound, fmt.Sprintf("Method not found: %s", req.Method))
return
}
result, err := handler(ctx, req.Params)
if err != nil {
s.sendError(req.ID, ErrInternalError, err.Error())
return
}
// 通知(没有 id)不需要响应
if req.ID == nil {
return
}
s.sendResult(req.ID, result)
}
func (s *Server) sendResult(id *json.RawMessage, result interface{}) {
resp := JSONRPCResponse{
JSONRPC: "2.0",
ID: id,
Result: result,
}
s.writeResponse(resp)
}
func (s *Server) sendError(id *json.RawMessage, code int, message string) {
resp := JSONRPCResponse{
JSONRPC: "2.0",
ID: id,
Error: &JSONRPCError{Code: code, Message: message},
}
s.writeResponse(resp)
}
func (s *Server) writeResponse(resp JSONRPCResponse) {
data, err := json.Marshal(resp)
if err != nil {
s.logger.Error("Failed to marshal response", "error", err)
return
}
s.wrMu.Lock()
defer s.wrMu.Unlock()
_, err = fmt.Fprintf(s.writer, "%s\n", data)
if err != nil {
s.logger.Error("Failed to write response", "error", err)
}
}
func jsonOrNull(v *json.RawMessage) []byte {
if v == nil {
return []byte("null")
}
return *v
}
内置方法处理器
// mcp/handlers.go
package mcp
import (
"context"
"encoding/json"
)
type ServerCapabilities struct {
Tools *ToolsCapability `json:"tools,omitempty"`
Resources *ResourcesCapability `json:"resources,omitempty"`
Prompts *PromptsCapability `json:"prompts,omitempty"`
}
type ToolsCapability struct{ ListChanged bool `json:"listChanged"` }
type ResourcesCapability struct {
Subscribe bool `json:"subscribe"`
ListChanged bool `json:"listChanged"`
}
type PromptsCapability struct{ ListChanged bool `json:"listChanged"` }
func (s *Server) handleInitialize(ctx context.Context, params json.RawMessage) (interface{}, error) {
return map[string]interface{}{
"protocolVersion": "2024-11-05",
"capabilities": ServerCapabilities{
Tools: &ToolsCapability{ListChanged: false},
Resources: &ResourcesCapability{Subscribe: false, ListChanged: false},
Prompts: &PromptsCapability{ListChanged: false},
},
"serverInfo": map[string]string{
"name": s.name,
"version": s.version,
},
}, nil
}
func (s *Server) handleInitialized(ctx context.Context, params json.RawMessage) (interface{}, error) {
s.logger.Info("Client initialized successfully")
return nil, nil
}
func (s *Server) handleToolsList(ctx context.Context, params json.RawMessage) (interface{}, error) {
return map[string]interface{}{
"tools": s.tools.List(),
}, nil
}
func (s *Server) handleToolsCall(ctx context.Context, params json.RawMessage) (interface{}, error) {
var req struct {
Name string `json:"name"`
Arguments json.RawMessage `json:"arguments"`
}
if err := json.Unmarshal(params, &req); err != nil {
return nil, err
}
result, isError, err := s.tools.Call(ctx, req.Name, req.Arguments)
if err != nil {
// 工具执行错误以 isError=true 返回,而不是 JSON-RPC 错误
return map[string]interface{}{
"content": []map[string]string{
{"type": "text", "text": "Error: " + err.Error()},
},
"isError": true,
}, nil
}
return map[string]interface{}{
"content": []map[string]string{
{"type": "text", "text": result},
},
"isError": isError,
}, nil
}
func (s *Server) handleResourcesList(ctx context.Context, params json.RawMessage) (interface{}, error) {
return map[string]interface{}{"resources": []interface{}{}}, nil
}
func (s *Server) handleResourcesRead(ctx context.Context, params json.RawMessage) (interface{}, error) {
return nil, fmt.Errorf("resource not found")
}
func (s *Server) handlePromptsList(ctx context.Context, params json.RawMessage) (interface{}, error) {
return map[string]interface{}{"prompts": []interface{}{}}, nil
}
func (s *Server) handlePromptsGet(ctx context.Context, params json.RawMessage) (interface{}, error) {
return nil, fmt.Errorf("prompt not found")
}
工具注册表与三个实用工具
// mcp/tools.go
package mcp
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
_ "github.com/lib/pq"
)
// ToolDefinition 是 MCP 工具定义
type ToolDefinition struct {
Name string `json:"name"`
Description string `json:"description"`
InputSchema json.RawMessage `json:"inputSchema"`
}
// MCPToolHandler 是工具处理器,返回结果字符串和是否错误
type MCPToolHandler func(ctx context.Context, args json.RawMessage) (string, bool, error)
// ToolRegistry 管理 MCP 工具
type ToolRegistry struct {
defs []ToolDefinition
handlers map[string]MCPToolHandler
}
func NewToolRegistry() *ToolRegistry {
return &ToolRegistry{handlers: make(map[string]MCPToolHandler)}
}
func (r *ToolRegistry) Register(def ToolDefinition, handler MCPToolHandler) {
r.defs = append(r.defs, def)
r.handlers[def.Name] = handler
}
func (r *ToolRegistry) List() []ToolDefinition {
return r.defs
}
func (r *ToolRegistry) Call(ctx context.Context, name string, args json.RawMessage) (string, bool, error) {
handler, ok := r.handlers[name]
if !ok {
return "", true, fmt.Errorf("tool not found: %s", name)
}
return handler(ctx, args)
}
// --- 工具 1:数据库查询 ---
type DatabaseTool struct {
db *sql.DB
}
func NewDatabaseTool(dsn string) (*DatabaseTool, error) {
db, err := sql.Open("postgres", dsn)
if err != nil {
return nil, fmt.Errorf("open database: %w", err)
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("ping database: %w", err)
}
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(5)
return &DatabaseTool{db: db}, nil
}
func (t *DatabaseTool) Definition() ToolDefinition {
return ToolDefinition{
Name: "query_database",
Description: "Execute a read-only SQL query against the PostgreSQL database. Returns results as formatted text. Only SELECT statements are allowed for security.",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "The SQL SELECT query to execute"
},
"max_rows": {
"type": "integer",
"description": "Maximum number of rows to return (default: 100)",
"default": 100
}
},
"required": ["sql"]
}`),
}
}
func (t *DatabaseTool) Handle(ctx context.Context, args json.RawMessage) (string, bool, error) {
var req struct {
SQL string `json:"sql"`
MaxRows int `json:"max_rows"`
}
if err := json.Unmarshal(args, &req); err != nil {
return "", true, fmt.Errorf("invalid arguments: %w", err)
}
if req.MaxRows == 0 {
req.MaxRows = 100
}
// 安全检查:只允许 SELECT 语句
sqlUpper := strings.ToUpper(strings.TrimSpace(req.SQL))
if !strings.HasPrefix(sqlUpper, "SELECT") {
return "", true, fmt.Errorf("only SELECT statements are allowed")
}
// 添加 LIMIT 防止返回过多数据
limitedSQL := fmt.Sprintf("SELECT * FROM (%s) AS q LIMIT %d", req.SQL, req.MaxRows)
rows, err := t.db.QueryContext(ctx, limitedSQL)
if err != nil {
return "", true, fmt.Errorf("query error: %w", err)
}
defer rows.Close()
columns, err := rows.Columns()
if err != nil {
return "", true, err
}
var result strings.Builder
result.WriteString(strings.Join(columns, "\t") + "\n")
result.WriteString(strings.Repeat("-", 40) + "\n")
rowCount := 0
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for i := range values {
valuePtrs[i] = &values[i]
}
for rows.Next() {
if err := rows.Scan(valuePtrs...); err != nil {
return "", true, err
}
var parts []string
for _, v := range values {
parts = append(parts, fmt.Sprintf("%v", v))
}
result.WriteString(strings.Join(parts, "\t") + "\n")
rowCount++
}
result.WriteString(fmt.Sprintf("\n%d row(s) returned", rowCount))
return result.String(), false, nil
}
// --- 工具 2:文件读取 ---
type FileReaderTool struct {
allowedDirs []string
}
func NewFileReaderTool(allowedDirs []string) *FileReaderTool {
return &FileReaderTool{allowedDirs: allowedDirs}
}
func (t *FileReaderTool) Definition() ToolDefinition {
return ToolDefinition{
Name: "read_file",
Description: "Read the contents of a file from the allowed directories. Use this to access logs, configuration files, or source code.",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "Absolute path to the file"
}
},
"required": ["path"]
}`),
}
}
func (t *FileReaderTool) Handle(ctx context.Context, args json.RawMessage) (string, bool, error) {
var req struct {
Path string `json:"path"`
}
if err := json.Unmarshal(args, &req); err != nil {
return "", true, err
}
// 安全检查:只允许访问指定目录
allowed := false
for _, dir := range t.allowedDirs {
if strings.HasPrefix(req.Path, dir) {
allowed = true
break
}
}
if !allowed || strings.Contains(req.Path, "..") {
return "", true, fmt.Errorf("access denied: path not in allowed directories")
}
f, err := os.Open(req.Path)
if err != nil {
return "", true, fmt.Errorf("open file: %w", err)
}
defer f.Close()
content, err := io.ReadAll(io.LimitReader(f, 512*1024)) // 最多 512KB
if err != nil {
return "", true, err
}
return string(content), false, nil
}
// --- 工具 3:HTTP 请求 ---
type HTTPFetcherTool struct {
client *http.Client
}
func NewHTTPFetcherTool() *HTTPFetcherTool {
return &HTTPFetcherTool{
client: &http.Client{Timeout: 30 * time.Second},
}
}
func (t *HTTPFetcherTool) Definition() ToolDefinition {
return ToolDefinition{
Name: "fetch_url",
Description: "Fetch the content of a URL via HTTP GET. Returns the response body as text. Useful for accessing REST APIs or reading web pages.",
InputSchema: json.RawMessage(`{
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The URL to fetch"
},
"headers": {
"type": "object",
"description": "Optional HTTP headers to include",
"additionalProperties": {"type": "string"}
}
},
"required": ["url"]
}`),
}
}
func (t *HTTPFetcherTool) Handle(ctx context.Context, args json.RawMessage) (string, bool, error) {
var req struct {
URL string `json:"url"`
Headers map[string]string `json:"headers"`
}
if err := json.Unmarshal(args, &req); err != nil {
return "", true, err
}
httpReq, err := http.NewRequestWithContext(ctx, "GET", req.URL, nil)
if err != nil {
return "", true, fmt.Errorf("create request: %w", err)
}
for k, v := range req.Headers {
httpReq.Header.Set(k, v)
}
resp, err := t.client.Do(httpReq)
if err != nil {
return "", true, fmt.Errorf("fetch failed: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 最多 1MB
if err != nil {
return "", true, err
}
result := fmt.Sprintf("HTTP %d\n\n%s", resp.StatusCode, string(body))
return result, resp.StatusCode >= 400, nil
}
主程序:打包为独立可执行文件
// cmd/my-mcp-server/main.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/yourorg/mcp"
)
func main() {
// 从环境变量读取配置
dbDSN := os.Getenv("DATABASE_URL")
allowedDirs := []string{"/var/logs", "/etc/myapp"}
// 创建 MCP 服务器
server := mcp.NewServer("my-tools-server", "1.0.0")
// 注册工具
if dbDSN != "" {
dbTool, err := mcp.NewDatabaseTool(dbDSN)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to connect to database: %v\n", err)
} else {
server.RegisterTool(dbTool.Definition(), dbTool.Handle)
}
}
fileTool := mcp.NewFileReaderTool(allowedDirs)
server.RegisterTool(fileTool.Definition(), fileTool.Handle)
httpTool := mcp.NewHTTPFetcherTool()
server.RegisterTool(httpTool.Definition(), httpTool.Handle)
// 优雅关闭
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
cancel()
}()
if err := server.Run(ctx); err != nil && err != context.Canceled {
fmt.Fprintf(os.Stderr, "Server error: %v\n", err)
os.Exit(1)
}
}
集成到 Claude Desktop
在 ~/Library/Application Support/Claude/claude_desktop_config.json 中添加:
{
"mcpServers": {
"my-tools": {
"command": "/usr/local/bin/my-mcp-server",
"env": {
"DATABASE_URL": "postgres://user:pass@localhost/mydb"
}
}
}
}
重启 Claude Desktop 后,你的工具就可以在对话中使用了。
L4:进阶——HTTP+SSE、认证、MCP Proxy 与动态发现
MCP over HTTP+SSE 传输
HTTP+SSE 传输使 MCP Server 可以作为持久化的网络服务运行:
// mcp/http_server.go
package mcp
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
)
type HTTPTransport struct {
server *Server
clients sync.Map // sessionID -> chan []byte
}
func NewHTTPTransport(server *Server) *HTTPTransport {
return &HTTPTransport{server: server}
}
func (t *HTTPTransport) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/sse":
t.handleSSE(w, r)
case "/message":
t.handleMessage(w, r)
default:
http.NotFound(w, r)
}
}
// handleSSE 建立 SSE 连接,服务器通过此连接推送响应
func (t *HTTPTransport) handleSSE(w http.ResponseWriter, r *http.Request) {
sessionID := generateSessionID()
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
ch := make(chan []byte, 100)
t.clients.Store(sessionID, ch)
defer t.clients.Delete(sessionID)
// 发送端点信息
fmt.Fprintf(w, "event: endpoint\ndata: /message?sessionId=%s\n\n", sessionID)
w.(http.Flusher).Flush()
for {
select {
case <-r.Context().Done():
return
case data := <-ch:
fmt.Fprintf(w, "data: %s\n\n", data)
w.(http.Flusher).Flush()
}
}
}
// handleMessage 接收客户端的 JSON-RPC 请求
func (t *HTTPTransport) handleMessage(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
sessionID := r.URL.Query().Get("sessionId")
ch, ok := t.clients.Load(sessionID)
if !ok {
http.Error(w, "Session not found", http.StatusNotFound)
return
}
var req JSONRPCRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid JSON", http.StatusBadRequest)
return
}
// 处理请求,将响应发送到 SSE 通道
go func() {
result, handlerErr := t.server.handleRequestInternal(r.Context(), &req)
if req.ID == nil {
return // 通知不需要响应
}
var resp JSONRPCResponse
if handlerErr != nil {
resp = JSONRPCResponse{
JSONRPC: "2.0",
ID: req.ID,
Error: &JSONRPCError{Code: ErrInternalError, Message: handlerErr.Error()},
}
} else {
resp = JSONRPCResponse{JSONRPC: "2.0", ID: req.ID, Result: result}
}
data, _ := json.Marshal(resp)
ch.(chan []byte) <- data
}()
w.WriteHeader(http.StatusAccepted)
}
MCP Server 认证
对于生产部署,需要认证机制:
// 基于 API Key 的认证中间件
func AuthMiddleware(validKeys map[string]string, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
apiKey := r.Header.Get("X-API-Key")
if apiKey == "" {
// 也检查 Bearer token
auth := r.Header.Get("Authorization")
if strings.HasPrefix(auth, "Bearer ") {
apiKey = strings.TrimPrefix(auth, "Bearer ")
}
}
clientName, ok := validKeys[apiKey]
if !ok {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
// 将客户端信息注入 context
ctx := context.WithValue(r.Context(), clientKey{}, clientName)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
MCP Proxy:聚合多个 MCP Server
MCP Proxy 将多个 MCP Server 聚合为一个,对客户端透明:
// mcp/proxy.go
type MCPProxy struct {
upstreams map[string]*UpstreamServer
mu sync.RWMutex
}
type UpstreamServer struct {
name string
prefix string // 工具名前缀,如 "db_" "file_"
// 与上游 server 的通信通道
stdin io.WriteCloser
stdout *bufio.Reader
mu sync.Mutex
}
// 聚合所有上游的工具列表
func (p *MCPProxy) AggregateTools(ctx context.Context) ([]ToolDefinition, error) {
p.mu.RLock()
defer p.mu.RUnlock()
var allTools []ToolDefinition
for _, upstream := range p.upstreams {
tools, err := upstream.ListTools(ctx)
if err != nil {
continue // 降级:忽略失败的上游
}
// 添加前缀避免名称冲突
for _, t := range tools {
t.Name = upstream.prefix + t.Name
allTools = append(allTools, t)
}
}
return allTools, nil
}
// 路由工具调用到对应的上游 server
func (p *MCPProxy) RouteToolCall(ctx context.Context, name string, args json.RawMessage) (string, error) {
p.mu.RLock()
defer p.mu.RUnlock()
for _, upstream := range p.upstreams {
if strings.HasPrefix(name, upstream.prefix) {
originalName := strings.TrimPrefix(name, upstream.prefix)
return upstream.CallTool(ctx, originalName, args)
}
}
return "", fmt.Errorf("no upstream found for tool: %s", name)
}
mark3labs/mcp-go 库深度解析
mark3labs/mcp-go 是目前最成熟的 Go MCP 库,封装了大量样板代码:
// 使用 mcp-go 库的等价实现
import "github.com/mark3labs/mcp-go/mcp"
import "github.com/mark3labs/mcp-go/server"
func buildServerWithLibrary() *server.MCPServer {
s := server.NewMCPServer(
"my-tools-server",
"1.0.0",
server.WithToolCapabilities(false),
)
// 添加工具(使用更简洁的 DSL)
s.AddTool(mcp.NewTool("query_database",
mcp.WithDescription("Execute a read-only SQL query"),
mcp.WithString("sql",
mcp.Required(),
mcp.Description("The SQL SELECT query"),
),
mcp.WithNumber("max_rows",
mcp.Description("Maximum rows to return"),
),
), handleDatabaseQuery)
return s
}
func handleDatabaseQuery(ctx context.Context, req mcp.CallToolRequest) (*mcp.CallToolResult, error) {
sql, _ := req.Params.Arguments["sql"].(string)
// ... 执行查询 ...
return mcp.NewToolResultText(result), nil
}
相比手写 JSON-RPC 处理器,mcp-go 库:
- 自动处理协议握手和版本协商
- 提供类型安全的参数提取
- 内置 stdio 和 SSE 传输支持
- 自动处理并发和消息队列
小结
构建一个生产级 MCP Server 需要掌握:
- JSON-RPC 2.0 基础:请求/响应/通知的区别,错误码的含义
- MCP 生命周期:初始化握手→能力协商→工具调用→关闭
- 工具设计:清晰的描述、精确的 schema、健壮的错误处理
- 安全性:路径限制、SQL 注入防护、认证机制
- 可扩展性:HTTP+SSE 传输、Proxy 模式、动态工具注册
MCP 的真正价值在于生态效应:当越来越多的工具实现 MCP 协议,AI 助手的能力边界将持续扩展,而每个工具只需实现一次。