Chapter 67

Production Security: Access Control and Data Isolation

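Once an Agent is deployed to production, security stops being an optional extra and becomes a mandatory requirement: skipping it leads to incidents. An Agent with overly broad permissions is effectively an unsupervised superuser inside the company. This chapter covers production-grade security: implementing least privilege at the tool layer, filesystem sandboxing, network access control, multi-tenant data isolation, and, most critically, secret management that keeps plaintext credentials out of the Agent's reach.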


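67.1 Least Privilege: Tool-Layer Implementation

67.1.1 What Tool-Layer Least Privilege Means

In traditional systems, the Principle of Least Privilege (PoLP) means a process holds only the minimum permissions required for its task. In an Agent system the principle extends to the tool layer: the Agent may call only the set of tools needed for the current task, and each tool's capabilities are themselves constrained to the minimum necessary scope.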

Least-privilege hierarchy:
┌─────────────────────────────────────────┐
│  All tool capabilities (Universe)       │
│  ┌─────────────────────────────────┐    │
│  │  Role toolset                   │    │
│  │  ┌─────────────────────────┐    │    │
│  │  │  Task toolset           │    │    │
│  │  │  ┌─────────────────┐    │    │    │
│  │  │  │ Operation perms │    │    │    │
│  │  │  └─────────────────┘    │    │    │
│  │  └─────────────────────────┘    │    │
│  └─────────────────────────────────┘    │
└─────────────────────────────────────────┘
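
The nesting in the diagram can be read as successive set intersections: a tool is usable only if it survives every layer. The following is a minimal sketch under that reading; the tool names and the `effective_tools` helper are hypothetical and do not come from the Hermes code base.

```python
from typing import FrozenSet

# Hypothetical tool names, chosen only for illustration
UNIVERSE: FrozenSet[str] = frozenset(
    {"read_file", "write_file", "execute_sql", "execute_bash", "web_search"}
)


def effective_tools(role_tools: FrozenSet[str], task_tools: FrozenSet[str]) -> FrozenSet[str]:
    """Return the tools usable right now: inside the universe, the role AND the task."""
    return UNIVERSE & role_tools & task_tools


role = frozenset({"read_file", "execute_sql", "web_search"})
task = frozenset({"read_file", "execute_bash"})  # the task asks for more than the role grants

allowed = effective_tools(role, task)
print(sorted(allowed))
```

Because the result is an intersection, a task can narrow what a role grants but can never widen it, which is the property the hierarchy is meant to guarantee.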

67.1.2 Designing the Tool Permission Matrix

from dataclasses import dataclass, field
from typing import Optional, Set, Dict
from enum import Enum

class ToolCapability(Enum):
    # Filesystem
    FILE_READ = "file:read"
    FILE_WRITE = "file:write"
    FILE_DELETE = "file:delete"
    FILE_EXECUTE = "file:execute"
    
    # Network
    NET_HTTP_GET = "net:http:get"
    NET_HTTP_POST = "net:http:post"
    NET_WEBSOCKET = "net:websocket"
    NET_SMTP = "net:smtp"
    
    # Code execution
    CODE_PYTHON = "code:python"
    CODE_BASH = "code:bash"
    CODE_SQL = "code:sql"
    
    # External services
    EXT_GITHUB = "ext:github"
    EXT_SLACK = "ext:slack"
    EXT_STRIPE = "ext:stripe"

@dataclass
class ToolPermission:
    """Permission configuration for a single tool"""
    tool_name: str
    capabilities: Set[ToolCapability]
    
    # Filesystem constraints
    allowed_paths: list[str] = field(default_factory=list)
    denied_paths: list[str] = field(default_factory=lambda: ["/etc", "/root", "/sys", "/proc"])
    
    # Network constraints
    allowed_domains: list[str] = field(default_factory=list)
    denied_domains: list[str] = field(default_factory=lambda: ["169.254.0.0/16"])  # link-local range (cloud metadata endpoint)
    
    # Resource limits
    max_file_size_mb: int = 10
    max_requests_per_minute: int = 60
    max_execution_seconds: int = 30
    
    # Operational constraints
    require_confirmation: bool = False  # dangerous operations need human confirmation
    audit_all_calls: bool = True

@dataclass
class AgentRole:
    """Permission configuration for an Agent role"""
    role_name: str
    description: str
    tool_permissions: Dict[str, ToolPermission]
    
    def can_use_tool(self, tool_name: str) -> bool:
        return tool_name in self.tool_permissions
    
    def get_tool_permission(self, tool_name: str) -> Optional[ToolPermission]:
        return self.tool_permissions.get(tool_name)

# Define roles
def create_data_analyst_role() -> AgentRole:
    """Data analyst role: may read data, may not write or delete"""
    return AgentRole(
        role_name="data_analyst",
        description="Read-only data analysis role",
        tool_permissions={
            "read_file": ToolPermission(
                tool_name="read_file",
                capabilities={ToolCapability.FILE_READ},
                allowed_paths=["/data/reports/", "/data/exports/"],
                denied_paths=["/data/secrets/", "/etc/"],
                max_file_size_mb=50,
            ),
            "execute_sql": ToolPermission(
                tool_name="execute_sql",
                capabilities={ToolCapability.CODE_SQL},
                # SELECT only; INSERT/UPDATE/DELETE/DROP are rejected at runtime
                allowed_paths=[],  # SQL does not use file paths
                max_requests_per_minute=30,
            ),
            "web_search": ToolPermission(
                tool_name="web_search",
                capabilities={ToolCapability.NET_HTTP_GET},
                allowed_domains=["google.com", "bing.com", "scholar.google.com"],
                max_requests_per_minute=20,
            ),
        }
    )

def create_devops_role() -> AgentRole:
    """DevOps role: may run scripts, but only under restricted paths"""
    return AgentRole(
        role_name="devops",
        description="Restricted-execution DevOps role",
        tool_permissions={
            "execute_bash": ToolPermission(
                tool_name="execute_bash",
                capabilities={ToolCapability.CODE_BASH},
                allowed_paths=["/opt/scripts/", "/var/app/"],
                denied_paths=["/etc/", "/root/", "/home/", "/boot/"],
                max_execution_seconds=120,
                require_confirmation=True,  # running a script needs human confirmation
            ),
            "read_file": ToolPermission(
                tool_name="read_file",
                capabilities={ToolCapability.FILE_READ},
                allowed_paths=["/var/log/", "/opt/app/logs/"],
            ),
        }
    )

67.1.3 Runtime Permission Checks

class PermissionEnforcer:
    """Runtime permission enforcer"""
    
    # Note: _check_domain, _check_rate_limit, _execute_tool and
    # _generate_approval_token are not shown in this listing and are
    # assumed to be implemented elsewhere.
    
    def __init__(self, role: AgentRole):
        self.role = role
        self.call_counts: Dict[str, list] = {}  # call timestamps per tool, for rate limiting
    
    def check_and_execute(self, tool_name: str, args: dict) -> dict:
        """Check permissions, then execute the tool call"""
        
        # 1. Is the tool granted to this role?
        if not self.role.can_use_tool(tool_name):
            return self._deny(f"Tool '{tool_name}' not authorized for role '{self.role.role_name}'")
        
        permission = self.role.get_tool_permission(tool_name)
        
        # 2. Path access check
        if "path" in args:
            path_check = self._check_path(args["path"], permission)
            if not path_check["allowed"]:
                return self._deny(path_check["reason"])
        
        # 3. Domain access check
        if "url" in args:
            domain_check = self._check_domain(args["url"], permission)
            if not domain_check["allowed"]:
                return self._deny(domain_check["reason"])
        
        # 4. Rate limit check
        rate_check = self._check_rate_limit(tool_name, permission)
        if not rate_check["allowed"]:
            return self._deny(rate_check["reason"])
        
        # 5. SQL statement filtering (SQL tool only)
        if tool_name == "execute_sql" and "query" in args:
            sql_check = self._check_sql_safety(args["query"])
            if not sql_check["allowed"]:
                return self._deny(sql_check["reason"])
        
        # 6. Dangerous operations require confirmation
        if permission.require_confirmation:
            return self._request_confirmation(tool_name, args)
        
        # 7. Execute the tool
        return self._execute_tool(tool_name, args, permission)
    
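    def _check_path(self, path: str, permission: ToolPermission) -> dict:
        """Check whether a file path may be accessed"""
        import os
        # realpath resolves ".." and symlinks, so a link inside an allowed
        # directory cannot point outside it
        real_path = os.path.realpath(path)
        
        def is_within(base: str) -> bool:
            # Compare whole path components; a plain startswith() would
            # treat "/etcetera" as being inside "/etc"
            real_base = os.path.realpath(base)
            return os.path.commonpath([real_path, real_base]) == real_base
        
        # Deny list first (takes precedence)
        if any(is_within(denied) for denied in permission.denied_paths):
            return {"allowed": False, "reason": f"Path '{path}' is in deny list"}
        
        # Allow list (only if one is configured)
        if permission.allowed_paths and not any(
            is_within(p) for p in permission.allowed_paths
        ):
            return {"allowed": False, "reason": f"Path '{path}' not in allow list"}
        
        return {"allowed": True}
    
    def _check_sql_safety(self, query: str) -> dict:
        """Reject SQL statements that contain write or DDL keywords"""
        import re
        query_upper = query.upper()
        
        # Operations forbidden for the data analyst role
        dangerous_keywords = ["INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER", "CREATE"]
        
        for keyword in dangerous_keywords:
            # \b matches at word boundaries, so a keyword preceded by a
            # newline, tab, "(" or ";" is caught as well
            if re.search(rf"\b{keyword}\b", query_upper):
                return {"allowed": False, "reason": f"SQL operation '{keyword}' not allowed"}
        
        # Keyword filtering is best-effort only; the database account itself
        # should also be limited to read-only access
        return {"allowed": True}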
    
    def _deny(self, reason: str) -> dict:
        import logging
        logging.warning(f"Permission denied: {reason}")
        return {
            "success": False,
            "error": "Permission denied",
            "reason": reason,
            "suggestion": "Contact administrator if this operation is required",
        }
    
    def _request_confirmation(self, tool_name: str, args: dict) -> dict:
        """Ask a human operator for confirmation (high-risk operations)"""
        return {
            "success": False,
            "requires_confirmation": True,
            "confirmation_request": {
                "tool": tool_name,
                "args": args,
                "message": "High-risk operation requires human approval. Please review and confirm.",
                "approval_token": self._generate_approval_token(tool_name, args),
            }
        }
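
The enforcer above calls a rate-limit helper and an approval-token helper that the listing does not define. One possible shape for them is sketched below, assuming a sliding-window limiter and an HMAC that binds an approval to one exact tool call; the names `SlidingWindowRateLimiter`, `make_approval_token` and `verify_approval_token` are hypothetical, not part of the original code.

```python
import hashlib
import hmac
import json
import time
from collections import deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Allow at most `limit` calls per tool within a rolling `window` seconds."""

    def __init__(self, window: float = 60.0, clock: Callable[[], float] = time.monotonic):
        self.window = window
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self.calls: Dict[str, Deque[float]] = {}

    def allow(self, tool_name: str, limit: int) -> bool:
        now = self.clock()
        timestamps = self.calls.setdefault(tool_name, deque())
        # Discard calls that have fallen out of the window
        while timestamps and now - timestamps[0] >= self.window:
            timestamps.popleft()
        if len(timestamps) >= limit:
            return False
        timestamps.append(now)
        return True


def make_approval_token(secret: bytes, tool_name: str, args: dict) -> str:
    """Bind an approval to one exact (tool, args) pair with an HMAC."""
    payload = json.dumps({"tool": tool_name, "args": args}, sort_keys=True).encode()
    return hmac.new(secret, payload, hashlib.sha256).hexdigest()


def verify_approval_token(secret: bytes, tool_name: str, args: dict, token: str) -> bool:
    expected = make_approval_token(secret, tool_name, args)
    return hmac.compare_digest(expected, token)  # constant-time comparison


# Demo with a fake clock: two calls fit the limit, the third is refused,
# and a call after the window has passed is accepted again
fake_now = [0.0]
limiter = SlidingWindowRateLimiter(window=60.0, clock=lambda: fake_now[0])
results = [limiter.allow("execute_sql", limit=2) for _ in range(3)]
fake_now[0] = 61.0
results.append(limiter.allow("execute_sql", limit=2))
print(results)
```

Tying the token to the serialized arguments means an approval granted for one script path cannot be replayed for a different one.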

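67.2 Filesystem Sandboxing

67.2.1 chroot Sandbox

#!/bin/bash
# Create a chroot sandbox for Hermes Agent (run as root)

# 1. Create the sandbox root directory layout
SANDBOX_ROOT="/opt/hermes-sandbox"
mkdir -p "$SANDBOX_ROOT"/{bin,lib,lib64,usr,tmp,data,scripts}

# 2. Copy the required system binaries
#    (python3 also needs its standard library copied into the sandbox)
for binary in python3 bash ls cat grep awk sed find; do
    cp "$(command -v "$binary")" "$SANDBOX_ROOT/bin/"
done

# 3. Copy shared-library dependencies reported by ldd,
#    including the dynamic loader line, which has no "=>"
copy_libs() {
    ldd "$1" | awk '/=>/ && $3 ~ /^\// {print $3} $1 ~ /^\// {print $1}' | while read -r lib; do
        if [ -f "$lib" ]; then
            cp --parents "$lib" "$SANDBOX_ROOT/"
        fi
    done
}
for binary in "$SANDBOX_ROOT"/bin/*; do
    copy_libs "$binary"
done

# 4. Set permissions
chmod 755 "$SANDBOX_ROOT"
chmod 1777 "$SANDBOX_ROOT/tmp"  # sticky bit

# 5. Bind-mount the data directory read-only
#    ("bind" in the remount limits the read-only flag to this mount point)
mount --bind /data/readonly "$SANDBOX_ROOT/data"
mount -o remount,bind,ro "$SANDBOX_ROOT/data"

# 6. Run inside the sandbox as an unprivileged user
#    (numeric IDs avoid needing /etc/passwd or su inside the sandbox;
#     agent.py is assumed to have been placed under $SANDBOX_ROOT/scripts)
useradd -r -s /bin/false hermes-agent
chroot --userspec="$(id -u hermes-agent):$(id -g hermes-agent)" "$SANDBOX_ROOT" /bin/python3 /scripts/agent.py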
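
chroot enforces confinement in the kernel. When a file tool runs in the Agent's own process rather than inside the sandbox, a comparable check has to be made in code before the file is opened. The sketch below assumes a hypothetical `is_within` helper and temporary directories created only for the demonstration; it shows why paths should be resolved and compared by component rather than by string prefix.

```python
import os
import tempfile


def is_within(base: str, candidate: str) -> bool:
    """True if `candidate` resolves to a location inside `base`."""
    real_base = os.path.realpath(base)
    real_candidate = os.path.realpath(candidate)
    # commonpath compares whole components, so "/data-secrets" is not inside "/data"
    return os.path.commonpath([real_base, real_candidate]) == real_base


with tempfile.TemporaryDirectory() as root:
    allowed = os.path.join(root, "data")
    sibling = os.path.join(root, "data-secrets")
    os.makedirs(allowed)
    os.makedirs(sibling)
    # A symlink placed inside the allowed directory but pointing outside it
    os.symlink(sibling, os.path.join(allowed, "link"))

    inside = is_within(allowed, os.path.join(allowed, "report.csv"))
    prefix_trick = is_within(allowed, os.path.join(sibling, "key.pem"))
    dotdot = is_within(allowed, os.path.join(allowed, "..", "data-secrets", "key.pem"))
    via_symlink = is_within(allowed, os.path.join(allowed, "link", "key.pem"))

print(inside, prefix_trick, dotdot, via_symlink)
```

A check like this still leaves a window between validation and use, so it complements an OS-level sandbox rather than replacing it.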

67.2.2 Container Isolation (Recommended for Production)

# docker-compose.yml - production container configuration for Hermes Agent
version: '3.8'

services:
  hermes-agent:
    image: hermes-agent:latest
    
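    # Security settings
    security_opt:
      - no-new-privileges:true     # forbid privilege escalation
      - seccomp:seccomp_profile.json  # syscall allowlist
    
    # Read-only root filesystem
    read_only: true
    
    # Writable temporary directories
    tmpfs:
      - /tmp:size=100m,noexec,nosuid
      - /var/run:size=10m
    
    # Run as a non-root user
    user: "1001:1001"
    
    # Capabilities: drop everything, then add back only what is needed
    cap_drop:
      - ALL
    cap_add:
      - NET_BIND_SERVICE  # only needed to bind ports below 1024; remove otherwise
    
    # Resource limits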
    deploy:
      resources:
        limits:
          memory: 2G
          cpus: '1.0'
        reservations:
          memory: 512M
    
    # Network isolation
    networks:
      - agent-internal  # internal network
    
    # Volumes (knowledge base is read-only; the workspace volume is writable)
    volumes:
      - type: bind
        source: /data/knowledge-base
        target: /data/kb
        read_only: true
      - type: volume
        source: agent-workspace
        target: /workspace
    
    # Environment variables (no sensitive values)
    environment:
      - HERMES_MODE=production
      - LOG_LEVEL=INFO
      - ALLOWED_DOMAINS=api.hermes.com,api.openai.com
    
    # Secrets are injected through Docker Secrets (not environment variables)
    secrets:
      - hermes_api_key
      - database_credentials

secrets:
  hermes_api_key:
    external: true
  database_credentials:
    external: true

networks:
  agent-internal:
    driver: bridge
    internal: true  # no direct access to external networks

volumes:
  agent-workspace:
    driver: local

seccomp_profile.json: syscall allowlist (illustrative; derive a real profile by tracing the syscalls the workload actually makes)
{
  "defaultAction": "SCMP_ACT_ERRNO",
  "syscalls": [
    {
      "names": [
        "read", "write", "open", "close", "stat", "fstat",
        "mmap", "mprotect", "munmap", "brk", "rt_sigaction",
        "rt_sigprocmask", "ioctl", "access", "pipe", "select",
        "sched_yield", "mremap", "msync", "mincore", "madvise",
        "dup", "dup2", "nanosleep", "getpid", "socket", "connect",
        "accept", "sendto", "recvfrom", "shutdown", "bind",
        "listen", "getsockname", "getpeername", "socketpair",
        "setsockopt", "getsockopt", "clone", "fork", "execve",
        "exit", "wait4", "kill", "uname", "getcwd", "chdir",
        "rename", "mkdir", "rmdir", "unlink", "readlink",
        "chmod", "getuid", "getgid", "geteuid", "getegid",
        "futex", "prctl", "getdents64", "lseek", "fcntl",
        "openat", "newfstatat", "set_tid_address", "exit_group"
      ],
      "action": "SCMP_ACT_ALLOW"
    }
  ]
}
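
An allowlist profile like the one above is easy to weaken by accident, for example by flipping the default action or adding a powerful syscall. A small audit step can catch that before deployment. The following is a sketch only: `audit_seccomp_profile` is a hypothetical helper, and the `RISKY_SYSCALLS` set is an assumption chosen for illustration, not an authoritative list.

```python
import json
from typing import List

# Syscalls an Agent workload rarely needs; this set is an assumption for illustration
RISKY_SYSCALLS = {"ptrace", "mount", "umount2", "reboot", "init_module", "kexec_load"}


def audit_seccomp_profile(profile: dict) -> List[str]:
    """Return a list of findings; an empty list means no issue was detected."""
    findings = []
    if profile.get("defaultAction") != "SCMP_ACT_ERRNO":
        findings.append("defaultAction should deny by default (SCMP_ACT_ERRNO)")
    for rule in profile.get("syscalls", []):
        if rule.get("action") != "SCMP_ACT_ALLOW":
            continue
        for name in sorted(RISKY_SYSCALLS.intersection(rule.get("names", []))):
            findings.append(f"risky syscall allowed: {name}")
    return findings


good = json.loads(
    '{"defaultAction": "SCMP_ACT_ERRNO",'
    ' "syscalls": [{"names": ["read", "write"], "action": "SCMP_ACT_ALLOW"}]}'
)
bad = json.loads(
    '{"defaultAction": "SCMP_ACT_ALLOW",'
    ' "syscalls": [{"names": ["read", "ptrace"], "action": "SCMP_ACT_ALLOW"}]}'
)
print(audit_seccomp_profile(good))
print(audit_seccomp_profile(bad))
```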

67.3 Network Access Control

67.3.1 Outbound Traffic Allowlist

import ipaddress
from urllib.parse import urlparse
from typing import Optional

class NetworkAccessController:
    """Network access controller for the Agent"""
    
    def __init__(self, config: dict):
        # Allowed domains (allowlist mode)
        self.allowed_domains: set[str] = set(config.get("allowed_domains", []))
        
        # Blocked IP ranges (blocklist, used to prevent SSRF)
        self.blocked_ip_ranges: list[ipaddress.IPv4Network] = [
            ipaddress.IPv4Network("10.0.0.0/8"),      # private network
            ipaddress.IPv4Network("172.16.0.0/12"),   # private network
            ipaddress.IPv4Network("192.168.0.0/16"),  # private network
            ipaddress.IPv4Network("127.0.0.0/8"),     # loopback
            ipaddress.IPv4Network("169.254.0.0/16"),  # link-local (cloud metadata)
            ipaddress.IPv4Network("100.64.0.0/10"),   # shared address space
        ]
        
        # Allowed schemes and ports
        self.allowed_schemes: set[str] = {"https"}  # HTTPS only
        self.allowed_ports: set[int] = {443}
    
    def validate(self, url: str) -> dict:
        """Check whether the URL may be accessed"""
        import socket
        
        try:
            parsed = urlparse(url)
            port = parsed.port or 443  # .port raises ValueError on a malformed port
        except ValueError as e:
            return {"allowed": False, "reason": f"URL parse error: {e}"}
        
        # 1. Scheme check
        if parsed.scheme not in self.allowed_schemes:
            return {"allowed": False, "reason": f"Scheme '{parsed.scheme}' not allowed (use HTTPS)"}
        
        # 2. Port check
        if port not in self.allowed_ports:
            return {"allowed": False, "reason": f"Port {port} not allowed"}
        
        # 3. Domain allowlist check
        # .hostname drops any "user:pass@" prefix and the port, unlike .netloc
        domain = (parsed.hostname or "").lower()
        
        domain_allowed = any(
            domain == allowed or domain.endswith(f".{allowed}")
            for allowed in self.allowed_domains
        )
        if not domain_allowed:
            return {"allowed": False, "reason": f"Domain '{domain}' not in allowlist"}
        
        # 4. SSRF protection: resolve the IP and reject private ranges
        try:
            ip = socket.gethostbyname(domain)
        except socket.gaierror:
            return {"allowed": False, "reason": "DNS resolution failed"}
        
        ip_addr = ipaddress.IPv4Address(ip)
        for blocked_range in self.blocked_ip_ranges:
            if ip_addr in blocked_range:
                return {
                    "allowed": False,
                    "reason": f"IP {ip} resolves to blocked range {blocked_range}"
                }
        
        # The HTTP client resolves the name again when it connects, so this
        # check alone does not rule out DNS rebinding
        return {"allowed": True, "resolved_ip": ip}
    
    async def safe_fetch(self, url: str, headers: Optional[dict] = None) -> dict:
        """HTTP GET with access control applied"""
        import aiohttp
        
        validation = self.validate(url)
        if not validation["allowed"]:
            return {"success": False, "error": validation["reason"]}
        
        # Drop headers that could leak internal information (case-insensitive)
        blocked_headers = {"x-forwarded-for", "x-real-ip"}
        safe_headers = {"User-Agent": "Hermes-Agent/1.0"}
        safe_headers.update(
            {k: v for k, v in (headers or {}).items() if k.lower() not in blocked_headers}
        )
        
        max_bytes = 10 * 1024 * 1024  # 10 MB cap
        async with aiohttp.ClientSession() as session:
            async with session.get(
                url,
                headers=safe_headers,
                timeout=aiohttp.ClientTimeout(total=30),
                # Redirects are not followed automatically: a redirect target
                # would bypass validate(), so callers must re-validate it
                allow_redirects=False,
                ssl=True,  # verify the TLS certificate
            ) as response:
                # Read in chunks and stop at the cap, so an oversized body
                # is never fully buffered in memory
                chunks, total = [], 0
                async for chunk in response.content.iter_chunked(64 * 1024):
                    total += len(chunk)
                    if total > max_bytes:
                        return {"success": False, "error": "Response too large (>10MB)"}
                    chunks.append(chunk)
                content = b"".join(chunks)
                
                return {
                    "success": True,
                    "status": response.status,
                    "content": content.decode("utf-8", errors="replace"),
                    "content_type": response.headers.get("content-type", ""),
                }
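
The controller above checks IPv4 ranges by hand. As a complementary sketch, Python's `ipaddress` module can classify both IPv4 and IPv6 literals, and the suffix rule for domain matching can be isolated so it is easy to test. The helper names `is_blocked_ip` and `domain_matches` are hypothetical, and treating every non-global address as blocked is an assumption that may be stricter than a given deployment needs.

```python
import ipaddress


def is_blocked_ip(address: str) -> bool:
    """True if the address must not be reached by the Agent."""
    ip = ipaddress.ip_address(address)  # accepts both IPv4 and IPv6 literals
    # is_global is False for private, loopback, link-local and shared ranges
    return not ip.is_global


def domain_matches(host: str, allowed: str) -> bool:
    """Exact match or a true subdomain; 'evilgoogle.com' does not match 'google.com'."""
    host, allowed = host.lower().rstrip("."), allowed.lower()
    return host == allowed or host.endswith("." + allowed)


for candidate in ["8.8.8.8", "10.1.2.3", "169.254.169.254", "100.64.0.1", "::1"]:
    print(candidate, is_blocked_ip(candidate))

print(domain_matches("scholar.google.com", "google.com"))
print(domain_matches("evilgoogle.com", "google.com"))
```

The leading dot in the suffix comparison is what prevents a look-alike registration from passing as a subdomain of an allowed domain.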

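67.4 Multi-Tenant Data Isolation

67.4.1 Comparing Tenant Isolation Approaches

Approach                                  Isolation strength  Cost     Typical use case
Row-level isolation (Row-Level Security)                               Single database, trusted internal users
Schema isolation                                                       Same instance, moderate data volume
Database instance isolation                                            Strict compliance requirements
Separate deployment                       Highest             Highest  Highly sensitive industries such as finance and healthcare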

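67.4.2 Implementing Row-Level Security

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

class TenantIsolatedSession:
    """
    Multi-tenant Session with row-level security.
    Every query is filtered by tenant automatically.
    """
    
    # Tables protected by the tenant policy
    RLS_TABLES = ("agent_conversations", "agent_tool_calls", "agent_files")
    
    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)
        self._session_factory = sessionmaker(bind=self.engine)
        self._setup_rls()
    
    def _setup_rls(self):
        """Set up row-level security policies in PostgreSQL"""
        # engine.begin() commits on success; SQLAlchemy 2.0 requires text() for raw SQL
        with self.engine.begin() as conn:
            for table in self.RLS_TABLES:
                # Table names come from the constant above, never from user input
                conn.execute(text(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY"))
                # FORCE applies the policy to the table owner as well
                conn.execute(text(f"ALTER TABLE {table} FORCE ROW LEVEL SECURITY"))
                # Drop first so that repeated start-ups do not fail
                conn.execute(text(f"DROP POLICY IF EXISTS tenant_isolation_policy ON {table}"))
                # Policy: a session can only access rows of its own tenant
                conn.execute(text(
                    f"CREATE POLICY tenant_isolation_policy ON {table} "
                    "USING (tenant_id = current_setting('app.current_tenant_id'))"
                ))
    
    def get_session(self, tenant_id: str) -> Session:
        """Return a Session for one tenant (tenant context injected automatically)"""
        session = self._session_factory()
        
        # Bind the value as a parameter; interpolating it into the SQL string
        # would allow SQL injection. The third argument (true) scopes the
        # setting to the current transaction, so it cannot leak to another
        # tenant through a pooled connection.
        session.execute(
            text("SELECT set_config('app.current_tenant_id', :tenant_id, true)"),
            {"tenant_id": tenant_id},
        )
        
        return session

class TenantAwareAgentMemory:
    """Tenant-aware storage for Agent memory"""
    
    def __init__(self, db: TenantIsolatedSession):
        self.db = db
    
    def save_conversation(self, tenant_id: str, conversation: dict) -> str:
        """Save a conversation record (always tagged with the tenant)"""
        session = self.db.get_session(tenant_id)
        
        try:
            conversation_with_tenant = {
                **conversation,
                "tenant_id": tenant_id,  # always overwrite tenant_id
            }
            # Even if RLS were misconfigured, the tenant_id column still marks the row.
            # ConversationRecord is the ORM model, assumed to be defined elsewhere.
            record = ConversationRecord(**conversation_with_tenant)
            session.add(record)
            session.flush()        # assigns record.id inside the transaction
            record_id = record.id  # read it before commit expires the instance
            session.commit()
            return record_id
        finally:
            session.close()
    
    def get_conversation_history(self, tenant_id: str, conversation_id: str) -> list:
        """Fetch conversation history (only rows of this tenant are returned)"""
        session = self.db.get_session(tenant_id)
        
        try:
            # Defense in depth: RLS plus an explicit tenant_id filter
            return session.query(ConversationRecord).filter(
                ConversationRecord.tenant_id == tenant_id,  # explicit filter
                ConversationRecord.id == conversation_id,
            ).all()
        finally:
            session.close()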
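
The explicit tenant filter, the second half of the double check, can be exercised without a PostgreSQL server. The sketch below uses an in-memory SQLite database, which has no row-level security, so it demonstrates only the application-layer filter; `TenantScopedStore` and the `conversations` table are hypothetical names introduced for this example.

```python
import sqlite3
from typing import List, Tuple


class TenantScopedStore:
    """Every statement is parameterized and carries the tenant_id fixed at construction."""

    def __init__(self, conn: sqlite3.Connection, tenant_id: str):
        self._conn = conn
        self._tenant_id = tenant_id  # set once; callers cannot override it per query

    def add(self, conversation_id: str, content: str) -> None:
        self._conn.execute(
            "INSERT INTO conversations (id, tenant_id, content) VALUES (?, ?, ?)",
            (conversation_id, self._tenant_id, content),
        )

    def get(self, conversation_id: str) -> List[Tuple[str, str]]:
        rows = self._conn.execute(
            "SELECT id, content FROM conversations WHERE tenant_id = ? AND id = ?",
            (self._tenant_id, conversation_id),
        )
        return rows.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE conversations (id TEXT, tenant_id TEXT, content TEXT)")
tenant_a = TenantScopedStore(conn, "tenant-a")
tenant_b = TenantScopedStore(conn, "tenant-b")
tenant_a.add("c1", "hello from A")

own_rows = tenant_a.get("c1")
other_rows = tenant_b.get("c1")
# A quote inside the identifier is treated as data, not as SQL
injected_rows = TenantScopedStore(conn, "x' OR '1'='1").get("c1")
print(own_rows, other_rows, injected_rows)
```

Fixing the tenant at construction time keeps individual queries from forgetting the filter, which is the failure mode the explicit check is meant to cover.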

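67.5 Secret Management: Keeping Plaintext Credentials Away from the Agent

67.5.1 Secret Management Architecture

Wrong approach (never do this):
┌─────────────┐      API_KEY=sk-xxx       ┌───────────────────────┐
│   Agent     │ ←──────────────────────── │ Environment variables │
└─────────────┘                           └───────────────────────┘

Correct approach (production grade):
┌─────────────┐  request credential (role+scope)  ┌────────────────┐
│   Agent     │ ────────────────────────────────→ │   Vault/KMS    │
│             │ ←─ short-lived token (15 min) ─── │ secret service │
└─────────────┘                                   └────────────────┘
                                                           ↕
                                           original secrets, stored encrypted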
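
The request-and-expire flow in the diagram can be sketched in memory. This is an illustration only: `TokenBroker` and `Grant` are hypothetical stand-ins for the secret service, and the clock is injected so that expiry can be shown without waiting 15 minutes.

```python
import secrets
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class Grant:
    role: str
    scope: str
    expires_at: float


class TokenBroker:
    """Hypothetical stand-in for the secret service: issues opaque, expiring tokens."""

    def __init__(self, ttl_seconds: float = 900, clock: Callable[[], float] = time.time):
        self.ttl_seconds = ttl_seconds  # 900 s = the 15 minutes shown in the diagram
        self.clock = clock
        self._grants: Dict[str, Grant] = {}

    def issue(self, role: str, scope: str) -> str:
        token = secrets.token_urlsafe(32)  # random; carries no secret material itself
        self._grants[token] = Grant(role, scope, self.clock() + self.ttl_seconds)
        return token

    def check(self, token: str, scope: str) -> bool:
        grant: Optional[Grant] = self._grants.get(token)
        if grant is None or self.clock() >= grant.expires_at:
            self._grants.pop(token, None)  # forget unknown or expired tokens
            return False
        return grant.scope == scope

    def revoke(self, token: str) -> None:
        self._grants.pop(token, None)


now = [1000.0]
broker = TokenBroker(ttl_seconds=900, clock=lambda: now[0])
token = broker.issue(role="data_analyst", scope="openai:chat")
fresh = broker.check(token, "openai:chat")
wrong_scope = broker.check(token, "github:write")
now[0] += 901  # move the fake clock past the TTL
expired = broker.check(token, "openai:chat")
print(fresh, wrong_scope, expired)
```

The token the Agent holds is only a lookup key, so leaking it exposes at most one scope for at most one TTL.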

67.5.2 HashiCorp Vault Integration

import hvac
import os
from datetime import datetime, timedelta, timezone
from typing import Optional
import threading

class SecretManager:
    """
    Secret manager: fetches credentials from Vault on demand.
    Secrets are handed to the credential proxy, not to the Agent itself,
    and are cached only for a short time.
    """
    
    def __init__(self, vault_url: str, vault_token: str):
        self.client = hvac.Client(url=vault_url, token=vault_token)
        self._cache: dict = {}
        self._cache_lock = threading.Lock()
    
    def get_api_key(self, service: str) -> str:
        """
        Get the API key for a service.
        - Served from the cache while the cached entry has not expired
        - Re-read from Vault once the entry expires
        - The KV v2 engine returns whatever value is stored; the result is
          short-lived only if the stored credential itself is short-lived
        """
        cache_key = f"secret:{service}"
        now = datetime.now(timezone.utc)  # utcnow() is deprecated since Python 3.12
        
        with self._cache_lock:
            cached = self._cache.get(cache_key)
            if cached and cached["expires_at"] > now:
                return cached["value"]
        
        # Fetch from Vault
        try:
            secret = self.client.secrets.kv.v2.read_secret_version(
                path=f"hermes-agent/{service}",
                mount_point="secret",
            )
            
            api_key = secret["data"]["data"]["api_key"]
            expires_in = secret["data"]["data"].get("ttl", 900)  # default: 15 minutes
            
            with self._cache_lock:
                self._cache[cache_key] = {
                    "value": api_key,
                    # Refresh 60 seconds early; never use a negative lifetime
                    "expires_at": now + timedelta(seconds=max(expires_in - 60, 0))
                }
            
            return api_key
            
        except Exception as e:
            raise RuntimeError(f"Failed to retrieve secret for '{service}': {e}") from e
    
    def get_database_credentials(self, db_role: str) -> dict:
        """
        Get dynamic database credentials (a new temporary username and
        password on every call), using Vault's Database Secrets Engine
        """
        creds = self.client.secrets.database.generate_credentials(
            name=db_role,
            mount_point="database",
        )
        
        return {
            "username": creds["data"]["username"],
            "password": creds["data"]["password"],
            "lease_id": creds["lease_id"],
            "lease_duration": creds["lease_duration"],  # expires automatically
        }
    
    def revoke_credentials(self, lease_id: str):
        """Revoke credentials explicitly (as soon as the task completes)"""
        self.client.sys.revoke_lease(lease_id=lease_id)

class AgentCredentialProxy:
    """
    Credential proxy for the Agent.
    The Agent calls this proxy to reach external services;
    the proxy injects the real API key, which the Agent never sees
    """
    
    def __init__(self, secret_manager: SecretManager):
        self.secrets = secret_manager
    
    async def call_openai(self, messages: list, model: str = "gpt-4") -> dict:
        """Call the OpenAI API through the proxy; the Agent never handles the API key"""
        import aiohttp
        
        api_key = self.secrets.get_api_key("openai")
        
        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://api.openai.com/v1/chat/completions",
                headers={
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                },
                json={"model": model, "messages": messages}
            ) as response:
                return await response.json()
    
    async def call_github_api(self, endpoint: str, method: str = "GET") -> dict:
        """Call the GitHub API through the proxy"""
        import aiohttp
        
        token = self.secrets.get_api_key("github")
        
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method,
                f"https://api.github.com{endpoint}",
                headers={
                    "Authorization": f"token {token}",
                    "Accept": "application/vnd.github.v3+json",
                }
            ) as response:
                return await response.json()

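67.5.3 Secret Management Best Practices

SECRETS_BEST_PRACTICES = {
    "storage": {
        "❌ Forbidden": [
            "Hard-coded in plaintext in source code",
            "Stored in environment variables (.env files)",
            "Stored in the Agent's context or memory",
            "Written to logs (make sure logs are redacted)",
        ],
        "✅ Recommended": [
            "HashiCorp Vault",
            "AWS Secrets Manager",
            "GCP Secret Manager",
            "Azure Key Vault",
        ]
    },
    "lifecycle": {
        "shortest_ttl": "15 minutes (for high-privilege services)",
        "rotation": "Rotate automatically every 30 days",
        "leak_response": "Revoke immediately on discovery, completed within 5 minutes",
    },
    "access_control": {
        "least_privilege": "Each service account can access only the secrets it needs",
        "audit": "Every secret access must be logged",
        "MFA": "Access to high-privilege secrets requires MFA",
    }
}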
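
The rule about keeping secrets out of logs can be enforced with a logging filter rather than left to each call site. The sketch below is one possible approach; the two patterns are assumptions covering only keys shaped like `sk-...` and Bearer header values, and `redact` and `RedactingFilter` are hypothetical names.

```python
import logging
import re

# Patterns are illustrative assumptions; extend them for the credential formats in use
KEY_PATTERN = re.compile(r"sk-[A-Za-z0-9_-]{8,}")  # API keys shaped like "sk-..."
BEARER_PATTERN = re.compile(r"(?i)(bearer\s+)\S+")  # Authorization header values


def redact(message: str) -> str:
    """Replace anything that looks like a credential with a placeholder."""
    message = KEY_PATTERN.sub("[REDACTED]", message)
    return BEARER_PATTERN.sub(r"\1[REDACTED]", message)


class RedactingFilter(logging.Filter):
    """Rewrite each record so the formatted message no longer contains secrets."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.msg = redact(record.getMessage())  # format first, then redact
        record.args = ()                           # arguments are already merged in
        return True


logger = logging.getLogger("hermes.redaction.demo")
logger.addFilter(RedactingFilter())

print(redact("calling API with key sk-abcdef1234567890"))
print(redact("Authorization: Bearer ghp_example123"))
```

Pattern-based redaction only catches formats it knows about, so it works best as a safety net behind the primary rule of never passing secrets to log calls.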

67.6 Complete Security Configuration Example

# hermes-agent-production.yaml - complete production security configuration

agent:
  name: "hermes-production"
  version: "3.0"

security:
  # Access control
  permission_model:
    type: "role_based"
    default_role: "read_only"
    roles:
      data_analyst:
        allowed_tools: ["read_file", "execute_sql", "web_search"]
        file_paths: ["/data/reports/", "/data/exports/"]
        sql_operations: ["SELECT"]
        web_domains: ["google.com", "scholar.google.com"]
      
      devops:
        allowed_tools: ["execute_bash", "read_file", "write_file"]
        file_paths: ["/opt/scripts/", "/var/log/"]
        require_confirmation: true
  
  # Sandbox settings
  sandbox:
    type: "docker"
    image: "hermes-sandbox:latest"
    memory_limit: "2Gi"
    cpu_limit: "1.0"
    network_mode: "restricted"
    read_only_root: true
    allowed_syscalls: "seccomp_profile.json"
  
  # Network control
  network:
    mode: "whitelist"
    allowed_domains:
      - "api.hermes.nousresearch.com"
      - "api.openai.com"
      - "google.com"
      - "wikipedia.org"
    force_https: true
    block_private_ranges: true
    max_response_size_mb: 10
  
  # Secret management
  secrets:
    backend: "vault"
    vault_url: "https://vault.internal.company.com"
    auth_method: "kubernetes"  # authenticate with the K8s ServiceAccount
    default_ttl: "15m"
    max_ttl: "1h"
  
  # Data isolation
  data_isolation:
    strategy: "row_level_security"
    tenant_field: "tenant_id"
    enforce_at: ["database", "vector_store", "file_storage"]
  
  # Audit logging
  audit:
    enabled: true
    log_level: "ALL"  # record every tool call
    storage: "append_only_s3"
    bucket: "hermes-audit-logs-prod"
    encryption: "AES-256"
    retention_days: 365
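
A configuration like this is only protective while its safe values stay in place, so it can help to lint it at start-up. The sketch below assumes the YAML has already been parsed into a dictionary (the loader is left out to keep the example standard-library only); `lint_security_config` is a hypothetical helper, and the set of checks is an illustrative subset of the keys shown above.

```python
from typing import List


def lint_security_config(security: dict) -> List[str]:
    """Return human-readable warnings for settings that weaken the configuration."""
    warnings = []
    network = security.get("network", {})
    if network.get("mode") != "whitelist":
        warnings.append("network.mode should be 'whitelist'")
    if not network.get("force_https", False):
        warnings.append("network.force_https should be true")
    if not network.get("block_private_ranges", False):
        warnings.append("network.block_private_ranges should be true")
    if not security.get("sandbox", {}).get("read_only_root", False):
        warnings.append("sandbox.read_only_root should be true")
    if not security.get("audit", {}).get("enabled", False):
        warnings.append("audit.enabled should be true")
    return warnings


# Dictionaries standing in for the parsed "security" section
hardened = {
    "network": {"mode": "whitelist", "force_https": True, "block_private_ranges": True},
    "sandbox": {"read_only_root": True},
    "audit": {"enabled": True},
}
weakened = {
    "network": {"mode": "whitelist", "force_https": False, "block_private_ranges": True},
    "sandbox": {"read_only_root": True},
    "audit": {"enabled": False},
}
print(lint_security_config(hardened))
print(lint_security_config(weakened))
```

Missing keys are treated as unsafe, so a section that is deleted by mistake produces a warning instead of silently passing.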

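Chapter Summary

This chapter built a layered defense for running Agents in production:

  1. Least privilege: a role permission matrix, a tool capability enum, and runtime permission checks give fine-grained control at the tool layer
  2. Filesystem sandboxing: chroot (development and testing) and Docker containers (recommended for production), with seccomp syscall filtering
  3. Network access control: domain allowlist plus private-IP blocklist (SSRF protection), HTTPS only, response size limits
  4. Multi-tenant isolation: PostgreSQL row-level security (RLS) policies, with double filtering to prevent cross-tenant access
  5. Secret management: Vault dynamic credentials, the Agent never touches plaintext keys, and credentials are revoked as soon as the task completes

Discussion Questions

  1. The biggest practical challenge with least privilege is that permissions become so strict the Agent cannot finish its task. How would you balance security against usability?
  2. Docker container isolation is not invincible: container escape vulnerabilities are real. For scenarios with extremely high security requirements, which additional layers of defense would you add?
  3. A weakness of the Vault dynamic credential approach is that Vault itself becomes a single point of failure. How can availability be improved without compromising security?
  4. Row-level security (RLS) relies on current_setting being set correctly. If an attacker finds a way to manipulate the database connection pool directly, RLS stops protecting anything. How would you design multi-layer data isolation to defend against this?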