Chapter 67: Production Security: Permission Control and Data Isolation
Once an agent runs in production, security is no longer optional: neglect it and you pay for it in incidents. An overly permissive agent is equivalent to an unsupervised superuser inside your organization. This chapter covers production-grade security: implementing the Principle of Least Privilege at the tool layer, file system sandboxing, network access control, multi-tenant data isolation, and, most critically, secret management that ensures the agent never touches plaintext credentials.
67.1 Principle of Least Privilege: Tool-Layer Implementation
67.1.1 What Is Tool-Layer Least Privilege?
In traditional systems, the Principle of Least Privilege (PoLP) means processes have only the minimum permissions required to do their job. In agent systems, this principle extends to the tool layer: an agent may only invoke tools necessary for the current task, and each tool's capabilities are themselves constrained to the minimum required scope.
Permission Hierarchy:
┌──────────────────────────────────────┐
│  All Tool Capabilities (Universe)    │
│  ┌────────────────────────────────┐  │
│  │  Role Toolset                  │  │
│  │  ┌──────────────────────────┐  │  │
│  │  │  Task Toolset            │  │  │
│  │  │  ┌────────────────────┐  │  │  │
│  │  │  │  Operation Scope   │  │  │  │
│  │  │  └────────────────────┘  │  │  │
│  │  └──────────────────────────┘  │  │
│  └────────────────────────────────┘  │
└──────────────────────────────────────┘
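The nesting in the diagram can be read as successive set intersections: a tool is callable only if every enclosing layer contains it. The following is a minimal sketch under that reading; `effective_toolset` and the example tool names are hypothetical and not part of any Hermes Agent API.

```python
def effective_toolset(universe: set[str], role_tools: set[str], task_tools: set[str]) -> set[str]:
    """Return the tools that sit inside every enclosing layer of the hierarchy."""
    return universe & role_tools & task_tools


# Hypothetical example data for illustration only.
universe = {"read_file", "write_file", "execute_sql", "execute_bash", "web_search"}
role_tools = {"read_file", "execute_sql", "web_search"}   # what the role grants
task_tools = {"read_file", "execute_sql", "send_email"}   # what the task asks for

allowed = effective_toolset(universe, role_tools, task_tools)
print(sorted(allowed))
```

Here `send_email` is requested by the task but granted by no layer, and `web_search` is granted by the role but not needed by the task, so neither ends up in the effective set.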
67.1.2 Tool Permission Matrix
from dataclasses import dataclass, field
from typing import Optional, Set, Dict
from enum import Enum


class ToolCapability(Enum):
    FILE_READ = "file:read"
    FILE_WRITE = "file:write"
    FILE_DELETE = "file:delete"
    FILE_EXECUTE = "file:execute"
    NET_HTTP_GET = "net:http:get"
    NET_HTTP_POST = "net:http:post"
    NET_SMTP = "net:smtp"
    CODE_PYTHON = "code:python"
    CODE_BASH = "code:bash"
    CODE_SQL = "code:sql"
    EXT_GITHUB = "ext:github"
    EXT_SLACK = "ext:slack"


@dataclass
class ToolPermission:
    tool_name: str
    capabilities: Set[ToolCapability]
    allowed_paths: list[str] = field(default_factory=list)
    denied_paths: list[str] = field(default_factory=lambda: ["/etc", "/root", "/sys", "/proc"])
    allowed_domains: list[str] = field(default_factory=list)
    denied_domains: list[str] = field(default_factory=lambda: ["169.254.0.0/16"])
    max_file_size_mb: int = 10
    max_requests_per_minute: int = 60
    max_execution_seconds: int = 30
    require_confirmation: bool = False
    audit_all_calls: bool = True


@dataclass
class AgentRole:
    role_name: str
    description: str
    tool_permissions: Dict[str, ToolPermission]

    def can_use_tool(self, name: str) -> bool:
        return name in self.tool_permissions

    def get_tool_permission(self, name: str) -> Optional[ToolPermission]:
        return self.tool_permissions.get(name)


# Example roles
def create_data_analyst_role() -> AgentRole:
    return AgentRole(
        role_name="data_analyst",
        description="Read-only data analysis role",
        tool_permissions={
            "read_file": ToolPermission(
                tool_name="read_file",
                capabilities={ToolCapability.FILE_READ},
                allowed_paths=["/data/reports/", "/data/exports/"],
                denied_paths=["/data/secrets/", "/etc/"],
                max_file_size_mb=50,
            ),
            "execute_sql": ToolPermission(
                tool_name="execute_sql",
                capabilities={ToolCapability.CODE_SQL},
                max_requests_per_minute=30,
            ),
            "web_search": ToolPermission(
                tool_name="web_search",
                capabilities={ToolCapability.NET_HTTP_GET},
                allowed_domains=["google.com", "bing.com", "scholar.google.com"],
                max_requests_per_minute=20,
            ),
        },
    )
67.1.3 Runtime Permission Enforcement
import logging
import os
import re
from pathlib import Path


class PermissionEnforcer:
    # _check_domain, _check_rate_limit, _request_confirmation and _execute_tool
    # are called below but are not shown in this listing.
    _WRITE_KEYWORDS = re.compile(
        r"\b(INSERT|UPDATE|DELETE|DROP|TRUNCATE|ALTER|CREATE)\b", re.IGNORECASE)

    def __init__(self, role: AgentRole):
        self.role = role
        self.call_counts: dict[str, list] = {}

    def check_and_execute(self, tool_name: str, args: dict) -> dict:
        if not self.role.can_use_tool(tool_name):
            return self._deny(f"Tool '{tool_name}' not authorized for role '{self.role.role_name}'")
        perm = self.role.get_tool_permission(tool_name)
        if "path" in args:
            check = self._check_path(args["path"], perm)
            if not check["allowed"]:
                return self._deny(check["reason"])
        if "url" in args:
            check = self._check_domain(args["url"], perm)
            if not check["allowed"]:
                return self._deny(check["reason"])
        if not self._check_rate_limit(tool_name, perm)["allowed"]:
            return self._deny("Rate limit exceeded")
        if tool_name == "execute_sql" and "query" in args:
            check = self._check_sql_safety(args["query"])
            if not check["allowed"]:
                return self._deny(check["reason"])
        if perm.require_confirmation:
            return self._request_confirmation(tool_name, args)
        return self._execute_tool(tool_name, args, perm)

    def _check_path(self, path: str, perm: ToolPermission) -> dict:
        # Resolve symlinks and ".." before comparing, and match whole path
        # components: a bare startswith() would treat "/etcetera" as inside "/etc".
        target = Path(os.path.realpath(path))

        def inside(root: str) -> bool:
            return target.is_relative_to(os.path.realpath(root))

        if any(inside(denied) for denied in perm.denied_paths):
            return {"allowed": False, "reason": f"Path '{path}' is in deny list"}
        if perm.allowed_paths and not any(inside(p) for p in perm.allowed_paths):
            return {"allowed": False, "reason": f"Path '{path}' not in allow list"}
        return {"allowed": True}

    def _check_sql_safety(self, query: str) -> dict:
        # Word-boundary matching also catches keywords that follow a newline,
        # tab, or ";". Keyword screening is only a first filter; the database
        # role behind this tool should itself be read-only.
        match = self._WRITE_KEYWORDS.search(query)
        if match:
            return {"allowed": False,
                    "reason": f"SQL operation '{match.group(1).upper()}' not allowed"}
        return {"allowed": True}

    def _deny(self, reason: str) -> dict:
        logging.warning(f"Permission denied: {reason}")
        return {"success": False, "error": "Permission denied", "reason": reason}
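The enforcer calls `_check_rate_limit`, which the listing does not show. One plausible way to back it is a per-tool sliding window, sketched below. The class name `SlidingWindowRateLimiter`, the injectable `clock`, and the `allow` method are assumptions made for illustration, not the chapter's actual implementation.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict


class SlidingWindowRateLimiter:
    """Hypothetical helper: allow at most N calls per tool within a rolling window."""

    def __init__(self, clock: Callable[[], float] = time.monotonic,
                 window_seconds: float = 60.0):
        self._clock = clock              # injectable so tests can control time
        self._window = window_seconds
        self._calls: Dict[str, Deque[float]] = {}

    def allow(self, tool_name: str, max_per_window: int) -> bool:
        now = self._clock()
        calls = self._calls.setdefault(tool_name, deque())
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self._window:
            calls.popleft()
        if len(calls) >= max_per_window:
            return False                 # denied calls are not recorded
        calls.append(now)
        return True
```

A `_check_rate_limit(tool_name, perm)` method could then return `{"allowed": limiter.allow(tool_name, perm.max_requests_per_minute)}`. In a multi-process deployment the counters would need to live in shared storage rather than in process memory.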
67.2 File System Sandboxing
67.2.1 Container Isolation (Recommended for Production)
# docker-compose.yml
version: '3.8'
services:
  hermes-agent:
    image: hermes-agent:latest
    security_opt:
      - no-new-privileges:true
      - seccomp:seccomp_profile.json
    read_only: true
    tmpfs:
      - /tmp:size=100m,noexec,nosuid
      - /var/run:size=10m
    user: "1001:1001"
    cap_drop: [ALL]
    deploy:
      resources:
        limits: { memory: 2G, cpus: '1.0' }
    networks: [agent-internal]
    volumes:
      - type: bind
        source: /data/knowledge-base
        target: /data/kb
        read_only: true
    secrets: [hermes_api_key, database_credentials]
networks:
  agent-internal:
    driver: bridge
    internal: true  # No direct external internet access
secrets:
  hermes_api_key: { external: true }
  database_credentials: { external: true }
67.2.2 seccomp System Call Whitelist
{
  "defaultAction": "SCMP_ACT_ERRNO",
  "syscalls": [{
    "names": [
      "read", "write", "open", "close", "stat", "fstat",
      "mmap", "mprotect", "munmap", "brk", "rt_sigaction",
      "socket", "connect", "sendto", "recvfrom", "shutdown",
      "clone", "fork", "execve", "exit", "wait4",
      "getcwd", "chdir", "rename", "mkdir", "unlink",
      "futex", "prctl", "getdents64", "lseek", "fcntl",
      "openat", "newfstatat", "exit_group"
    ],
    "action": "SCMP_ACT_ALLOW"
  }]
}
67.3 Network Access Control
import ipaddress
import socket
from urllib.parse import urlparse


class NetworkAccessController:
    def __init__(self, config: dict):
        self.allowed_domains: set[str] = set(config.get("allowed_domains", []))
        self.blocked_ranges = [
            ipaddress.IPv4Network("10.0.0.0/8"),
            ipaddress.IPv4Network("172.16.0.0/12"),
            ipaddress.IPv4Network("192.168.0.0/16"),
            ipaddress.IPv4Network("127.0.0.0/8"),
            ipaddress.IPv4Network("169.254.0.0/16"),  # Cloud metadata
            ipaddress.IPv4Network("100.64.0.0/10"),
        ]
        self.allowed_schemes = {"https"}
        self.allowed_ports = {443}

    def validate(self, url: str) -> dict:
        parsed = urlparse(url)
        if parsed.scheme not in self.allowed_schemes:
            return {"allowed": False, "reason": f"Scheme '{parsed.scheme}' not allowed"}
        try:
            port = parsed.port or 443  # .port raises ValueError on a malformed port
        except ValueError:
            return {"allowed": False, "reason": "Malformed port"}
        if port not in self.allowed_ports:
            return {"allowed": False, "reason": f"Port {port} not allowed"}
        # Use parsed.hostname, not netloc: netloc includes userinfo, so
        # "https://google.com:443@evil.example/" would otherwise pass as google.com.
        domain = (parsed.hostname or "").lower()
        if not any(domain == a or domain.endswith(f".{a}") for a in self.allowed_domains):
            return {"allowed": False, "reason": f"Domain '{domain}' not in allowlist"}
        # SSRF protection (IPv4 only; the HTTP client resolves the name again
        # at request time, so this check alone does not stop DNS rebinding)
        try:
            ip = socket.gethostbyname(domain)
            ip_addr = ipaddress.IPv4Address(ip)
            for blocked in self.blocked_ranges:
                if ip_addr in blocked:
                    return {"allowed": False, "reason": f"IP {ip} in blocked range {blocked}"}
        except socket.gaierror:
            return {"allowed": False, "reason": "DNS resolution failed"}
        return {"allowed": True}

    async def safe_fetch(self, url: str) -> dict:
        import aiohttp
        check = self.validate(url)
        if not check["allowed"]:
            return {"success": False, "error": check["reason"]}
        limit = 10 * 1024 * 1024
        async with aiohttp.ClientSession() as session:
            # Redirects are not followed: their targets would bypass validate().
            # TLS certificate verification is aiohttp's default behavior.
            async with session.get(
                url,
                allow_redirects=False,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                # Stream the body so an oversized response is rejected
                # before it is fully buffered in memory.
                body = bytearray()
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    body.extend(chunk)
                    if len(body) > limit:
                        return {"success": False, "error": "Response exceeds 10 MB limit"}
                return {"success": True, "status": resp.status,
                        "content": bytes(body).decode("utf-8", errors="replace")}
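The blocklist above is IPv4-only, while a hostname can also resolve to IPv6 or to an IPv4-mapped IPv6 address. The sketch below shows one conservative way to classify a resolved address without touching the network. The function name `is_blocked_address` is hypothetical, and treating every non-global IPv6 address as blocked is an assumption chosen here for safety, not something the chapter prescribes.

```python
import ipaddress

# Same IPv4 ranges as the controller's blocklist.
BLOCKED_V4 = [ipaddress.ip_network(cidr) for cidr in (
    "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16",
    "127.0.0.0/8", "169.254.0.0/16", "100.64.0.0/10",
)]


def is_blocked_address(raw: str) -> bool:
    """Return True if a resolved IP string must not be contacted."""
    try:
        addr = ipaddress.ip_address(raw)
    except ValueError:
        return True  # fail closed on anything that is not a valid IP
    if addr.version == 6:
        mapped = addr.ipv4_mapped          # e.g. ::ffff:127.0.0.1 -> 127.0.0.1
        if mapped is None:
            # Assumption: only globally routable IPv6 is acceptable.
            return not addr.is_global
        addr = mapped
    return any(addr in net for net in BLOCKED_V4)
```

Applying this check to every address returned by the resolver, and then connecting to the validated IP rather than re-resolving the hostname, closes the gap between validation and use.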
67.4 Multi-Tenant Data Isolation
67.4.1 Isolation Strategy Comparison
| Strategy | Isolation Strength | Cost | Use Case |
|---|---|---|---|
| Row-Level Security (RLS) | Low | Low | Single DB, trusted internal users |
| Schema isolation | Medium | Medium | Same instance, medium data volume |
| Separate DB instances | High | High | Strict compliance requirements |
| Fully separate deployments | Highest | Highest | Finance / healthcare |
67.4.2 PostgreSQL Row-Level Security
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

# ConversationRecord (used below) is the ORM model mapped to
# agent_conversations; its definition is not shown in this listing.


class TenantIsolatedSession:
    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)
        self._session_factory = sessionmaker(bind=self.engine)
        self._setup_rls()

    def _setup_rls(self):
        # engine.begin() commits on exit; raw SQL must be wrapped in text().
        # The table owner and superusers bypass RLS unless
        # FORCE ROW LEVEL SECURITY is also set on the table.
        with self.engine.begin() as conn:
            conn.execute(text("ALTER TABLE agent_conversations ENABLE ROW LEVEL SECURITY"))
            # Drop first so that repeated start-ups do not fail on an existing policy.
            conn.execute(text("DROP POLICY IF EXISTS tenant_isolation ON agent_conversations"))
            conn.execute(text("""
                CREATE POLICY tenant_isolation
                ON agent_conversations
                USING (tenant_id = current_setting('app.current_tenant_id'))
            """))

    def get_session(self, tenant_id: str) -> Session:
        session = self._session_factory()
        # Inject tenant context; PostgreSQL RLS uses this for filtering.
        # set_config() takes a bound parameter, so tenant_id is never
        # interpolated into the SQL text (the f-string form was injectable).
        session.execute(
            text("SELECT set_config('app.current_tenant_id', :tenant_id, false)"),
            {"tenant_id": tenant_id},
        )
        return session


class TenantAwareAgentMemory:
    def __init__(self, db: TenantIsolatedSession):
        self.db = db

    def save_conversation(self, tenant_id: str, conversation: dict) -> str:
        session = self.db.get_session(tenant_id)
        try:
            record = ConversationRecord(**{**conversation, "tenant_id": tenant_id})
            session.add(record)
            session.commit()
            return record.id
        finally:
            session.close()

    def get_history(self, tenant_id: str, conv_id: str) -> list:
        session = self.db.get_session(tenant_id)
        try:
            # Double enforcement: RLS + explicit filter
            return session.query(ConversationRecord).filter(
                ConversationRecord.tenant_id == tenant_id,
                ConversationRecord.id == conv_id,
            ).all()
        finally:
            session.close()
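Because the tenant identifier ends up in a database session setting, it is worth validating its shape at the boundary before it reaches the data layer. The sketch below assumes tenant IDs are UUIDs, which the chapter does not state; `normalize_tenant_id` is a hypothetical helper.

```python
import uuid


def normalize_tenant_id(raw: str) -> str:
    """Return the canonical lowercase UUID form, or raise ValueError."""
    try:
        # Re-serialising through uuid.UUID guarantees the result contains
        # only hex digits and hyphens, whatever the input looked like.
        return str(uuid.UUID(raw))
    except (ValueError, AttributeError, TypeError):
        raise ValueError("tenant_id must be a UUID") from None


print(normalize_tenant_id("3F2504E0-4F89-11D3-9A0C-0305E82C3301"))
```

Calling this at the API edge and passing only the normalized value to `get_session` keeps malformed identifiers out of the isolation layer entirely. It complements parameter binding and does not replace it.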
67.5 Secret Management: The Agent Never Sees Plaintext
67.5.1 Architecture
Wrong (never do this):
Agent ← API_KEY=sk-xxx ← Environment Variable / .env file
Right (production):
Agent → request(role + scope) → Vault/KMS
Agent ← short-lived token (15 min) ← Vault/KMS
                                        ↕
                           Long-lived keys (encrypted at rest)
67.5.2 HashiCorp Vault Integration
import threading
from datetime import datetime, timedelta, timezone

import hvac


class SecretManager:
    def __init__(self, vault_url: str, vault_token: str):
        self.client = hvac.Client(url=vault_url, token=vault_token)
        self._cache: dict = {}
        self._lock = threading.Lock()

    def get_api_key(self, service: str) -> str:
        key = f"secret:{service}"
        with self._lock:
            cached = self._cache.get(key)
            # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
            if cached and cached["expires_at"] > datetime.now(timezone.utc):
                return cached["value"]
        secret = self.client.secrets.kv.v2.read_secret_version(
            path=f"hermes-agent/{service}", mount_point="secret")
        api_key = secret["data"]["data"]["api_key"]
        ttl = secret["data"]["data"].get("ttl", 900)
        with self._lock:
            self._cache[key] = {
                "value": api_key,
                # Expire 60 s early; never cache for a negative duration.
                "expires_at": datetime.now(timezone.utc) + timedelta(seconds=max(ttl - 60, 0)),
            }
        return api_key

    def get_db_credentials(self, db_role: str) -> dict:
        """Dynamic database credentials: new user/password every call."""
        creds = self.client.secrets.database.generate_credentials(
            name=db_role, mount_point="database")
        return {
            "username": creds["data"]["username"],
            "password": creds["data"]["password"],
            "lease_id": creds["lease_id"],
        }

    def revoke(self, lease_id: str):
        """Immediately revoke credentials after task completion."""
        self.client.sys.revoke_lease(lease_id=lease_id)


class AgentCredentialProxy:
    """Agent calls this proxy; the proxy injects credentials. Agent never sees keys."""

    def __init__(self, secrets: SecretManager):
        self.secrets = secrets

    async def call_openai(self, messages: list, model: str = "gpt-4") -> dict:
        import aiohttp
        key = self.secrets.get_api_key("openai")
        async with aiohttp.ClientSession() as s:
            async with s.post(
                "https://api.openai.com/v1/chat/completions",
                headers={"Authorization": f"Bearer {key}"},
                json={"model": model, "messages": messages},
            ) as r:
                return await r.json()
67.5.3 Secret Management Best Practices
| Category | Prohibited | Recommended |
|---|---|---|
| Storage | Hardcoded in code, .env files, agent context, logs | HashiCorp Vault, AWS Secrets Manager, GCP Secret Manager |
| Lifetime | Static long-lived keys | 15-minute TTL for high-privilege services |
| Rotation | Manual rotation | Automatic rotation every 30 days |
| Revocation | Wait for expiry | Revoke immediately upon task completion or breach |
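The table lists logs among the places a secret must never appear. One way to enforce that mechanically is a logging filter that masks anything resembling a credential before a record is emitted. The sketch below uses only the standard library. The class name `RedactSecretsFilter` and the regular expression are assumptions for illustration, and real key formats vary by provider.

```python
import logging
import re

# Assumed patterns: "sk-..." style API keys and HTTP Bearer tokens.
SECRET_PATTERN = re.compile(
    r"(sk-[A-Za-z0-9_-]{8,}|Bearer\s+[A-Za-z0-9._~+/=-]{8,})")


class RedactSecretsFilter(logging.Filter):
    """Mask credential-looking substrings in every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Render the final message first so secrets passed as %-args are caught too.
        message = record.getMessage()
        record.msg = SECRET_PATTERN.sub("[REDACTED]", message)
        record.args = ()
        return True  # keep the record, now sanitised


logger = logging.getLogger("agent")
logger.addFilter(RedactSecretsFilter())
```

Pattern-based redaction is a safety net and not a guarantee. The primary control remains keeping keys out of the agent's context so that there is nothing to log.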
67.6 Complete Production Security Configuration
# hermes-agent-production.yaml
security:
  permission_model:
    type: role_based
    default_role: read_only
    roles:
      data_analyst:
        allowed_tools: [read_file, execute_sql, web_search]
        file_paths: [/data/reports/, /data/exports/]
        sql_operations: [SELECT]
      devops:
        allowed_tools: [execute_bash, read_file, write_file]
        file_paths: [/opt/scripts/, /var/log/]
        require_confirmation: true
  sandbox:
    type: docker
    memory_limit: 2Gi
    cpu_limit: "1.0"
    read_only_root: true
    allowed_syscalls: seccomp_profile.json
  network:
    mode: whitelist
    allowed_domains:
      - api.hermes.nousresearch.com
      - api.openai.com
      - google.com
    force_https: true
    block_private_ranges: true
    max_response_size_mb: 10
  secrets:
    backend: vault
    vault_url: https://vault.internal.company.com
    auth_method: kubernetes
    default_ttl: 15m
  data_isolation:
    strategy: row_level_security
    tenant_field: tenant_id
    enforce_at: [database, vector_store, file_storage]
  audit:
    enabled: true
    log_level: ALL
    storage: append_only_s3
    retention_days: 365
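A configuration like this is easy to weaken by accident, so it can help to lint it at start-up. The sketch below checks a few invariants on the already-parsed configuration (a plain dict, however it was loaded). The function `validate_security_config` and the specific rules are hypothetical. In particular, the `default_role` check assumes roles must be declared under `roles`, whereas `read_only` in the file above may instead be a built-in role.

```python
def validate_security_config(cfg: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means no findings."""
    problems: list[str] = []
    sec = cfg.get("security", {})
    perm = sec.get("permission_model", {})
    roles = perm.get("roles", {})

    default_role = perm.get("default_role")
    if default_role not in roles:
        problems.append(f"default_role '{default_role}' is not defined under roles")

    net = sec.get("network", {})
    if not net.get("force_https", False):
        problems.append("network.force_https should be true")
    if not net.get("block_private_ranges", False):
        problems.append("network.block_private_ranges should be true")

    # Shell access without a human in the loop is flagged.
    for name, role in roles.items():
        if "execute_bash" in role.get("allowed_tools", []) and not role.get("require_confirmation", False):
            problems.append(f"role '{name}' allows execute_bash without require_confirmation")
    return problems


# A trimmed copy of the configuration above, as a parsed dict.
config = {"security": {
    "permission_model": {
        "default_role": "read_only",
        "roles": {
            "data_analyst": {"allowed_tools": ["read_file", "execute_sql", "web_search"]},
            "devops": {"allowed_tools": ["execute_bash", "read_file", "write_file"],
                       "require_confirmation": True},
        },
    },
    "network": {"force_https": True, "block_private_ranges": True},
}}
findings = validate_security_config(config)
print(findings)
```

Under these assumed rules the only finding for the chapter's configuration is the undeclared `read_only` default role. Whether that is a real gap depends on how the permission model resolves role names.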
Chapter Summary
This chapter built a complete production security perimeter for Hermes Agent:
- Least Privilege: Role permission matrix, tool capability enumeration, runtime enforcement—fine-grained control at the tool layer
- File system sandbox: Docker containers with read-only root, seccomp system call filtering, non-root user
- Network access control: Domain allowlist + private IP blocklist (SSRF prevention), forced HTTPS, response size limits
- Multi-tenant isolation: PostgreSQL Row-Level Security with explicit tenant_id double-filtering
- Secret management: Vault dynamic credentials; agent never sees plaintext keys; immediate revocation after task completion
Discussion Questions
- The biggest practical challenge of least privilege is "permissions too strict for the agent to complete its task." How would you find the balance between security and utility?
- Docker container isolation is not foolproof—container escapes are real. For extremely high-security scenarios, what additional layers would you add on top of containers?
- Vault dynamic credentials introduce a single point of failure: Vault itself. How would you maintain security while improving availability?
- RLS relies on current_setting being set correctly. If an attacker gains direct access to the connection pool, RLS fails. How would you design multi-layer data isolation to defend against this?