第 67 章
生产安全:权限控制与数据隔离
第67章:生产安全:权限控制与数据隔离
将 Agent 部署到生产环境,安全不再是"可以考虑的选项",而是"不做就出事的必答题"。一个权限过于宽泛的 Agent 相当于在企业内部放了一个无监管的超级用户。本章聚焦生产级安全:最小权限原则的工具层实现、文件系统沙箱、网络访问控制、多租户数据隔离,以及最关键的密钥管理——让 Agent 永远接触不到明文凭证。
67.1 最小权限原则:工具层实现
67.1.1 什么是工具层最小权限
传统系统中,最小权限原则(Principle of Least Privilege, PoLP)指进程只拥有完成其任务所必需的最少权限。在 Agent 体系中,这个原则延伸到工具层:Agent 只能调用完成当前任务所必需的工具集合,且每个工具的能力也被约束到最小必要范围。
最小权限层次结构:
┌─────────────────────────────────────────┐
│ 全部工具能力(Universe) │
│ ┌─────────────────────────────────┐ │
│ │ 角色工具集(Role Toolset) │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ 任务工具集(Task) │ │ │
│ │ │ ┌─────────────────┐ │ │ │
│ │ │ │ 操作权限(Op) │ │ │ │
│ │ │ └─────────────────┘ │ │ │
│ │ └─────────────────────────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
67.1.2 工具权限矩阵设计
from dataclasses import dataclass, field
from typing import Optional, Set, Dict
from enum import Enum
class ToolCapability(Enum):
# 文件系统
FILE_READ = "file:read"
FILE_WRITE = "file:write"
FILE_DELETE = "file:delete"
FILE_EXECUTE = "file:execute"
# 网络
NET_HTTP_GET = "net:http:get"
NET_HTTP_POST = "net:http:post"
NET_WEBSOCKET = "net:websocket"
NET_SMTP = "net:smtp"
# 代码执行
CODE_PYTHON = "code:python"
CODE_BASH = "code:bash"
CODE_SQL = "code:sql"
# 外部服务
EXT_GITHUB = "ext:github"
EXT_SLACK = "ext:slack"
EXT_STRIPE = "ext:stripe"
@dataclass
class ToolPermission:
"""单个工具的权限配置"""
tool_name: str
capabilities: Set[ToolCapability]
# 文件系统约束
allowed_paths: list[str] = field(default_factory=list)
denied_paths: list[str] = field(default_factory=lambda: ["/etc", "/root", "/sys", "/proc"])
# 网络约束
allowed_domains: list[str] = field(default_factory=list)
denied_domains: list[str] = field(default_factory=lambda: ["169.254.0.0/16"]) # 云元数据
# 资源限制
max_file_size_mb: int = 10
max_requests_per_minute: int = 60
max_execution_seconds: int = 30
# 操作约束
require_confirmation: bool = False # 危险操作需要人工确认
audit_all_calls: bool = True
@dataclass
class AgentRole:
"""Agent 角色的权限配置"""
role_name: str
description: str
tool_permissions: Dict[str, ToolPermission]
def can_use_tool(self, tool_name: str) -> bool:
return tool_name in self.tool_permissions
def get_tool_permission(self, tool_name: str) -> Optional[ToolPermission]:
return self.tool_permissions.get(tool_name)
# 定义角色
def create_data_analyst_role() -> AgentRole:
"""数据分析师角色:只能读数据,不能写/删"""
return AgentRole(
role_name="data_analyst",
description="数据分析只读角色",
tool_permissions={
"read_file": ToolPermission(
tool_name="read_file",
capabilities={ToolCapability.FILE_READ},
allowed_paths=["/data/reports/", "/data/exports/"],
denied_paths=["/data/secrets/", "/etc/"],
max_file_size_mb=50,
),
"execute_sql": ToolPermission(
tool_name="execute_sql",
capabilities={ToolCapability.CODE_SQL},
# 只允许 SELECT,禁止 INSERT/UPDATE/DELETE/DROP
allowed_paths=[], # SQL不用文件路径
max_requests_per_minute=30,
),
"web_search": ToolPermission(
tool_name="web_search",
capabilities={ToolCapability.NET_HTTP_GET},
allowed_domains=["google.com", "bing.com", "scholar.google.com"],
max_requests_per_minute=20,
),
}
)
def create_devops_role() -> AgentRole:
"""DevOps 角色:可以执行脚本,但路径受限"""
return AgentRole(
role_name="devops",
description="DevOps 受限执行角色",
tool_permissions={
"execute_bash": ToolPermission(
tool_name="execute_bash",
capabilities={ToolCapability.CODE_BASH},
allowed_paths=["/opt/scripts/", "/var/app/"],
denied_paths=["/etc/", "/root/", "/home/", "/boot/"],
max_execution_seconds=120,
require_confirmation=True, # 执行脚本需要人工确认
),
"read_file": ToolPermission(
tool_name="read_file",
capabilities={ToolCapability.FILE_READ},
allowed_paths=["/var/log/", "/opt/app/logs/"],
),
}
)
67.1.3 运行时权限校验
class PermissionEnforcer:
"""运行时权限执行器"""
def __init__(self, role: AgentRole):
self.role = role
self.call_counts: Dict[str, list] = {} # 用于速率限制
def check_and_execute(self, tool_name: str, args: dict) -> dict:
"""检查权限并执行工具调用"""
# 1. 工具存在检查
if not self.role.can_use_tool(tool_name):
return self._deny(f"Tool '{tool_name}' not authorized for role '{self.role.role_name}'")
permission = self.role.get_tool_permission(tool_name)
# 2. 路径访问检查
if "path" in args:
path_check = self._check_path(args["path"], permission)
if not path_check["allowed"]:
return self._deny(path_check["reason"])
# 3. 域名访问检查
if "url" in args:
domain_check = self._check_domain(args["url"], permission)
if not domain_check["allowed"]:
return self._deny(domain_check["reason"])
# 4. 速率限制检查
rate_check = self._check_rate_limit(tool_name, permission)
if not rate_check["allowed"]:
return self._deny(rate_check["reason"])
# 5. SQL 注入防护(针对 SQL 工具)
if tool_name == "execute_sql" and "query" in args:
sql_check = self._check_sql_safety(args["query"])
if not sql_check["allowed"]:
return self._deny(sql_check["reason"])
# 6. 危险操作需要确认
if permission.require_confirmation:
return self._request_confirmation(tool_name, args)
# 7. 执行工具
return self._execute_tool(tool_name, args, permission)
def _check_path(self, path: str, permission: ToolPermission) -> dict:
"""检查文件路径访问权限"""
import os
abs_path = os.path.abspath(path)
# 检查黑名单(优先)
for denied in permission.denied_paths:
if abs_path.startswith(os.path.abspath(denied)):
return {"allowed": False, "reason": f"Path '{path}' is in deny list"}
# 检查白名单(如果配置了)
if permission.allowed_paths:
allowed = any(
abs_path.startswith(os.path.abspath(p))
for p in permission.allowed_paths
)
if not allowed:
return {"allowed": False, "reason": f"Path '{path}' not in allow list"}
return {"allowed": True}
def _check_sql_safety(self, query: str) -> dict:
"""检查 SQL 语句安全性"""
query_upper = query.strip().upper()
# 禁止的操作(对数据分析角色)
dangerous_keywords = ["INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER", "CREATE"]
for keyword in dangerous_keywords:
if query_upper.startswith(keyword) or f" {keyword} " in query_upper:
return {"allowed": False, "reason": f"SQL operation '{keyword}' not allowed"}
return {"allowed": True}
def _deny(self, reason: str) -> dict:
import logging
logging.warning(f"Permission denied: {reason}")
return {
"success": False,
"error": "Permission denied",
"reason": reason,
"suggestion": "Contact administrator if this operation is required",
}
def _request_confirmation(self, tool_name: str, args: dict) -> dict:
"""向人类操作员请求确认(用于高风险操作)"""
return {
"success": False,
"requires_confirmation": True,
"confirmation_request": {
"tool": tool_name,
"args": args,
"message": f"High-risk operation requires human approval. Please review and confirm.",
"approval_token": self._generate_approval_token(tool_name, args),
}
}
67.2 文件系统沙箱
67.2.1 chroot 沙箱
#!/bin/bash
# 为 Hermes Agent 创建 chroot 沙箱
# 1. 创建沙箱根目录结构
SANDBOX_ROOT="/opt/hermes-sandbox"
mkdir -p $SANDBOX_ROOT/{bin,lib,lib64,usr,tmp,data,scripts}
# 2. 复制必要的系统二进制文件
for binary in python3 bash ls cat grep awk sed find; do
cp $(which $binary) $SANDBOX_ROOT/bin/
done
# 3. 复制依赖库(使用 ldd 找依赖)
copy_libs() {
ldd "$1" | grep "=>" | awk '{print $3}' | while read lib; do
[ -f "$lib" ] && cp --parents "$lib" $SANDBOX_ROOT/
done
}
for binary in $SANDBOX_ROOT/bin/*; do
copy_libs "$binary"
done
# 4. 设置权限
chmod 755 $SANDBOX_ROOT
chmod 1777 $SANDBOX_ROOT/tmp # sticky bit
# 5. 挂载只读数据
mount --bind /data/readonly $SANDBOX_ROOT/data
mount -o remount,ro $SANDBOX_ROOT/data
# 6. 以非特权用户在沙箱中运行
useradd -r -s /bin/false hermes-agent
chroot $SANDBOX_ROOT su -s /bin/bash hermes-agent -c "python3 /scripts/agent.py"
67.2.2 容器化隔离(推荐生产方案)
# docker-compose.yml - Hermes Agent 生产容器配置
version: '3.8'
services:
hermes-agent:
image: hermes-agent:latest
# 安全配置
security_opt:
- no-new-privileges:true # 禁止权限提升
- seccomp:seccomp_profile.json # 系统调用白名单
# 只读根文件系统
read_only: true
# 允许写入的临时目录
tmpfs:
- /tmp:size=100m,noexec,nosuid
- /var/run:size=10m
# 用户映射
user: "1001:1001" # 非root用户
# 能力限制(删除所有 Linux capabilities)
cap_drop:
- ALL
cap_add:
- NET_BIND_SERVICE # 仅允许绑定端口(如果需要)
# 资源限制
deploy:
resources:
limits:
memory: 2G
cpus: '1.0'
reservations:
memory: 512M
# 网络隔离
networks:
- agent-internal # 内部网络
# 数据卷(只读)
volumes:
- type: bind
source: /data/knowledge-base
target: /data/kb
read_only: true
- type: volume
source: agent-workspace
target: /workspace
# 环境变量(不含敏感信息)
environment:
- HERMES_MODE=production
- LOG_LEVEL=INFO
- ALLOWED_DOMAINS=api.hermes.com,api.openai.com
# 密钥通过 Docker Secrets 注入(不走环境变量)
secrets:
- hermes_api_key
- database_credentials
secrets:
hermes_api_key:
external: true
database_credentials:
external: true
networks:
agent-internal:
driver: bridge
internal: true # 禁止直接访问外网
volumes:
agent-workspace:
driver: local
// seccomp_profile.json - 系统调用白名单
{
"defaultAction": "SCMP_ACT_ERRNO",
"syscalls": [
{
"names": [
"read", "write", "open", "close", "stat", "fstat",
"mmap", "mprotect", "munmap", "brk", "rt_sigaction",
"rt_sigprocmask", "ioctl", "access", "pipe", "select",
"sched_yield", "mremap", "msync", "mincore", "madvise",
"dup", "dup2", "nanosleep", "getpid", "socket", "connect",
"accept", "sendto", "recvfrom", "shutdown", "bind",
"listen", "getsockname", "getpeername", "socketpair",
"setsockopt", "getsockopt", "clone", "fork", "execve",
"exit", "wait4", "kill", "uname", "getcwd", "chdir",
"rename", "mkdir", "rmdir", "unlink", "readlink",
"chmod", "getuid", "getgid", "geteuid", "getegid",
"futex", "prctl", "getdents64", "lseek", "fcntl",
"openat", "newfstatat", "set_tid_address", "exit_group"
],
"action": "SCMP_ACT_ALLOW"
}
]
}
67.3 网络访问控制
67.3.1 出站流量白名单
import ipaddress
from urllib.parse import urlparse
from typing import Optional
class NetworkAccessController:
"""Agent 网络访问控制器"""
def __init__(self, config: dict):
# 允许的域名(白名单模式)
self.allowed_domains: set[str] = set(config.get("allowed_domains", []))
# 禁止的 IP 段(黑名单,用于防止 SSRF)
self.blocked_ip_ranges: list[ipaddress.IPv4Network] = [
ipaddress.IPv4Network("10.0.0.0/8"), # 私有网络
ipaddress.IPv4Network("172.16.0.0/12"), # 私有网络
ipaddress.IPv4Network("192.168.0.0/16"), # 私有网络
ipaddress.IPv4Network("127.0.0.0/8"), # 回环
ipaddress.IPv4Network("169.254.0.0/16"), # 链路本地(云元数据)
ipaddress.IPv4Network("100.64.0.0/10"), # 共享地址空间
]
# 允许的协议和端口
self.allowed_schemes: set[str] = {"https"} # 仅允许 HTTPS
self.allowed_ports: set[int] = {443}
def validate(self, url: str) -> dict:
"""验证 URL 是否允许访问"""
try:
parsed = urlparse(url)
except Exception as e:
return {"allowed": False, "reason": f"URL parse error: {e}"}
# 1. 协议检查
if parsed.scheme not in self.allowed_schemes:
return {"allowed": False, "reason": f"Scheme '{parsed.scheme}' not allowed (use HTTPS)"}
# 2. 端口检查
port = parsed.port or 443
if port not in self.allowed_ports:
return {"allowed": False, "reason": f"Port {port} not allowed"}
# 3. 域名白名单检查
domain = parsed.netloc.lower()
if ":" in domain:
domain = domain.split(":")[0]
domain_allowed = any(
domain == allowed or domain.endswith(f".{allowed}")
for allowed in self.allowed_domains
)
if not domain_allowed:
return {"allowed": False, "reason": f"Domain '{domain}' not in allowlist"}
# 4. SSRF 防护:解析 IP 并检查是否为私有地址
try:
import socket
ip = socket.gethostbyname(domain)
ip_addr = ipaddress.IPv4Address(ip)
for blocked_range in self.blocked_ip_ranges:
if ip_addr in blocked_range:
return {
"allowed": False,
"reason": f"IP {ip} resolves to blocked range {blocked_range}"
}
except socket.gaierror:
return {"allowed": False, "reason": "DNS resolution failed"}
return {"allowed": True, "resolved_ip": ip}
async def safe_fetch(self, url: str, headers: dict = None) -> dict:
"""安全的 HTTP 请求(带访问控制)"""
import aiohttp
validation = self.validate(url)
if not validation["allowed"]:
return {"success": False, "error": validation["reason"]}
# 安全请求头
safe_headers = {
"User-Agent": "Hermes-Agent/1.0",
**(headers or {}),
}
# 删除可能泄露内部信息的头
for dangerous_header in ["X-Forwarded-For", "X-Real-IP"]:
safe_headers.pop(dangerous_header, None)
async with aiohttp.ClientSession() as session:
async with session.get(
url,
headers=safe_headers,
timeout=aiohttp.ClientTimeout(total=30),
max_redirects=3, # 限制重定向次数(防止重定向攻击)
allow_redirects=True,
ssl=True, # 强制验证 SSL 证书
) as response:
# 限制响应大小(防止内存耗尽)
content = await response.read()
if len(content) > 10 * 1024 * 1024: # 10MB 上限
return {"success": False, "error": "Response too large (>10MB)"}
return {
"success": True,
"status": response.status,
"content": content.decode("utf-8", errors="replace"),
"content_type": response.headers.get("content-type", ""),
}
67.4 多租户数据隔离
67.4.1 租户数据隔离方案对比
| 方案 | 隔离强度 | 成本 | 适用场景 |
|---|---|---|---|
| 行级隔离(Row-Level Security) | 低 | 低 | 单数据库,信任内部用户 |
| Schema 隔离 | 中 | 中 | 同实例,数据量中等 |
| 数据库实例隔离 | 高 | 高 | 合规要求严格 |
| 独立部署 | 最高 | 最高 | 金融/医疗等高敏行业 |
67.4.2 行级安全隔离实现
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, Session
class TenantIsolatedSession:
"""
多租户行级安全 Session
所有查询自动附加租户过滤条件
"""
def __init__(self, connection_string: str):
self.engine = create_engine(connection_string)
self._setup_rls()
def _setup_rls(self):
"""在 PostgreSQL 中设置行级安全策略"""
with self.engine.connect() as conn:
# 启用行级安全
conn.execute("""
ALTER TABLE agent_conversations ENABLE ROW LEVEL SECURITY;
ALTER TABLE agent_tool_calls ENABLE ROW LEVEL SECURITY;
ALTER TABLE agent_files ENABLE ROW LEVEL SECURITY;
-- 策略:用户只能访问自己租户的数据
CREATE POLICY tenant_isolation_policy
ON agent_conversations
USING (tenant_id = current_setting('app.current_tenant_id'));
CREATE POLICY tenant_isolation_policy
ON agent_tool_calls
USING (tenant_id = current_setting('app.current_tenant_id'));
CREATE POLICY tenant_isolation_policy
ON agent_files
USING (tenant_id = current_setting('app.current_tenant_id'));
""")
def get_session(self, tenant_id: str) -> Session:
"""获取特定租户的 Session(自动注入租户上下文)"""
Session = sessionmaker(bind=self.engine)
session = Session()
# 设置租户上下文(PostgreSQL RLS 会自动过滤)
session.execute(f"SET app.current_tenant_id = '{tenant_id}'")
return session
class TenantAwareAgentMemory:
"""多租户感知的 Agent 记忆存储"""
def __init__(self, db: TenantIsolatedSession):
self.db = db
def save_conversation(self, tenant_id: str, conversation: dict) -> str:
"""保存对话记录(自动关联租户)"""
session = self.db.get_session(tenant_id)
try:
conversation_with_tenant = {
**conversation,
"tenant_id": tenant_id, # 强制注入 tenant_id
}
# 即使 RLS 失效,tenant_id 字段也确保了数据标记
record = ConversationRecord(**conversation_with_tenant)
session.add(record)
session.commit()
return record.id
finally:
session.close()
def get_conversation_history(self, tenant_id: str, conversation_id: str) -> list:
"""获取对话历史(只返回本租户数据)"""
session = self.db.get_session(tenant_id)
try:
# 双重保险:RLS + 显式 tenant_id 过滤
return session.query(ConversationRecord).filter(
ConversationRecord.tenant_id == tenant_id, # 显式过滤
ConversationRecord.id == conversation_id,
).all()
finally:
session.close()
67.5 密钥管理:让 Agent 永不接触明文凭证
67.5.1 密钥管理架构
错误做法(绝对禁止):
┌─────────────┐ API_KEY=sk-xxx ┌──────────┐
│ Agent │ ←────────────────────── │ 环境变量 │
└─────────────┘ └──────────┘
正确做法(生产级):
┌─────────────┐ 请求凭证(role+scope) ┌──────────────┐
│ Agent │ ──────────────────────→ │ Vault/KMS │
│ │ ←── 短期Token(15min) ─── │ 密钥服务 │
└─────────────┘ └──────────────┘
↕
加密存储的明文密钥
67.5.2 HashiCorp Vault 集成
import hvac
import os
from datetime import datetime, timedelta
from typing import Optional
import threading
class SecretManager:
"""
密钥管理器:从 Vault 动态获取凭证
Agent 永远看不到明文 API Key,只获取短期 Token
"""
def __init__(self, vault_url: str, vault_token: str):
self.client = hvac.Client(url=vault_url, token=vault_token)
self._cache: dict = {}
self._cache_lock = threading.Lock()
def get_api_key(self, service: str) -> str:
"""
获取服务的 API Key
- 优先从缓存获取(如果未过期)
- 自动续期
- 返回给 Agent 的是短期有效 Token,不是原始 Key
"""
cache_key = f"secret:{service}"
with self._cache_lock:
cached = self._cache.get(cache_key)
if cached and cached["expires_at"] > datetime.utcnow():
return cached["value"]
# 从 Vault 获取
try:
secret = self.client.secrets.kv.v2.read_secret_version(
path=f"hermes-agent/{service}",
mount_point="secret",
)
api_key = secret["data"]["data"]["api_key"]
expires_in = secret["data"]["data"].get("ttl", 900) # 默认15分钟
with self._cache_lock:
self._cache[cache_key] = {
"value": api_key,
"expires_at": datetime.utcnow() + timedelta(seconds=expires_in - 60)
}
return api_key
except Exception as e:
raise RuntimeError(f"Failed to retrieve secret for '{service}': {e}")
def get_database_credentials(self, db_role: str) -> dict:
"""
获取数据库动态凭证(每次生成新的临时用户名/密码)
使用 Vault 的 Database Secrets Engine
"""
creds = self.client.secrets.database.generate_credentials(
name=db_role,
mount_point="database",
)
return {
"username": creds["data"]["username"],
"password": creds["data"]["password"],
"lease_id": creds["lease_id"],
"lease_duration": creds["lease_duration"], # 自动过期
}
def revoke_credentials(self, lease_id: str):
"""主动吊销凭证(任务完成后立即吊销)"""
self.client.sys.revoke_lease(lease_id=lease_id)
class AgentCredentialProxy:
"""
Agent 凭证代理
Agent 调用这个代理来访问外部服务,
代理负责注入真实的 API Key,Agent 本身看不到
"""
def __init__(self, secret_manager: SecretManager):
self.secrets = secret_manager
async def call_openai(self, messages: list, model: str = "gpt-4") -> dict:
"""代理调用 OpenAI API,Agent 不接触 API Key"""
import aiohttp
api_key = self.secrets.get_api_key("openai")
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.openai.com/v1/chat/completions",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={"model": model, "messages": messages}
) as response:
return await response.json()
async def call_github_api(self, endpoint: str, method: str = "GET") -> dict:
"""代理调用 GitHub API"""
import aiohttp
token = self.secrets.get_api_key("github")
async with aiohttp.ClientSession() as session:
async with session.request(
method,
f"https://api.github.com{endpoint}",
headers={
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json",
}
) as response:
return await response.json()
67.5.3 密钥管理最佳实践总结
SECRETS_BEST_PRACTICES = {
"存储": {
"❌ 禁止": [
"明文硬编码在代码中",
"存储在环境变量(.env文件)",
"存储在 Agent 的上下文或记忆中",
"存储在日志中(确保日志脱敏)",
],
"✅ 推荐": [
"HashiCorp Vault",
"AWS Secrets Manager",
"GCP Secret Manager",
"Azure Key Vault",
]
},
"生命周期": {
"最短有效期": "15分钟(对于高权限服务)",
"自动轮转": "每30天自动轮转一次",
"泄露响应": "发现泄露立即吊销,<5分钟内完成",
},
"访问控制": {
"最小权限": "每个服务账号只能访问自己所需的密钥",
"审计": "所有密钥访问必须记录日志",
"MFA": "高权限密钥访问需要 MFA",
}
}
67.6 安全配置完整示例
# hermes-agent-production.yaml - 生产级完整安全配置
agent:
name: "hermes-production"
version: "3.0"
security:
# 权限控制
permission_model:
type: "role_based"
default_role: "read_only"
roles:
data_analyst:
allowed_tools: ["read_file", "execute_sql", "web_search"]
file_paths: ["/data/reports/", "/data/exports/"]
sql_operations: ["SELECT"]
web_domains: ["google.com", "scholar.google.com"]
devops:
allowed_tools: ["execute_bash", "read_file", "write_file"]
file_paths: ["/opt/scripts/", "/var/log/"]
require_confirmation: true
# 沙箱配置
sandbox:
type: "docker"
image: "hermes-sandbox:latest"
memory_limit: "2Gi"
cpu_limit: "1.0"
network_mode: "restricted"
read_only_root: true
allowed_syscalls: "seccomp_profile.json"
# 网络控制
network:
mode: "whitelist"
allowed_domains:
- "api.hermes.nousresearch.com"
- "api.openai.com"
- "google.com"
- "wikipedia.org"
force_https: true
block_private_ranges: true
max_response_size_mb: 10
# 密钥管理
secrets:
backend: "vault"
vault_url: "https://vault.internal.company.com"
auth_method: "kubernetes" # 使用 K8s ServiceAccount 认证
default_ttl: "15m"
max_ttl: "1h"
# 数据隔离
data_isolation:
strategy: "row_level_security"
tenant_field: "tenant_id"
enforce_at: ["database", "vector_store", "file_storage"]
# 审计日志
audit:
enabled: true
log_level: "ALL" # 记录所有工具调用
storage: "append_only_s3"
bucket: "hermes-audit-logs-prod"
encryption: "AES-256"
retention_days: 365
本章小结
本章建立了 Agent 生产安全的完整防线:
- 最小权限:角色权限矩阵、工具能力枚举、运行时权限校验,工具层精细化控制
- 文件系统沙箱:chroot(开发测试)和 Docker 容器(生产推荐),seccomp 系统调用过滤
- 网络访问控制:域名白名单 + 私有 IP 黑名单(防 SSRF),强制 HTTPS,响应大小限制
- 多租户隔离:PostgreSQL 行级安全策略(RLS),双重过滤防止越租户访问
- 密钥管理:Vault 动态凭证,Agent 永不接触明文 Key,任务完成后立即吊销
思考题
- 最小权限原则在实践中最大的挑战是"权限太严格导致 Agent 无法完成任务"。你会如何在安全和实用性之间找到平衡点?
- Docker 容器隔离不是无敌的——容器逃逸漏洞真实存在。对于极高安全要求的场景,你会叠加哪些额外的防御层?
- Vault 动态凭证方案的弱点是"Vault 本身成为单点故障"。如何在保证安全的同时提高系统的可用性?
- 行级安全策略(RLS)依赖于
current_setting被正确设置。如果攻击者找到了直接操作数据库连接池的方式,RLS 就失效了。你会如何设计多层数据隔离来防御这种情况?