Chapter 67

Production Security: Permission Control and Data Isolation

Once an agent is deployed to production, security stops being optional: it is a requirement, and skipping it is paid for in incidents. An overly permissive agent is equivalent to an unsupervised superuser inside your organization. This chapter covers production-grade security: implementing the Principle of Least Privilege at the tool layer, file system sandboxing, network access control, multi-tenant data isolation, and, most critically, secret management that keeps plaintext credentials out of the agent's reach.


67.1 Principle of Least Privilege: Tool-Layer Implementation

67.1.1 What Is Tool-Layer Least Privilege?

In traditional systems, the Principle of Least Privilege (PoLP) means processes have only the minimum permissions required to do their job. In agent systems, this principle extends to the tool layer: an agent may only invoke tools necessary for the current task, and each tool's capabilities are themselves constrained to the minimum required scope.

Permission Hierarchy:
┌─────────────────────────────────────┐
│  All Tool Capabilities (Universe)   │
│  ┌───────────────────────────────┐  │
│  │  Role Toolset                 │  │
│  │  ┌─────────────────────────┐  │  │
│  │  │  Task Toolset           │  │  │
│  │  │  ┌───────────────────┐  │  │  │
│  │  │  │  Operation Scope  │  │  │  │
│  │  │  └───────────────────┘  │  │  │
│  │  └─────────────────────────┘  │  │
│  └───────────────────────────────┘  │
└─────────────────────────────────────┘
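
The nesting in the hierarchy can be read as set intersection: a tool is usable only if every enclosing layer permits it. The following is a minimal sketch of that idea, not the chapter's implementation; `Scope`, `effective_tools`, and the example tool and task names are hypothetical. The innermost layer (operation scope) constrains arguments rather than tool names and is not modeled here.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Scope:
    """One layer of the hierarchy and the tool names it permits (hypothetical type)."""
    name: str
    tools: frozenset


def effective_tools(*layers: Scope) -> frozenset:
    """Intersect layers from outermost to innermost.

    A tool survives only if every layer permits it, so an inner layer
    can narrow access but never widen it.
    """
    if not layers:
        return frozenset()
    allowed = layers[0].tools
    for layer in layers[1:]:
        allowed = allowed & layer.tools
    return allowed


universe = Scope("universe", frozenset({"read_file", "write_file", "execute_sql", "execute_bash"}))
role = Scope("data_analyst", frozenset({"read_file", "execute_sql", "web_search"}))
task = Scope("summarize_report", frozenset({"read_file", "execute_bash"}))

# execute_bash is requested by the task but not granted by the role, and
# web_search is granted by the role but absent from the universe: both drop out.
print(sorted(effective_tools(universe, role, task)))  # ['read_file']
```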

67.1.2 Tool Permission Matrix

from dataclasses import dataclass, field
from typing import Optional, Set, Dict
from enum import Enum

class ToolCapability(Enum):
    FILE_READ     = "file:read"
    FILE_WRITE    = "file:write"
    FILE_DELETE   = "file:delete"
    FILE_EXECUTE  = "file:execute"
    NET_HTTP_GET  = "net:http:get"
    NET_HTTP_POST = "net:http:post"
    NET_SMTP      = "net:smtp"
    CODE_PYTHON   = "code:python"
    CODE_BASH     = "code:bash"
    CODE_SQL      = "code:sql"
    EXT_GITHUB    = "ext:github"
    EXT_SLACK     = "ext:slack"

@dataclass
class ToolPermission:
    tool_name: str
    capabilities: Set[ToolCapability]
    allowed_paths: list[str] = field(default_factory=list)
    denied_paths: list[str] = field(default_factory=lambda: ["/etc", "/root", "/sys", "/proc"])
    allowed_domains: list[str] = field(default_factory=list)
    denied_domains: list[str] = field(default_factory=lambda: ["169.254.0.0/16"])
    max_file_size_mb: int = 10
    max_requests_per_minute: int = 60
    max_execution_seconds: int = 30
    require_confirmation: bool = False
    audit_all_calls: bool = True

@dataclass
class AgentRole:
    role_name: str
    description: str
    tool_permissions: Dict[str, ToolPermission]

    def can_use_tool(self, name: str) -> bool:
        return name in self.tool_permissions

    def get_tool_permission(self, name: str) -> Optional[ToolPermission]:
        return self.tool_permissions.get(name)

# Example roles
def create_data_analyst_role() -> AgentRole:
    return AgentRole(
        role_name="data_analyst",
        description="Read-only data analysis role",
        tool_permissions={
            "read_file": ToolPermission(
                tool_name="read_file",
                capabilities={ToolCapability.FILE_READ},
                allowed_paths=["/data/reports/", "/data/exports/"],
                denied_paths=["/data/secrets/", "/etc/"],
                max_file_size_mb=50,
            ),
            "execute_sql": ToolPermission(
                tool_name="execute_sql",
                capabilities={ToolCapability.CODE_SQL},
                max_requests_per_minute=30,
            ),
            "web_search": ToolPermission(
                tool_name="web_search",
                capabilities={ToolCapability.NET_HTTP_GET},
                allowed_domains=["google.com", "bing.com", "scholar.google.com"],
                max_requests_per_minute=20,
            ),
        }
    )

67.1.3 Runtime Permission Enforcement

import os, logging
from pathlib import Path

class PermissionEnforcer:
    def __init__(self, role: AgentRole):
        self.role = role
        self.call_counts: Dict[str, list] = {}

    def check_and_execute(self, tool_name: str, args: dict) -> dict:
        if not self.role.can_use_tool(tool_name):
            return self._deny(f"Tool '{tool_name}' not authorized for role '{self.role.role_name}'")

        perm = self.role.get_tool_permission(tool_name)

        if "path" in args:
            check = self._check_path(args["path"], perm)
            if not check["allowed"]: return self._deny(check["reason"])

        if "url" in args:
            check = self._check_domain(args["url"], perm)
            if not check["allowed"]: return self._deny(check["reason"])

        if not self._check_rate_limit(tool_name, perm)["allowed"]:
            return self._deny("Rate limit exceeded")

        if tool_name == "execute_sql" and "query" in args:
            check = self._check_sql_safety(args["query"])
            if not check["allowed"]: return self._deny(check["reason"])

        if perm.require_confirmation:
            return self._request_confirmation(tool_name, args)

        return self._execute_tool(tool_name, args, perm)

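    def _check_path(self, path: str, perm: ToolPermission) -> dict:
        # realpath resolves symlinks and "..", so a link inside an allowed
        # directory cannot point the agent at a denied one.
        real = os.path.realpath(path)

        def inside(base: str) -> bool:
            # Compare whole path components; a plain startswith() would let
            # "/data/reports_evil" match the prefix "/data/reports".
            base = os.path.realpath(base)
            return os.path.commonpath([real, base]) == base

        if any(inside(denied) for denied in perm.denied_paths):
            return {"allowed": False, "reason": f"Path '{path}' is in deny list"}
        if perm.allowed_paths and not any(inside(p) for p in perm.allowed_paths):
            return {"allowed": False, "reason": f"Path '{path}' not in allow list"}
        return {"allowed": True}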

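    def _check_sql_safety(self, query: str) -> dict:
        import re
        # A keyword blocklist is only a first filter; the database role itself
        # should also be read-only. Word boundaries catch keywords that follow
        # ";", "(" or a newline, not just a space.
        upper = query.upper()
        for kw in ["INSERT", "UPDATE", "DELETE", "DROP", "TRUNCATE", "ALTER", "CREATE"]:
            if re.search(rf"\b{kw}\b", upper):
                return {"allowed": False, "reason": f"SQL operation '{kw}' not allowed"}
        return {"allowed": True}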

    def _deny(self, reason: str) -> dict:
        logging.warning(f"Permission denied: {reason}")
        return {"success": False, "error": "Permission denied", "reason": reason}
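
The enforcer calls `_check_rate_limit`, which is not shown in the listing. One way such a check could work is a sliding window over recent call timestamps, sketched below under assumptions: `SlidingWindowRateLimiter` and its injectable `clock` parameter are hypothetical names, and the 60-second window mirrors `max_requests_per_minute`.

```python
import time
from collections import defaultdict, deque


class SlidingWindowRateLimiter:
    """Per-tool sliding-window limiter (illustrative sketch, not the chapter's code)."""

    def __init__(self, window_seconds: float = 60.0, clock=time.monotonic):
        self.window = window_seconds
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self._calls = defaultdict(deque)

    def allow(self, tool_name: str, max_calls: int) -> bool:
        now = self.clock()
        calls = self._calls[tool_name]
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self.window:
            calls.popleft()
        if len(calls) >= max_calls:
            return False
        calls.append(now)
        return True


limiter = SlidingWindowRateLimiter()
print(limiter.allow("web_search", max_calls=20))  # True on the first call
```

A monotonic clock is used rather than wall-clock time so that system clock adjustments cannot reset or extend the window.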

67.2 File System Sandboxing

# docker-compose.yml
version: '3.8'
services:
  hermes-agent:
    image: hermes-agent:latest
    security_opt:
      - no-new-privileges:true
      - seccomp:seccomp_profile.json
    read_only: true
    tmpfs:
      - /tmp:size=100m,noexec,nosuid
      - /var/run:size=10m
    user: "1001:1001"
    cap_drop: [ALL]
    deploy:
      resources:
        limits: { memory: 2G, cpus: '1.0' }
    networks: [agent-internal]
    volumes:
      - type: bind
        source: /data/knowledge-base
        target: /data/kb
        read_only: true
    secrets: [hermes_api_key, database_credentials]

networks:
  agent-internal:
    driver: bridge
    internal: true   # No direct external internet access

secrets:
  hermes_api_key: { external: true }
  database_credentials: { external: true }

67.2.2 seccomp System Call Whitelist

{
  "defaultAction": "SCMP_ACT_ERRNO",
  "syscalls": [{
    "names": [
      "read", "write", "open", "close", "stat", "fstat",
      "mmap", "mprotect", "munmap", "brk", "rt_sigaction",
      "socket", "connect", "sendto", "recvfrom", "shutdown",
      "clone", "fork", "execve", "exit", "wait4",
      "getcwd", "chdir", "rename", "mkdir", "unlink",
      "futex", "prctl", "getdents64", "lseek", "fcntl",
      "openat", "newfstatat", "exit_group"
    ],
    "action": "SCMP_ACT_ALLOW"
  }]
}
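
A profile like this is easy to loosen by accident during debugging, so it can help to lint it in CI. The sketch below checks two properties: that the default action denies unlisted system calls, and that no call from a review list is allowed. `audit_seccomp_profile` and the contents of `RISKY_SYSCALLS` are assumptions chosen for illustration, not an authoritative list.

```python
import json

# Default actions that deny anything not explicitly allowed.
DENY_ACTIONS = {"SCMP_ACT_ERRNO", "SCMP_ACT_KILL", "SCMP_ACT_KILL_PROCESS", "SCMP_ACT_TRAP"}

# Hypothetical review list; adjust to your own threat model.
RISKY_SYSCALLS = {"ptrace", "mount", "umount2", "kexec_load", "init_module", "bpf", "unshare"}


def audit_seccomp_profile(profile: dict) -> list:
    """Return human-readable findings; an empty list means nothing was flagged."""
    findings = []
    if profile.get("defaultAction") not in DENY_ACTIONS:
        findings.append("defaultAction does not deny unlisted syscalls")
    allowed = set()
    for rule in profile.get("syscalls", []):
        if rule.get("action") == "SCMP_ACT_ALLOW":
            allowed.update(rule.get("names", []))
    for name in sorted(allowed & RISKY_SYSCALLS):
        findings.append(f"risky syscall allowed: {name}")
    return findings


example = json.loads("""
{
  "defaultAction": "SCMP_ACT_ERRNO",
  "syscalls": [{"names": ["read", "write", "ptrace"], "action": "SCMP_ACT_ALLOW"}]
}
""")
print(audit_seccomp_profile(example))  # ['risky syscall allowed: ptrace']
```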

67.3 Network Access Control

import ipaddress, socket
from urllib.parse import urlparse

class NetworkAccessController:
    def __init__(self, config: dict):
        self.allowed_domains: set[str] = set(config.get("allowed_domains", []))
        self.blocked_ranges = [
            ipaddress.IPv4Network("10.0.0.0/8"),
            ipaddress.IPv4Network("172.16.0.0/12"),
            ipaddress.IPv4Network("192.168.0.0/16"),
            ipaddress.IPv4Network("127.0.0.0/8"),
            ipaddress.IPv4Network("169.254.0.0/16"),  # Cloud metadata
            ipaddress.IPv4Network("100.64.0.0/10"),
        ]
        self.allowed_schemes = {"https"}
        self.allowed_ports = {443}

    def validate(self, url: str) -> dict:
        parsed = urlparse(url)

        if parsed.scheme not in self.allowed_schemes:
            return {"allowed": False, "reason": f"Scheme '{parsed.scheme}' not allowed"}

        port = parsed.port or 443
        if port not in self.allowed_ports:
            return {"allowed": False, "reason": f"Port {port} not allowed"}

        # hostname strips userinfo and port, so "https://google.com:x@evil.com/"
        # is checked as "evil.com" rather than "google.com".
        domain = (parsed.hostname or "").lower()
        if not any(domain == a or domain.endswith(f".{a}") for a in self.allowed_domains):
            return {"allowed": False, "reason": f"Domain '{domain}' not in allowlist"}

        # SSRF protection. Note: the HTTP client resolves the name again when it
        # connects, so this check alone does not stop DNS rebinding.
        try:
            ip = socket.gethostbyname(domain)
            ip_addr = ipaddress.IPv4Address(ip)
            for blocked in self.blocked_ranges:
                if ip_addr in blocked:
                    return {"allowed": False, "reason": f"IP {ip} in blocked range {blocked}"}
        except socket.gaierror:
            return {"allowed": False, "reason": "DNS resolution failed"}

        return {"allowed": True}

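    async def safe_fetch(self, url: str) -> dict:
        import aiohttp
        check = self.validate(url)
        if not check["allowed"]:
            return {"success": False, "error": check["reason"]}

        max_bytes = 10 * 1024 * 1024
        async with aiohttp.ClientSession() as session:
            # Redirects are disabled: an allowed host could otherwise redirect
            # to an internal address that validate() never saw.
            async with session.get(
                url, ssl=True,
                timeout=aiohttp.ClientTimeout(total=30),
                allow_redirects=False,
            ) as resp:
                # Stream the body so an oversized response is rejected
                # before it is fully buffered in memory.
                chunks, size = [], 0
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    size += len(chunk)
                    if size > max_bytes:
                        return {"success": False, "error": "Response exceeds 10 MB limit"}
                    chunks.append(chunk)
                return {"success": True, "status": resp.status,
                        "content": b"".join(chunks).decode("utf-8", errors="replace")}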

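The blocked-range check in the controller covers IPv4 only. A sketch of how the same idea might extend to IPv6 and to IPv4-mapped IPv6 addresses, using only the standard `ipaddress` module, is shown below. The function name `is_internal_address` is hypothetical, and the set of properties treated as internal is an assumption to adapt.

```python
import ipaddress


def is_internal_address(ip: str) -> bool:
    """Return True if the address should not be reachable from the agent."""
    addr = ipaddress.ip_address(ip)
    # "::ffff:10.0.0.1" is IPv4 in IPv6 clothing; unwrap it before classifying.
    if addr.version == 6 and addr.ipv4_mapped is not None:
        addr = addr.ipv4_mapped
    return (
        not addr.is_global      # private, shared (100.64.0.0/10), and similar ranges
        or addr.is_loopback
        or addr.is_link_local   # includes the 169.254.169.254 metadata endpoint
        or addr.is_multicast
        or addr.is_reserved
        or addr.is_unspecified
    )


for candidate in ["8.8.8.8", "10.0.0.5", "169.254.169.254", "::1", "::ffff:127.0.0.1"]:
    print(candidate, is_internal_address(candidate))
```

Checking every address a hostname resolves to, and connecting to the checked address rather than resolving a second time, closes the gap between validation and use.
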
67.4 Multi-Tenant Data Isolation

67.4.1 Isolation Strategy Comparison

Strategy                    Isolation Strength  Cost     Use Case
Row-Level Security (RLS)    Low                 Low      Single DB, trusted internal users
Schema isolation            Medium              Medium   Same instance, medium data volume
Separate DB instances       High                High     Strict compliance requirements
Fully separate deployments  Highest             Highest  Finance / healthcare

67.4.2 PostgreSQL Row-Level Security

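from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, Session

# ConversationRecord, used below, is assumed to be an ORM model defined elsewhere.

class TenantIsolatedSession:
    def __init__(self, connection_string: str):
        self.engine = create_engine(connection_string)
        self._session_factory = sessionmaker(bind=self.engine)
        self._setup_rls()

    def _setup_rls(self):
        # engine.begin() commits on success. FORCE applies the policy to the
        # table owner too; DROP ... IF EXISTS keeps repeated startups idempotent.
        statements = [
            "ALTER TABLE agent_conversations ENABLE ROW LEVEL SECURITY",
            "ALTER TABLE agent_conversations FORCE ROW LEVEL SECURITY",
            "DROP POLICY IF EXISTS tenant_isolation ON agent_conversations",
            """CREATE POLICY tenant_isolation
               ON agent_conversations
               USING (tenant_id = current_setting('app.current_tenant_id'))""",
        ]
        with self.engine.begin() as conn:
            for stmt in statements:
                conn.execute(text(stmt))

    def get_session(self, tenant_id: str) -> Session:
        session = self._session_factory()
        # Inject tenant context; PostgreSQL RLS uses this for filtering.
        # set_config accepts a bind parameter (SET does not), which avoids SQL
        # injection through tenant_id. The value is session-level, so it stays
        # on the pooled connection: always obtain sessions through this method.
        session.execute(
            text("SELECT set_config('app.current_tenant_id', :tid, false)"),
            {"tid": tenant_id},
        )
        return session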

class TenantAwareAgentMemory:
    def __init__(self, db: TenantIsolatedSession):
        self.db = db

    def save_conversation(self, tenant_id: str, conversation: dict) -> str:
        session = self.db.get_session(tenant_id)
        try:
            record = ConversationRecord(**{**conversation, "tenant_id": tenant_id})
            session.add(record)
            session.commit()
            return record.id
        finally:
            session.close()

    def get_history(self, tenant_id: str, conv_id: str) -> list:
        session = self.db.get_session(tenant_id)
        try:
            # Double enforcement: RLS + explicit filter
            return session.query(ConversationRecord).filter(
                ConversationRecord.tenant_id == tenant_id,
                ConversationRecord.id == conv_id,
            ).all()
        finally:
            session.close()
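
Because the tenant identifier ends up in a database session setting and in query filters, it is worth rejecting malformed values before they get that far. The following is a minimal sketch under an assumption about the identifier format (1 to 64 characters from `[A-Za-z0-9_-]`); `require_valid_tenant_id` and `InvalidTenantId` are hypothetical names, and the pattern should be adapted to whatever format your tenant IDs actually use.

```python
import re

# Assumed format: adjust to your real tenant ID scheme (for example, UUIDs).
_TENANT_ID_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")


class InvalidTenantId(ValueError):
    """Raised when a tenant identifier fails validation."""


def require_valid_tenant_id(tenant_id: object) -> str:
    """Return tenant_id unchanged if it is well-formed, else raise."""
    if not isinstance(tenant_id, str) or not _TENANT_ID_RE.fullmatch(tenant_id):
        raise InvalidTenantId("tenant_id must be 1-64 characters of [A-Za-z0-9_-]")
    return tenant_id


print(require_valid_tenant_id("tenant_42"))
try:
    require_valid_tenant_id("x'; DROP TABLE agent_conversations; --")
except InvalidTenantId as exc:
    print("rejected:", exc)
```

Validation of this kind is an extra layer, not a substitute for bound parameters or for the RLS policy itself.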

67.5 Secret Management: The Agent Never Sees Plaintext

67.5.1 Architecture

Wrong (never do this):
Agent ← API_KEY=sk-xxx ← Environment Variable / .env file

Right (production):
Agent → request(role + scope) → Vault/KMS
Agent ← short-lived token (15 min) ← Vault/KMS
                                          ↕
                               Keys encrypted at rest

67.5.2 HashiCorp Vault Integration

import hvac, threading
from datetime import datetime, timedelta, timezone

class SecretManager:
    def __init__(self, vault_url: str, vault_token: str):
        self.client = hvac.Client(url=vault_url, token=vault_token)
        self._cache: dict = {}
        self._lock = threading.Lock()

    def get_api_key(self, service: str) -> str:
        key = f"secret:{service}"
        # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
        now = datetime.now(timezone.utc)
        with self._lock:
            cached = self._cache.get(key)
            if cached and cached["expires_at"] > now:
                return cached["value"]

        secret = self.client.secrets.kv.v2.read_secret_version(
            path=f"hermes-agent/{service}", mount_point="secret")
        api_key = secret["data"]["data"]["api_key"]
        ttl = secret["data"]["data"].get("ttl", 900)

        with self._lock:
            self._cache[key] = {
                "value": api_key,
                # Expire 60 s early so a key is never used right at its TTL.
                "expires_at": now + timedelta(seconds=max(ttl - 60, 0)),
            }
        return api_key

    def get_db_credentials(self, db_role: str) -> dict:
        """Dynamic database credentials: new user/password every call."""
        creds = self.client.secrets.database.generate_credentials(
            name=db_role, mount_point="database")
        return {
            "username": creds["data"]["username"],
            "password": creds["data"]["password"],
            "lease_id": creds["lease_id"],
        }

    def revoke(self, lease_id: str):
        """Immediately revoke credentials after task completion."""
        self.client.sys.revoke_lease(lease_id=lease_id)


class AgentCredentialProxy:
    """Agent calls this proxy; the proxy injects credentials. Agent never sees keys."""

    def __init__(self, secrets: SecretManager):
        self.secrets = secrets

    async def call_openai(self, messages: list, model: str = "gpt-4") -> dict:
        import aiohttp
        key = self.secrets.get_api_key("openai")
        async with aiohttp.ClientSession() as s:
            async with s.post(
                "https://api.openai.com/v1/chat/completions",
                headers={"Authorization": f"Bearer {key}"},
                json={"model": model, "messages": messages}
            ) as r:
                return await r.json()
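
Revocation is easy to forget when a task fails partway through. One way to tie the lease lifetime to the task is a context manager that revokes in a `finally` block, sketched here. `leased_db_credentials`, `CredentialSource`, and `FakeSource` are hypothetical names; the protocol only assumes the `get_db_credentials` and `revoke` methods shown on `SecretManager`, and the fake stands in for Vault so the sketch runs without a server.

```python
from contextlib import contextmanager
from typing import Iterator, Protocol


class CredentialSource(Protocol):
    def get_db_credentials(self, db_role: str) -> dict: ...
    def revoke(self, lease_id: str) -> None: ...


@contextmanager
def leased_db_credentials(source: CredentialSource, db_role: str) -> Iterator[dict]:
    """Yield dynamic credentials and revoke the lease when the block exits."""
    creds = source.get_db_credentials(db_role)
    try:
        yield creds
    finally:
        # Runs on success and on exceptions alike.
        source.revoke(creds["lease_id"])


class FakeSource:
    """In-memory stand-in for a Vault-backed manager (illustration only)."""

    def __init__(self):
        self.revoked = []

    def get_db_credentials(self, db_role: str) -> dict:
        return {"username": f"v-{db_role}", "password": "not-a-real-secret", "lease_id": "lease-1"}

    def revoke(self, lease_id: str) -> None:
        self.revoked.append(lease_id)


source = FakeSource()
with leased_db_credentials(source, "readonly") as creds:
    print("connecting as", creds["username"])
print("revoked leases:", source.revoked)
```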

67.5.3 Secret Management Best Practices

Category    Prohibited                                          Recommended
Storage     Hardcoded in code, .env files, agent context, logs  HashiCorp Vault, AWS Secrets Manager, GCP Secret Manager
Lifetime    Static long-lived keys                              15-minute TTL for high-privilege services
Rotation    Manual rotation                                     Automatic rotation every 30 days
Revocation  Wait for expiry                                     Revoke immediately upon task completion or breach
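
Keeping secrets out of logs usually needs a safety net as well as discipline. A minimal sketch of a `logging.Filter` that redacts token-shaped strings before a record is emitted follows. The two patterns (an `sk-` prefix and an HTTP `Bearer` value) are assumptions for illustration and will not catch every credential format.

```python
import logging
import re

# Illustrative patterns only; extend for the credential formats you actually use.
_SK_KEY = re.compile(r"sk-[A-Za-z0-9_-]{8,}")
_BEARER = re.compile(r"(?i)(bearer\s+)[A-Za-z0-9._~+/=-]{8,}")


def redact(text: str) -> str:
    """Replace token-shaped substrings with a fixed marker."""
    text = _SK_KEY.sub("[REDACTED]", text)
    return _BEARER.sub(r"\1[REDACTED]", text)


class RedactSecretsFilter(logging.Filter):
    """Rewrite the formatted message so secrets never reach a handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        redacted = redact(message)
        if redacted != message:
            # Store the final string and clear args so it is not re-formatted.
            record.msg, record.args = redacted, None
        return True


logger = logging.getLogger("agent.audit")
logger.addFilter(RedactSecretsFilter())
print(redact("Authorization: Bearer sk-abcdef123456"))  # Authorization: Bearer [REDACTED]
```

The filter is attached to a logger here; attaching it to handlers instead covers records that propagate from child loggers.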

67.6 Complete Production Security Configuration

# hermes-agent-production.yaml
security:
  permission_model:
    type: role_based
    default_role: read_only
    roles:
      data_analyst:
        allowed_tools: [read_file, execute_sql, web_search]
        file_paths: [/data/reports/, /data/exports/]
        sql_operations: [SELECT]
      devops:
        allowed_tools: [execute_bash, read_file, write_file]
        file_paths: [/opt/scripts/, /var/log/]
        require_confirmation: true

  sandbox:
    type: docker
    memory_limit: 2Gi
    cpu_limit: "1.0"
    read_only_root: true
    allowed_syscalls: seccomp_profile.json

  network:
    mode: whitelist
    allowed_domains:
      - api.hermes.nousresearch.com
      - api.openai.com
      - google.com
    force_https: true
    block_private_ranges: true
    max_response_size_mb: 10

  secrets:
    backend: vault
    vault_url: https://vault.internal.company.com
    auth_method: kubernetes
    default_ttl: 15m

  data_isolation:
    strategy: row_level_security
    tenant_field: tenant_id
    enforce_at: [database, vector_store, file_storage]

  audit:
    enabled: true
    log_level: ALL
    storage: append_only_s3
    retention_days: 365

Chapter Summary

This chapter built a complete production security perimeter for Hermes Agent:

  1. Least Privilege: Role permission matrix, tool capability enumeration, and runtime enforcement, giving fine-grained control at the tool layer
  2. File system sandbox: Docker containers with read-only root, seccomp system call filtering, non-root user
  3. Network access control: Domain allowlist + private IP blocklist (SSRF prevention), forced HTTPS, response size limits
  4. Multi-tenant isolation: PostgreSQL Row-Level Security with explicit tenant_id double-filtering
  5. Secret management: Vault dynamic credentials; agent never sees plaintext keys; immediate revocation after task completion

Discussion Questions

  1. The biggest practical challenge of least privilege is "permissions too strict for the agent to complete its task." How would you find the balance between security and utility?
  2. Docker container isolation is not foolproof: container escapes are real. For extremely high-security scenarios, what additional layers would you add on top of containers?
  3. Vault dynamic credentials introduce a single point of failure: Vault itself. How would you maintain security while improving availability?
  4. RLS relies on current_setting being set correctly. If an attacker gains direct access to the connection pool, RLS fails. How would you design multi-layer data isolation to defend against this?