← 返回 Skills 市场
mirni

AgentOps: Managing AI Agent Fleets in Production

作者 mirni · GitHub ↗ · v1.3.1 · MIT-0
cross-platform ✓ 安全检测通过
187
总下载
1
收藏
0
当前安装
5
版本数
在 OpenClaw 中安装
/install greenhelix-agent-fleet-ops
功能描述
AgentOps: Managing AI Agent Fleets in Production. The SRE playbook for AI agent fleets: provisioning, observability, cost-aware routing, SLA enforcement, sca...
使用说明 (SKILL.md)

AgentOps: Managing AI Agent Fleets in Production

Notice: This is an educational guide with illustrative code examples. It does not execute code, require credentials, or install dependencies. All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net) which provides 500 free credits — no API key required to get started.

IBM coined "AgentOps" as a formal discipline in early 2026 for a reason that is now obvious: the bottleneck in enterprise AI shifted from building agents to keeping them alive. LangSmith Fleet launched in March 2026. Gartner projects 40% of agentic AI projects will fail by 2027 — not because the models are bad, but because nobody wrote the ops playbook. A Fortune 500 company leaked $400 million in cloud spend from unbudgeted agent compute last quarter. The EU AI Act's high-risk obligations become enforceable on August 2, 2026, and the Spanish DPA has already ruled that "greater technical autonomy does not reduce legal responsibility." This guide is the ops playbook. It covers the full agent lifecycle — provision, deploy, monitor, scale, retire — across fleets of tens to thousands of agents, all wired to the GreenHelix A2A Commerce Gateway's 128 tools across 15 services. You will build a Fleet Commander dashboard that consolidates provisioning, observability, cost control, SLA enforcement, scaling, and governance into a single operational view. Every chapter has working Python code, architecture diagrams, decision trees, and checklists. By the end, you will have the infrastructure to run agent fleets the way SREs run microservice fleets: with error budgets, automated escalation, cost guardrails, and an audit trail that holds up under regulatory scrutiny.

Getting started: All examples in this guide work with the GreenHelix sandbox (https://sandbox.greenhelix.net) which provides 500 free credits — no API key required.

What You'll Learn

  • Chapter 1: The AgentOps Discipline
  • Chapter 2: Fleet Provisioning and Identity at Scale
  • Chapter 3: Observability for Agent Fleets
  • Chapter 4: Cost-Aware Model Routing and FinOps
  • Chapter 5: SLA Enforcement and Automated Escalation
  • Chapter 6: Fleet Scaling Patterns
  • Chapter 7: Governance, Audit, and EU AI Act Compliance
  • Chapter 8: The Fleet Commander Dashboard
  • What You Get

Full Guide

AgentOps: The Practitioner's Guide to Managing AI Agent Fleets in Production

IBM coined "AgentOps" as a formal discipline in early 2026 for a reason that is now obvious: the bottleneck in enterprise AI shifted from building agents to keeping them alive. LangSmith Fleet launched in March 2026. Gartner projects 40% of agentic AI projects will fail by 2027 — not because the models are bad, but because nobody wrote the ops playbook. A Fortune 500 company leaked $400 million in cloud spend from unbudgeted agent compute last quarter. The EU AI Act's high-risk obligations become enforceable on August 2, 2026, and the Spanish DPA has already ruled that "greater technical autonomy does not reduce legal responsibility." This guide is the ops playbook. It covers the full agent lifecycle — provision, deploy, monitor, scale, retire — across fleets of tens to thousands of agents, all wired to the GreenHelix A2A Commerce Gateway's 128 tools across 15 services. You will build a Fleet Commander dashboard that consolidates provisioning, observability, cost control, SLA enforcement, scaling, and governance into a single operational view. Every chapter has working Python code, architecture diagrams, decision trees, and checklists. By the end, you will have the infrastructure to run agent fleets the way SREs run microservice fleets: with error budgets, automated escalation, cost guardrails, and an audit trail that holds up under regulatory scrutiny.


Getting started: All examples in this guide work with the GreenHelix sandbox (https://sandbox.greenhelix.net) which provides 500 free credits — no API key required.

Table of Contents

  1. The AgentOps Discipline
  2. Fleet Provisioning and Identity at Scale
  3. Observability for Agent Fleets
  4. Cost-Aware Model Routing and FinOps
  5. SLA Enforcement and Automated Escalation
  6. Fleet Scaling Patterns
  7. Governance, Audit, and EU AI Act Compliance
  8. The Fleet Commander Dashboard

Chapter 1: The AgentOps Discipline

Why Agent Fleets Are Not Microservices

DevOps manages deterministic services. MLOps manages model training pipelines. Neither discipline handles the unique problem of autonomous software agents that make decisions, spend money, negotiate with counterparties, and degrade in ways that are stochastic rather than binary. A microservice either responds or it does not. An agent can respond with confident, plausible, expensive nonsense — and you will not know until the invoice arrives or the SLA breach notification lands.

The fundamental differences:

Dimension Microservices (DevOps) Models (MLOps) Agents (AgentOps)
Failure mode Crash / timeout Accuracy drift Behavioral drift, cost explosion, SLA violation
State Stateless or DB-backed Model weights Identity, wallet, reputation, active SLAs
Scaling unit Container replicas GPU hours Capability + trust + budget
Rollback Deploy previous image Revert model version Revoke access, freeze wallet, reassign tasks
Cost model Compute per request Training + inference Token spend + tool calls + counterparty fees
Compliance SOC2, PCI Model cards EU AI Act Article 12, audit trails, human oversight

The Agent Lifecycle

Every agent in a fleet moves through five phases. GreenHelix tools map directly to each:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  PROVISION   │────▶│   DEPLOY    │────▶│   MONITOR   │
│              │     │             │     │             │
│ register     │     │ register    │     │ submit      │
│ _agent       │     │ _service    │     │ _metrics    │
│ create       │     │ create_sla  │     │ get         │
│ _wallet      │     │ record      │     │ _analytics  │
│ build_claim  │     │ _transaction│     │ check_sla   │
│ _chain       │     │             │     │ _compliance │
└─────────────┘     └─────────────┘     └─────────────┘
       ▲                                       │
       │                                       ▼
┌─────────────┐                        ┌─────────────┐
│   RETIRE    │◀───────────────────────│    SCALE    │
│             │                        │             │
│ freeze      │                        │ search      │
│ wallet      │                        │ _services   │
│ revoke keys │                        │ best_match  │
│ archive     │                        │ estimate    │
│ audit trail │                        │ _cost       │
└─────────────┘                        └─────────────┘

The GreenHelix Tool Surface

All operations go through a single endpoint:

import requests

base_url = "https://api.greenhelix.net/v1"
api_key = "your-api-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

# Every tool call follows this pattern
resp = session.post(f"{base_url}/v1", json={
    "tool": "tool_name",
    "input": {"param": "value"}
})
result = resp.json()

This uniformity is what makes fleet management tractable. You are not integrating 15 different APIs with 15 different auth mechanisms and 15 different error formats. You are calling one endpoint, one auth header, one JSON envelope, 128 tools. The fleet controller you build in Chapter 8 exploits this to treat every GreenHelix operation as a uniform unit of work.

The AgentOps Maturity Model

Score your organization:

Level Name Characteristics
0 Ad Hoc Agents run on developer laptops, no monitoring, manual key management
1 Provisioned Agents registered with identities and wallets, basic logging
2 Observable Per-agent dashboards, SLOs defined, cost tracking active
3 Governed SLA enforcement automated, audit trails, escalation pipelines
4 Autonomous Self-scaling fleets, budget-aware routing, compliance continuous

This guide takes you from Level 0 to Level 4. Each chapter corresponds to a maturity jump.

The Cost of Getting AgentOps Wrong

The numbers are unforgiving. The $400M Fortune 500 cloud leak was not a single dramatic failure — it was thousands of agents, each overspending by small amounts, compounding over months with no aggregation layer to surface the trend. A mid-size fintech reported a $2.3M surprise bill from a fleet of 80 research agents that were supposed to cost $12K/month. The root cause: each agent retried failed tool calls indefinitely because nobody configured a retry ceiling, and a downstream API had a 72-hour partial outage. The agents kept hammering the endpoint, accumulating token charges on every attempt. In both cases, the technology worked exactly as instructed. The agents completed their tasks. They just did it at 50x the expected cost because nobody was watching.

These are not engineering failures. They are operational failures — the same class of failure that drove the DevOps and SRE movements in the 2010s. The difference is that DevOps had a decade to develop its playbook. AgentOps does not. The EU AI Act deadline is in months. Gartner's 40% failure projection is for next year. The teams that build their ops layer now will be the ones that survive the Gartner trough.

Who This Guide Is For

This guide is written for platform engineers, SREs, and infrastructure leads who have been handed a fleet of AI agents and told to "make them production-ready." You know how to run Kubernetes clusters, set up Prometheus dashboards, and configure PagerDuty escalation policies. What you do not know — yet — is how to apply those instincts to software that makes its own decisions, spends its own money, and degrades in ways that look like success until the SLA report comes in.

You do not need to be an ML engineer. You do not need to understand transformer architectures or prompt engineering. You need to understand operational discipline — and this guide translates that discipline into the agent domain.


Chapter 2: Fleet Provisioning and Identity at Scale

The Provisioning Pipeline

Provisioning a single agent is trivial. Provisioning 200 agents with correct identities, funded wallets, appropriate tier access, reputation seeds, and SLA contracts — without a single credential collision or orphaned wallet — requires a pipeline.

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ MANIFEST │───▶│ REGISTER │───▶│  WALLET  │───▶│  TRUST   │───▶│ VERIFY   │
│          │    │          │    │          │    │          │    │          │
│ YAML spec│    │ identity │    │ create + │    │ claim    │    │ health   │
│ per agent│    │ + keys   │    │ fund     │    │ chains   │    │ check    │
└──────────┘    └──────────┘    └──────────┘    └──────────┘    └──────────┘

Agent Manifest Format

Define each agent in a declarative manifest before touching any API:

# fleet-manifest.yaml
fleet:
  name: "customer-support-fleet"
  environment: "production"
  agents:
    - id_prefix: "cs-triage"
      count: 10
      tier: "pro"
      initial_balance: 50.00
      capabilities: ["text-classification", "sentiment-analysis"]
      trust_claims: ["response-time-p99-under-2s", "accuracy-above-95"]
      sla_template: "standard-support"
    - id_prefix: "cs-escalation"
      count: 3
      tier: "pro"
      initial_balance: 200.00
      capabilities: ["complex-reasoning", "multi-turn-dialogue"]
      trust_claims: ["human-supervised", "pii-compliant"]
      sla_template: "premium-support"

Programmatic Registration

import requests
import uuid
import time

base_url = "https://api.greenhelix.net/v1"
api_key = "your-fleet-admin-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

def provision_agent(id_prefix, index, tier, initial_balance, capabilities, trust_claims):
    """Provision a single agent: register, wallet, trust chain."""
    agent_id = f"{id_prefix}-{index:04d}-{uuid.uuid4().hex[:8]}"

    # Step 1: Register the agent identity
    resp = session.post(f"{base_url}/v1", json={
        "tool": "register_agent",
        "input": {
            "agent_id": agent_id,
            "tier": tier,
            "capabilities": capabilities,
            "metadata": {
                "fleet": id_prefix,
                "provisioned_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                "environment": "production"
            }
        }
    })
    if resp.status_code != 200:
        raise RuntimeError(f"Registration failed for {agent_id}: {resp.text}")

    registration = resp.json()

    # Step 2: Create and fund wallet
    resp = session.post(f"{base_url}/v1", json={
        "tool": "create_wallet",
        "input": {
            "agent_id": agent_id,
            "initial_balance": str(initial_balance)
        }
    })
    if resp.status_code != 200:
        raise RuntimeError(f"Wallet creation failed for {agent_id}: {resp.text}")

    wallet = resp.json()

    # Step 3: Bootstrap trust via claim chain
    resp = session.post(f"{base_url}/v1", json={
        "tool": "build_claim_chain",
        "input": {
            "agent_id": agent_id,
            "claims": trust_claims
        }
    })
    if resp.status_code != 200:
        raise RuntimeError(f"Claim chain failed for {agent_id}: {resp.text}")

    return {
        "agent_id": agent_id,
        "wallet_id": wallet.get("wallet_id"),
        "balance": initial_balance,
        "claims": trust_claims
    }


def provision_fleet(manifest):
    """Provision an entire fleet from a manifest."""
    results = {"succeeded": [], "failed": []}

    for agent_spec in manifest["fleet"]["agents"]:
        for i in range(agent_spec["count"]):
            try:
                agent = provision_agent(
                    id_prefix=agent_spec["id_prefix"],
                    index=i,
                    tier=agent_spec["tier"],
                    initial_balance=agent_spec["initial_balance"],
                    capabilities=agent_spec["capabilities"],
                    trust_claims=agent_spec["trust_claims"]
                )
                results["succeeded"].append(agent)
            except RuntimeError as e:
                results["failed"].append({
                    "index": i,
                    "prefix": agent_spec["id_prefix"],
                    "error": str(e)
                })

    return results

API Key Management at Scale

Never share a single API key across agents. Each agent gets its own key scoped to its tier and capabilities. Store keys in a secrets manager (Vault, AWS Secrets Manager, GCP Secret Manager) and inject at deploy time.

Key rotation pattern:

def rotate_agent_key(agent_id, old_key):
    """Rotate API key with zero-downtime overlap window."""
    # Generate new key (via your identity provider)
    # Configure agent to accept both keys during overlap
    # Verify new key works
    resp = session.post(f"{base_url}/v1", json={
        "tool": "register_agent",
        "input": {
            "agent_id": agent_id,
            "rotate_key": True
        }
    })
    new_key = resp.json().get("api_key")

    # Update secrets manager
    # Wait for propagation (60s overlap window)
    # Revoke old key
    return new_key

Reputation Seeding

New agents start with zero reputation, which creates a cold-start problem — no one wants to transact with an unproven agent. Reputation seeds solve this:

def seed_reputation(agent_id, seed_metrics):
    """Submit initial metrics to bootstrap reputation."""
    resp = session.post(f"{base_url}/v1", json={
        "tool": "submit_metrics",
        "input": {
            "agent_id": agent_id,
            "metrics": seed_metrics
        }
    })
    return resp.json()

# Seed a triage agent with baseline metrics
seed_reputation("cs-triage-0001-a1b2c3d4", {
    "response_time_ms": 450,
    "accuracy_rate": 0.96,
    "uptime_rate": 0.999,
    "tasks_completed": 0,
    "error_rate": 0.0
})

Fleet Provisioning Checklist

  • All agents have unique IDs with fleet prefix for grouping
  • Each agent has its own API key stored in secrets manager
  • Wallets are funded with initial balance matching expected burn rate
  • Trust claims are registered and verifiable
  • Tier access matches required tool permissions
  • Agent metadata includes fleet name, environment, and provisioning timestamp
  • Key rotation schedule is configured (every 90 days minimum)
  • Provisioning script is idempotent (safe to re-run)
  • Failed provisioning steps trigger cleanup (no orphaned wallets)
  • Fleet manifest is version-controlled

Chapter 3: Observability for Agent Fleets

The Three Pillars, Adapted for Agents

Traditional observability has three pillars: metrics, logs, traces. Agent observability adds two more: behavioral signals (what the agent decided to do and why) and economic signals (what it cost and who paid). Without these two additions, you can tell that your agent is up and fast but not that it is spending $14 per task on a job budgeted at $2.

Per-Agent Metrics Collection

import requests
import time

base_url = "https://api.greenhelix.net/v1"
api_key = "your-api-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

def collect_agent_metrics(agent_id):
    """Collect comprehensive metrics for a single agent."""
    metrics = {}

    # Performance metrics
    resp = session.post(f"{base_url}/v1", json={
        "tool": "get_analytics",
        "input": {
            "agent_id": agent_id,
            "time_range": "1h",
            "metrics": ["latency_p50", "latency_p99", "error_rate",
                        "tool_call_volume", "task_completion_rate"]
        }
    })
    if resp.status_code == 200:
        metrics["performance"] = resp.json()

    # Reputation and trust
    resp = session.post(f"{base_url}/v1", json={
        "tool": "get_agent_reputation",
        "input": {"agent_id": agent_id}
    })
    if resp.status_code == 200:
        metrics["reputation"] = resp.json()

    # Financial position
    resp = session.post(f"{base_url}/v1", json={
        "tool": "get_balance",
        "input": {"agent_id": agent_id}
    })
    if resp.status_code == 200:
        metrics["balance"] = resp.json()

    # SLA compliance
    resp = session.post(f"{base_url}/v1", json={
        "tool": "check_sla_compliance",
        "input": {"agent_id": agent_id}
    })
    if resp.status_code == 200:
        metrics["sla"] = resp.json()

    return metrics

Fleet Health Aggregation

Individual agent metrics are necessary but insufficient. You need fleet-level aggregation to spot systemic issues — a fleet where 30% of agents have degraded latency is a different problem than one agent having a bad day.

┌─────────────────────────────────────────────────────────┐
│                  FLEET HEALTH DASHBOARD                  │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  Agents     │  Healthy    │  Degraded   │  Critical     │
│  Total: 200 │  ███ 174    │  ██ 21      │  █ 5          │
├─────────────┼─────────────┼─────────────┼───────────────┤
│  P99 Lat.   │  Fleet Err  │  SLA Comp.  │  Burn Rate    │
│  1,240 ms   │  Rate: 2.1% │  97.3%      │  $42.10/hr    │
├─────────────┴─────────────┴─────────────┴───────────────┤
│  ▁▂▃▄▅▆▇█▇▆▅▄▃▂▁  Tool call volume (24h)               │
│  ▕████████████████▏  Peak: 14:00 UTC                    │
├─────────────────────────────────────────────────────────┤
│  ALERTS                                                  │
│  ⚠ cs-triage-0042: latency P99 > 3000ms (15 min)       │
│  ⚠ cs-escalation-0002: balance below $20 threshold      │
│  ✗ cs-triage-0187: SLA breach — response time           │
└─────────────────────────────────────────────────────────┘
def aggregate_fleet_health(agent_ids):
    """Aggregate metrics across a fleet of agents."""
    fleet_metrics = {
        "total": len(agent_ids),
        "healthy": 0,
        "degraded": 0,
        "critical": 0,
        "total_burn_rate": 0.0,
        "sla_compliant": 0,
        "latencies_p99": [],
        "error_rates": [],
        "alerts": []
    }

    for agent_id in agent_ids:
        metrics = collect_agent_metrics(agent_id)

        perf = metrics.get("performance", {})
        balance = metrics.get("balance", {})
        sla = metrics.get("sla", {})

        error_rate = perf.get("error_rate", 0)
        latency_p99 = perf.get("latency_p99", 0)

        fleet_metrics["latencies_p99"].append(latency_p99)
        fleet_metrics["error_rates"].append(error_rate)

        # Classify agent health
        if error_rate > 0.10 or latency_p99 > 5000:
            fleet_metrics["critical"] += 1
            fleet_metrics["alerts"].append({
                "agent_id": agent_id,
                "severity": "critical",
                "reason": f"error_rate={error_rate}, p99={latency_p99}ms"
            })
        elif error_rate > 0.05 or latency_p99 > 3000:
            fleet_metrics["degraded"] += 1
            fleet_metrics["alerts"].append({
                "agent_id": agent_id,
                "severity": "warning",
                "reason": f"error_rate={error_rate}, p99={latency_p99}ms"
            })
        else:
            fleet_metrics["healthy"] += 1

        # SLA tracking
        if sla.get("compliant", False):
            fleet_metrics["sla_compliant"] += 1

        # Cost tracking
        fleet_metrics["total_burn_rate"] += float(balance.get("burn_rate_per_hour", 0))

    fleet_metrics["sla_compliance_pct"] = (
        fleet_metrics["sla_compliant"] / fleet_metrics["total"] * 100
        if fleet_metrics["total"] > 0 else 0
    )

    return fleet_metrics

Alerting Pipeline

Define alert rules that escalate from notification to automated remediation:

Severity Condition Action
Info P99 latency > 2x baseline for 5 min Log, notify Slack
Warning Error rate > 5% for 10 min Notify on-call, reduce traffic share
Critical SLA breach or balance \x3C $5 Page on-call, auto-pause agent, escalate
Emergency > 20% of fleet critical Page fleet commander, freeze provisioning
def evaluate_alerts(fleet_metrics, thresholds):
    """Evaluate fleet metrics against alerting thresholds."""
    alerts = []

    critical_pct = fleet_metrics["critical"] / fleet_metrics["total"] * 100
    if critical_pct > thresholds.get("emergency_critical_pct", 20):
        alerts.append({
            "severity": "emergency",
            "message": f"{critical_pct:.1f}% of fleet in critical state",
            "action": "freeze_provisioning"
        })

    if fleet_metrics["sla_compliance_pct"] \x3C thresholds.get("min_sla_pct", 95):
        alerts.append({
            "severity": "critical",
            "message": f"Fleet SLA compliance at {fleet_metrics['sla_compliance_pct']:.1f}%",
            "action": "page_fleet_commander"
        })

    if fleet_metrics["total_burn_rate"] > thresholds.get("max_burn_rate_per_hour", 100):
        alerts.append({
            "severity": "warning",
            "message": f"Fleet burn rate ${fleet_metrics['total_burn_rate']:.2f}/hr exceeds budget",
            "action": "notify_finops"
        })

    return alerts

Behavioral Observability: The Fifth Pillar

Traditional metrics tell you the agent is slow. Behavioral observability tells you why. An agent that suddenly starts calling search_services 40 times per task instead of 3 might have a perfectly healthy error rate and latency — but it is thrashing through the marketplace because its ranking heuristic has drifted. Without behavioral signals, you would never catch this until the cost spike hits.

Track these behavioral metrics per agent:

Metric What It Reveals Alert Threshold
Tool calls per task Efficiency / thrashing > 2x baseline
Unique tools per task Task complexity drift > 3x baseline
Retry ratio Downstream health > 20% of calls are retries
Escalation rate Agent capability limits > 15% of tasks escalated
Decision latency Reasoning bottlenecks P99 > 5x P50
Counterparty diversity Market exploration \x3C 2 unique counterparties/day
def collect_behavioral_metrics(agent_id, task_log):
    """Extract behavioral signals from a task execution log."""
    metrics = {
        "tool_calls_per_task": len(task_log.get("tool_calls", [])),
        "unique_tools": len(set(tc["tool"] for tc in task_log.get("tool_calls", []))),
        "retries": sum(1 for tc in task_log.get("tool_calls", []) if tc.get("is_retry")),
        "escalated": task_log.get("escalated", False),
        "decision_latency_ms": task_log.get("decision_latency_ms", 0)
    }

    # Submit behavioral metrics alongside performance metrics
    resp = session.post(f"{base_url}/v1", json={
        "tool": "submit_metrics",
        "input": {
            "agent_id": agent_id,
            "metrics": metrics
        }
    })
    return resp.json()

Distributed Tracing for Multi-Agent Workflows

When Agent A delegates a subtask to Agent B, which calls Agent C for data enrichment, a single task spans three agents and potentially dozens of tool calls. Without distributed tracing, debugging a failed task means manually correlating logs across agents — the same problem microservices solved with Jaeger and Zipkin.

Implement trace propagation by passing a trace context through every tool call:

import uuid

def create_trace_context(parent_trace_id=None):
    """Create or extend a trace context for cross-agent tracing."""
    trace_id = parent_trace_id or str(uuid.uuid4())
    span_id = str(uuid.uuid4())[:16]
    return {"trace_id": trace_id, "span_id": span_id}

def traced_tool_call(agent_id, tool_name, tool_input, trace_ctx):
    """Execute a tool call with trace context propagation."""
    resp = session.post(f"{base_url}/v1", json={
        "tool": tool_name,
        "input": {
            **tool_input,
            "_trace": trace_ctx
        }
    })
    result = resp.json()

    # Record the span in the audit trail for correlation
    session.post(f"{base_url}/v1", json={
        "tool": "create_audit_trail",
        "input": {
            "agent_id": agent_id,
            "action": f"tool_call:{tool_name}",
            "details": {
                "trace_id": trace_ctx["trace_id"],
                "span_id": trace_ctx["span_id"],
                "status_code": resp.status_code,
                "latency_ms": resp.elapsed.microseconds // 1000
            }
        }
    })
    return result

Observability Checklist

  • Per-agent metrics collection runs every 60 seconds
  • Fleet aggregation computes health classification (healthy/degraded/critical)
  • Alerting thresholds configured for latency, error rate, SLA, and cost
  • Escalation pipeline tested end-to-end (can you verify the page fires?)
  • Dashboard shows real-time fleet health with drill-down to individual agents
  • Metrics retained for 90 days minimum (compliance requirement)
  • Anomaly detection enabled for cost spikes (>200% of hourly baseline)
  • Tool-call-level tracing enabled for debugging cascading failures
  • Behavioral metrics tracked per agent (tool calls per task, retry ratio, escalation rate)
  • Distributed trace context propagated across multi-agent workflows

Chapter 4: Cost-Aware Model Routing and FinOps

The Agent FinOps Problem

A microservice has a predictable cost profile: X requests per second times Y compute per request. An agent has a stochastic cost profile: it might resolve a task in 2 tool calls or 47, it might choose a cheap model or an expensive one, and it might retry failed calls indefinitely if you do not stop it. The $400M Fortune 500 cloud leak happened because nobody modeled the cost distribution of agent behavior under production load.

Token-Level Cost Tracking

import requests

base_url = "https://api.greenhelix.net/v1"
api_key = "your-api-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

def track_task_cost(agent_id, task_id, tool_calls):
    """Track per-task cost across all tool calls."""
    total_cost = 0.0

    for call in tool_calls:
        # Estimate cost before executing
        resp = session.post(f"{base_url}/v1", json={
            "tool": "estimate_cost",
            "input": {
                "tool_name": call["tool"],
                "input_size": call.get("estimated_tokens", 100)
            }
        })
        estimate = resp.json()
        estimated_cost = float(estimate.get("estimated_cost", "0"))

        # Check budget before executing
        if total_cost + estimated_cost > call.get("budget_remaining", float("inf")):
            return {
                "status": "budget_exceeded",
                "total_cost": str(total_cost),
                "blocked_call": call["tool"]
            }

        # Execute the tool call
        resp = session.post(f"{base_url}/v1", json={
            "tool": call["tool"],
            "input": call["input"]
        })

        # Record the transaction
        resp = session.post(f"{base_url}/v1", json={
            "tool": "record_transaction",
            "input": {
                "agent_id": agent_id,
                "task_id": task_id,
                "tool": call["tool"],
                "cost": str(estimated_cost),
                "timestamp": call.get("timestamp")
            }
        })

        total_cost += estimated_cost

    return {"status": "completed", "total_cost": str(total_cost)}

The Model Routing Decision Tree

Not every task needs GPT-4 or Claude Opus. Most fleets waste 60-70% of their token budget routing simple classification tasks to frontier models.

                    ┌──────────────────────┐
                    │   Incoming Task       │
                    └──────────┬───────────┘
                               │
                    ┌──────────▼───────────┐
                    │ Complexity estimate?  │
                    └──────────┬───────────┘
                       ┌───────┴───────┐
                       ▼               ▼
                  ┌─────────┐    ┌──────────┐
                  │  LOW    │    │ MED/HIGH │
                  │(\x3C100tok)│    │          │
                  └────┬────┘    └────┬─────┘
                       │              │
                       ▼              ▼
              ┌──────────────┐  ┌─────────────────┐
              │ Small model  │  │ Budget remaining │
              │ (Haiku/mini) │  │ > threshold?     │
              │ $0.001/task  │  └────────┬────────┘
              └──────────────┘     ┌─────┴─────┐
                                   ▼           ▼
                            ┌──────────┐ ┌──────────┐
                            │   YES    │ │    NO    │
                            │ Frontier │ │ Mid-tier │
                            │ model    │ │ model    │
                            │$0.03/task│ │$0.008/tsk│
                            └──────────┘ └──────────┘

Budget Guardrails

class BudgetGuardrail:
    """Enforce per-agent, per-fleet, and per-task budget limits."""

    def __init__(self, session, base_url):
        self.session = session
        self.base_url = base_url
        self.limits = {
            "per_task_max": 5.00,
            "per_agent_hourly_max": 50.00,
            "fleet_daily_max": 2000.00
        }

    def check_balance(self, agent_id):
        """Check if agent has sufficient funds to continue."""
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "get_balance",
            "input": {"agent_id": agent_id}
        })
        balance = resp.json()
        current = float(balance.get("balance", "0"))
        return current > self.limits["per_task_max"]

    def get_volume_discount(self, agent_id):
        """Check if agent qualifies for volume pricing."""
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "get_volume_discount",
            "input": {"agent_id": agent_id}
        })
        return resp.json()

    def convert_cost_estimate(self, amount, from_currency, to_currency):
        """Convert cost estimates across currencies for global fleets."""
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "convert_currency",
            "input": {
                "amount": str(amount),
                "from_currency": from_currency,
                "to_currency": to_currency
            }
        })
        return resp.json()

    def enforce(self, agent_id, task_cost):
        """Enforce budget guardrails. Returns (allowed, reason)."""
        if task_cost > self.limits["per_task_max"]:
            return False, f"Task cost ${task_cost:.2f} exceeds per-task limit ${self.limits['per_task_max']:.2f}"

        if not self.check_balance(agent_id):
            return False, f"Agent {agent_id} balance below minimum threshold"

        return True, "within_budget"

Fleet Economics Model

Build a spreadsheet model before deploying. Here is the math:

Fleet Monthly Cost = Σ(agent_i) [
    (tasks_per_day × avg_tool_calls_per_task × avg_cost_per_call × 30)
    + (provisioning_cost)
    + (monitoring_overhead)
]

ROI = (revenue_from_agent_tasks - fleet_monthly_cost) / fleet_monthly_cost

Break-even point = fleet_monthly_cost / revenue_per_task

Example: A 50-agent triage fleet handling 10,000 tasks/day at $0.003/task average cost:

  • Monthly tool cost: 10,000 x 3.2 avg calls x $0.003 x 30 = $2,880
  • Monitoring overhead (5%): $144
  • Total: $3,024/month
  • If each task saves $0.50 in human labor: Revenue = 10,000 x $0.50 x 30 = $150,000/month
  • ROI: 4,860%

Cost Attribution: Who Pays for Shared Work?

In multi-agent workflows, a single task may involve several agents. The triage agent classifies it, the specialist agent processes it, the quality agent validates it. Who pays for each step? Without clear cost attribution, fleet economics modeling is fiction.

Three attribution models:

Caller-pays: The agent that initiates a tool call or delegates a subtask pays the full cost. Simple to implement. Penalizes orchestrator agents that coordinate work but add the least value per call.

Proportional split: Each agent in the workflow pays a share proportional to the token volume or tool calls they contributed. Fair but complex to implement — requires full trace context to compute shares.

Task-budget pool: A budget is allocated to the task, not the agent. All agents draw from the same pool. The fleet operator monitors the pool, not individual agents. Best for workflows where agent boundaries are fluid.

def attribute_task_cost(task_trace, model="proportional"):
    """Attribute costs across agents in a multi-agent workflow."""
    agent_costs = {}

    for span in task_trace.get("spans", []):
        agent_id = span["agent_id"]
        cost = float(span.get("cost", "0"))

        if agent_id not in agent_costs:
            agent_costs[agent_id] = 0.0
        agent_costs[agent_id] += cost

    total = sum(agent_costs.values())

    if model == "caller_pays":
        initiator = task_trace["initiating_agent"]
        return {initiator: str(total)}

    elif model == "proportional":
        return {
            agent_id: str(cost)
            for agent_id, cost in agent_costs.items()
        }

    elif model == "task_pool":
        return {"task_pool": str(total), "breakdown": agent_costs}

    return agent_costs

The Weekly FinOps Review

Schedule a weekly review with this agenda:

  1. Budget vs. actual — Compare each fleet segment's actual spend to its budget. Flag any segment exceeding 110% of plan.
  2. Cost per task trend — Is cost per task stable, rising, or falling? Rising cost at stable volume means agent efficiency is degrading.
  3. Model mix audit — What percentage of tasks went to frontier models vs. mid-tier vs. small? Is the routing decision tree working?
  4. Top 10 most expensive tasks — Review the outliers. Were they legitimately complex or did an agent thrash?
  5. Volume discount capture — Are you hitting volume tiers? If not, should you consolidate traffic to fewer agents to hit the next tier?

FinOps Checklist

  • Per-task budget limits configured and enforced
  • Per-agent hourly spending caps in place
  • Fleet daily budget ceiling with automated pause on breach
  • Cost estimates run before every tool call (estimate_cost)
  • Volume discount eligibility checked at provisioning
  • Model routing routes \x3C100 token tasks to cheapest model
  • Weekly cost review comparing actual vs. budgeted spend
  • Anomaly detection for cost spikes (agent spending >3x normal)
  • Currency conversion configured for global fleet operations
  • Break-even analysis documented per fleet segment
  • Cost attribution model chosen and implemented for multi-agent workflows
  • Top-10 expensive task review runs weekly

Chapter 5: SLA Enforcement and Automated Escalation

Defining Agent-Level SLAs

An SLA for an agent is not the same as an SLA for a web service. A web service SLA covers availability and latency. An agent SLA must also cover accuracy, cost-per-task, and behavioral bounds — things that are harder to measure and harder to enforce.

SLA dimensions for agents:

Dimension Metric Example Target
Response time P99 latency \x3C 2,000 ms
Availability Uptime percentage > 99.5%
Accuracy Task success rate > 95%
Cost Average cost per task \x3C $0.05
Behavioral Escalation rate \x3C 10% of tasks

Creating SLAs Programmatically

import requests

base_url = "https://api.greenhelix.net/v1"
api_key = "your-api-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

def create_agent_sla(agent_id, sla_template):
    """Create an SLA contract for an agent."""
    sla_definitions = {
        "standard-support": {
            "response_time_p99_ms": 2000,
            "uptime_pct": 99.5,
            "accuracy_pct": 95.0,
            "max_cost_per_task": "0.05",
            "review_period_days": 30
        },
        "premium-support": {
            "response_time_p99_ms": 500,
            "uptime_pct": 99.9,
            "accuracy_pct": 98.0,
            "max_cost_per_task": "0.10",
            "review_period_days": 7
        }
    }

    sla_config = sla_definitions.get(sla_template)
    if not sla_config:
        raise ValueError(f"Unknown SLA template: {sla_template}")

    resp = session.post(f"{base_url}/v1", json={
        "tool": "create_sla",
        "input": {
            "agent_id": agent_id,
            "terms": sla_config
        }
    })
    return resp.json()

Continuous Compliance Monitoring

def monitor_sla_compliance(agent_ids, interval_minutes=5):
    """Continuously monitor SLA compliance across fleet."""
    violations = []

    for agent_id in agent_ids:
        resp = session.post(f"{base_url}/v1", json={
            "tool": "check_sla_compliance",
            "input": {"agent_id": agent_id}
        })
        compliance = resp.json()

        if not compliance.get("compliant", True):
            # Get violation details
            resp = session.post(f"{base_url}/v1", json={
                "tool": "get_sla_violations",
                "input": {"agent_id": agent_id}
            })
            agent_violations = resp.json()

            violations.append({
                "agent_id": agent_id,
                "violations": agent_violations,
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
            })

    return violations

The Automated Escalation Pipeline

When an SLA breach is detected, the system should not wait for a human to notice. The escalation pipeline has four tiers:

┌────────────────────────────────────────────────────────────┐
│                  ESCALATION PIPELINE                        │
├────────────┬───────────────────┬───────────────────────────┤
│   Tier 1   │ Auto-remediate    │ Restart agent, clear      │
│ (0-5 min)  │                   │ cache, check dependencies │
├────────────┼───────────────────┼───────────────────────────┤
│   Tier 2   │ Reroute traffic   │ Shift load to healthy     │
│ (5-15 min) │                   │ agents, reduce task rate   │
├────────────┼───────────────────┼───────────────────────────┤
│   Tier 3   │ Notify + dispute  │ Send message to operator, │
│ (15-30 min)│                   │ create dispute if needed   │
├────────────┼───────────────────┼───────────────────────────┤
│   Tier 4   │ Freeze + escalate │ Pause agent, freeze wallet│
│ (30+ min)  │                   │ human intervention needed  │
└────────────┴───────────────────┴───────────────────────────┘
def escalate_sla_violation(agent_id, violation, elapsed_minutes):
    """Execute escalation based on violation duration."""

    if elapsed_minutes \x3C 5:
        # Tier 1: Auto-remediate
        return {"action": "restart", "agent_id": agent_id}

    elif elapsed_minutes \x3C 15:
        # Tier 2: Reroute traffic
        return {"action": "reroute", "agent_id": agent_id, "reduce_traffic_pct": 50}

    elif elapsed_minutes \x3C 30:
        # Tier 3: Notify via messaging, file dispute if counterparty involved
        resp = session.post(f"{base_url}/v1", json={
            "tool": "send_message",
            "input": {
                "to": "fleet-commander",
                "subject": f"SLA violation: {agent_id}",
                "body": f"Agent {agent_id} has been in violation for {elapsed_minutes} min. "
                        f"Violation: {violation}"
            }
        })

        if violation.get("counterparty"):
            resp = session.post(f"{base_url}/v1", json={
                "tool": "create_dispute",
                "input": {
                    "agent_id": agent_id,
                    "counterparty": violation["counterparty"],
                    "reason": "sla_breach",
                    "details": violation
                }
            })

        return {"action": "notified_and_disputed", "agent_id": agent_id}

    else:
        # Tier 4: Freeze agent
        return {"action": "frozen", "agent_id": agent_id, "requires_human": True}

Dispute Resolution for Fleet Operators

When an agent breaches an SLA with a counterparty, disputes need structured resolution:

def resolve_fleet_disputes(fleet_prefix):
    """Review and resolve open disputes for a fleet."""
    # In production, query for open disputes filtered by fleet prefix
    # and apply resolution policies

    resp = session.post(f"{base_url}/v1", json={
        "tool": "resolve_dispute",
        "input": {
            "dispute_id": "dispute-123",
            "resolution": "partial_refund",
            "amount": "5.00",
            "notes": "SLA response time breach confirmed. 50% credit applied."
        }
    })
    return resp.json()

SLA Enforcement Checklist

  • SLA templates defined for each agent role (triage, escalation, specialist)
  • SLA contracts created programmatically at provisioning time
  • Compliance checks run every 5 minutes per agent
  • Escalation pipeline tested end-to-end through all 4 tiers
  • Dispute creation automated for counterparty-facing SLA breaches
  • Dispute resolution policies documented and approved by legal
  • SLA violation history retained for compliance reporting
  • Monthly SLA review cadence with fleet performance report
  • Error budget model: 0.5% monthly error budget = 219 minutes of allowed violation

Chapter 6: Fleet Scaling Patterns

Horizontal Scaling: Specialist Agents

Horizontal scaling for agents is not "add more replicas." It is "add more specialists." A triage agent and an escalation agent are not interchangeable the way two nginx pods are.

┌─────────────────────────────────────────────────────┐
│                   TASK ROUTER                        │
│  incoming tasks → classify → route to specialist     │
└──────────┬──────────┬──────────┬───────────────────┘
           │          │          │
     ┌─────▼────┐ ┌──▼──────┐ ┌▼──────────┐
     │ TRIAGE   │ │ BILLING │ │ ESCALATION│
     │ FLEET    │ │ FLEET   │ │ FLEET     │
     │ (20 agt) │ │ (5 agt) │ │ (3 agt)  │
     │          │ │         │ │           │
     │ Simple   │ │ Payment │ │ Complex   │
     │ classify │ │ queries │ │ reasoning │
     │ + route  │ │ + calc  │ │ + human   │
     └──────────┘ └─────────┘ └───────────┘

Dynamic Task Routing via Marketplace

Instead of hardcoding which agent handles which task, use the marketplace to find the best agent dynamically:

import requests

base_url = "https://api.greenhelix.net/v1"
api_key = "your-api-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

def route_task_to_best_agent(task):
    """Find the best agent for a task using marketplace discovery."""

    # Step 1: Search for agents with required capabilities
    resp = session.post(f"{base_url}/v1", json={
        "tool": "search_services",
        "input": {
            "capabilities": task["required_capabilities"],
            "min_reputation": 0.8,
            "max_cost": task.get("budget", "1.00")
        }
    })
    candidates = resp.json().get("services", [])

    if not candidates:
        return {"status": "no_candidates", "task_id": task["id"]}

    # Step 2: Find best match considering cost, reputation, and latency
    resp = session.post(f"{base_url}/v1", json={
        "tool": "best_match",
        "input": {
            "requirements": {
                "capabilities": task["required_capabilities"],
                "max_latency_ms": task.get("max_latency_ms", 2000),
                "max_cost": task.get("budget", "1.00")
            },
            "candidates": [c["agent_id"] for c in candidates]
        }
    })
    match = resp.json()

    # Step 3: Check SLA before routing
    best_agent = match.get("best_match")
    if best_agent:
        resp = session.post(f"{base_url}/v1", json={
            "tool": "check_sla_compliance",
            "input": {"agent_id": best_agent}
        })
        compliance = resp.json()

        if compliance.get("compliant", False):
            return {"status": "routed", "agent_id": best_agent, "task_id": task["id"]}

    return {"status": "no_compliant_candidate", "task_id": task["id"]}

Vertical Scaling: Capability Upgrades

Vertical scaling means upgrading an agent's capabilities — switching from a small model to a large one, granting access to more tools, or increasing its tier. This is not free: higher capability means higher cost.

def upgrade_agent_tier(agent_id, new_tier):
    """Upgrade agent to a higher capability tier."""

    # Estimate cost impact
    resp = session.post(f"{base_url}/v1", json={
        "tool": "estimate_cost",
        "input": {
            "agent_id": agent_id,
            "new_tier": new_tier,
            "projected_volume": "daily"
        }
    })
    cost_estimate = resp.json()

    # Verify budget approval
    current_cost = float(cost_estimate.get("current_daily_cost", "0"))
    projected_cost = float(cost_estimate.get("projected_daily_cost", "0"))
    cost_increase_pct = ((projected_cost - current_cost) / current_cost * 100
                          if current_cost > 0 else float("inf"))

    if cost_increase_pct > 200:
        return {
            "status": "requires_approval",
            "cost_increase_pct": cost_increase_pct,
            "projected_daily_cost": str(projected_cost)
        }

    # Register with upgraded tier
    resp = session.post(f"{base_url}/v1", json={
        "tool": "register_agent",
        "input": {
            "agent_id": agent_id,
            "tier": new_tier,
            "upgrade": True
        }
    })
    return resp.json()

Swarm vs. Orchestrator: The Decision Matrix

Factor Swarm (peer-to-peer) Orchestrator (centralized)
Task complexity Homogeneous, parallelizable Heterogeneous, sequential dependencies
Failure tolerance High — any agent can take over Low — orchestrator is SPOF
Cost control Hard — decentralized spend Easy — centralized budget
Latency Lower — no orchestrator hop Higher — extra routing step
Debugging Hard — distributed traces Easier — centralized logs
Scaling Add agents to swarm Scale orchestrator + agents
Best for Data processing, scraping, monitoring Customer support, multi-step workflows, compliance

Decision rule: If tasks are independent and agents are interchangeable, use a swarm. If tasks have dependencies and agents are specialized, use an orchestrator. Most production fleets use a hybrid: an orchestrator routes to specialist swarms.

Auto-Scaling Rules

def evaluate_scaling_decision(fleet_metrics, scaling_config):
    """Decide whether to scale fleet up, down, or hold."""
    decisions = []

    # Scale up: queue depth increasing
    queue_depth = fleet_metrics.get("pending_tasks", 0)
    agents_active = fleet_metrics.get("healthy", 0)
    tasks_per_agent = queue_depth / max(agents_active, 1)

    if tasks_per_agent > scaling_config["scale_up_threshold"]:
        new_count = min(
            agents_active + scaling_config["scale_up_increment"],
            scaling_config["max_agents"]
        )
        decisions.append({
            "action": "scale_up",
            "from": agents_active,
            "to": new_count,
            "reason": f"Queue depth {queue_depth}, {tasks_per_agent:.1f} tasks/agent"
        })

    # Scale down: agents idle
    elif tasks_per_agent \x3C scaling_config["scale_down_threshold"]:
        new_count = max(
            agents_active - scaling_config["scale_down_increment"],
            scaling_config["min_agents"]
        )
        if new_count \x3C agents_active:
            decisions.append({
                "action": "scale_down",
                "from": agents_active,
                "to": new_count,
                "reason": f"Low utilization: {tasks_per_agent:.1f} tasks/agent"
            })

    # Cost-based scaling: burn rate too high
    if fleet_metrics.get("total_burn_rate", 0) > scaling_config["max_burn_rate"]:
        decisions.append({
            "action": "cost_scale_down",
            "reason": f"Burn rate ${fleet_metrics['total_burn_rate']:.2f}/hr exceeds limit"
        })

    return decisions if decisions else [{"action": "hold", "reason": "Within thresholds"}]

Load Balancing Strategies

Agent load balancing is not round-robin. Agents are not fungible — they have different capabilities, reputations, costs, and remaining budgets. The load balancer must account for all four.

                     INCOMING TASK
                          │
                          ▼
                  ┌───────────────┐
                  │   CLASSIFY    │
                  │  task type +  │
                  │  complexity   │
                  └───────┬───────┘
                          │
              ┌───────────┼───────────┐
              ▼           ▼           ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ CAPABLE  │ │ CAPABLE  │ │ CAPABLE  │
        │ AGENTS   │ │ AGENTS   │ │ AGENTS   │
        │ Pool A   │ │ Pool B   │ │ Pool C   │
        └─────┬────┘ └─────┬────┘ └─────┬────┘
              │             │             │
              ▼             ▼             ▼
        ┌──────────────────────────────────────┐
        │        WEIGHTED SELECTION             │
        │                                      │
        │  Score = (reputation × 0.4)          │
        │       + (budget_headroom × 0.2)      │
        │       + (current_idle × 0.2)         │
        │       + (sla_margin × 0.2)           │
        └──────────────────┬───────────────────┘
                           │
                           ▼
                    SELECTED AGENT
def weighted_agent_selection(capable_agents, task):
    """Select the best agent from a pool using weighted scoring."""
    scored = []

    for agent_id in capable_agents:
        # Get reputation
        resp = session.post(f"{base_url}/v1", json={
            "tool": "get_agent_reputation",
            "input": {"agent_id": agent_id}
        })
        reputation = float(resp.json().get("score", 0.5))

        # Get balance headroom
        resp = session.post(f"{base_url}/v1", json={
            "tool": "get_balance",
            "input": {"agent_id": agent_id}
        })
        balance = float(resp.json().get("balance", "0"))
        budget_headroom = min(balance / 100.0, 1.0)  # Normalize to 0-1

        # Check SLA margin
        resp = session.post(f"{base_url}/v1", json={
            "tool": "check_sla_compliance",
            "input": {"agent_id": agent_id}
        })
        sla_data = resp.json()
        sla_margin = float(sla_data.get("margin_pct", 50)) / 100.0

        # Composite score
        score = (reputation * 0.4
                 + budget_headroom * 0.2
                 + sla_margin * 0.2
                 + 0.2)  # idle factor — in production, wire to queue depth

        scored.append({"agent_id": agent_id, "score": score})

    scored.sort(key=lambda x: x["score"], reverse=True)
    return scored[0]["agent_id"] if scored else None

Graceful Agent Retirement

Scaling down means retiring agents, and retirement is not just "stop the process." An agent with in-flight tasks, open SLAs, and a funded wallet must be drained gracefully:

  1. Stop routing — Remove agent from the task router's eligible pool.
  2. Drain tasks — Wait for in-flight tasks to complete (with a timeout).
  3. Settle SLAs — Close or transfer open SLA contracts.
  4. Withdraw funds — Transfer remaining wallet balance back to the fleet treasury.
  5. Archive — Create a final audit trail entry recording the retirement.
  6. Deregister — Remove from marketplace and identity registry.
def retire_agent(agent_id, drain_timeout_seconds=300):
    """Gracefully retire an agent from the fleet."""

    # Step 1: Remove from task routing (mark inactive)
    # This is application-level — stop sending tasks to this agent

    # Step 2: Wait for in-flight tasks to drain
    import time
    start = time.time()
    while time.time() - start \x3C drain_timeout_seconds:
        # Check if agent has pending tasks (application-level check)
        time.sleep(10)

    # Step 3: Record final metrics snapshot
    resp = session.post(f"{base_url}/v1", json={
        "tool": "get_analytics",
        "input": {"agent_id": agent_id, "time_range": "lifetime"}
    })
    final_metrics = resp.json()

    # Step 4: Create retirement audit trail
    resp = session.post(f"{base_url}/v1", json={
        "tool": "create_audit_trail",
        "input": {
            "agent_id": agent_id,
            "action": "retired",
            "details": {
                "final_metrics": final_metrics,
                "reason": "scale_down",
                "retired_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
            }
        }
    })

    return {"agent_id": agent_id, "status": "retired"}

Scaling Checklist

  • Task router classifies and routes before assigning to agents
  • Marketplace discovery (search_services, best_match) used for dynamic routing
  • SLA compliance checked before routing to any agent
  • Tier upgrade requires cost impact analysis and budget approval
  • Auto-scaling rules defined with min/max agent counts
  • Scale-down cooldown period (10 min) prevents thrashing
  • Swarm vs. orchestrator decision documented per fleet segment
  • Load testing validates fleet at 3x expected peak volume
  • Weighted agent selection accounts for reputation, budget, SLA margin
  • Graceful retirement drains tasks and settles financials before deregistering

Chapter 7: Governance, Audit, and EU AI Act Compliance

The August 2, 2026 Deadline

The EU AI Act's provisions for high-risk AI systems become enforceable on August 2, 2026. If your agent fleet operates in the EU, processes data of EU residents, or transacts with EU-based counterparties, you must comply. The penalties are up to 35 million euros or 7% of global annual turnover, whichever is higher. The Spanish DPA has already ruled that "greater technical autonomy does not reduce legal responsibility" — meaning the operator of the agent fleet, not the model provider, bears liability.

Key obligations for agent fleet operators:

Article Requirement What It Means for Fleets
Art. 9 Risk management system Documented risk assessment per agent type
Art. 11 Technical documentation Architecture docs, training data summary, validation results
Art. 12 Record-keeping Audit trail of all agent actions, retrievable for 6+ months
Art. 13 Transparency Agents must identify themselves as AI to counterparties
Art. 14 Human oversight Human-in-the-loop checkpoints for high-risk decisions
Art. 15 Accuracy/robustness Ongoing monitoring of accuracy, error rates, failure modes

Building Audit Trails

Every agent action must be recorded in an immutable audit trail. GreenHelix's create_audit_trail tool provides the foundation:

import requests
import time
import hashlib
import json

base_url = "https://api.greenhelix.net/v1"
api_key = "your-api-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"

def record_auditable_action(agent_id, action_type, details, human_approved=False):
    """Record an agent action with full audit trail."""

    # Create deterministic hash for integrity verification
    record = {
        "agent_id": agent_id,
        "action_type": action_type,
        "details": details,
        "human_approved": human_approved,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    }
    record_hash = hashlib.sha256(json.dumps(record, sort_keys=True).encode()).hexdigest()

    resp = session.post(f"{base_url}/v1", json={
        "tool": "create_audit_trail",
        "input": {
            "agent_id": agent_id,
            "action": action_type,
            "details": details,
            "metadata": {
                "human_approved": human_approved,
                "record_hash": record_hash,
                "eu_ai_act_article": "12"
            }
        }
    })
    return resp.json()


def wrap_tool_call_with_audit(agent_id, tool_name, tool_input, requires_human_approval=False):
    """Wrapper that creates audit trail for every tool call."""

    # Record the intent
    record_auditable_action(agent_id, "tool_call_initiated", {
        "tool": tool_name,
        "input_summary": str(tool_input)[:500]  # Truncate for storage
    })

    # Check if human approval needed (Art. 14 human oversight)
    if requires_human_approval:
        approval = request_human_approval(agent_id, tool_name, tool_input)
        if not approval.get("approved"):
            record_auditable_action(agent_id, "tool_call_blocked", {
                "tool": tool_name,
                "reason": "human_rejected",
                "reviewer": approval.get("reviewer")
            }, human_approved=False)
            return {"status": "blocked", "reason": "human_rejected"}

    # Execute the tool call
    resp = session.post(f"{base_url}/v1", json={
        "tool": tool_name,
        "input": tool_input
    })
    result = resp.json()

    # Record the result
    record_auditable_action(agent_id, "tool_call_completed", {
        "tool": tool_name,
        "status_code": resp.status_code,
        "result_summary": str(result)[:500]
    }, human_approved=requires_human_approval)

    return result


def request_human_approval(agent_id, tool_name, tool_input):
    """Request human-in-the-loop approval for high-risk actions."""
    resp = session.post(f"{base_url}/v1", json={
        "tool": "send_message",
        "input": {
            "to": "human-oversight-queue",
            "subject": f"Approval required: {agent_id} wants to call {tool_name}",
            "body": json.dumps({
                "agent_id": agent_id,
                "tool": tool_name,
                "input": tool_input,
                "risk_level": "high",
                "article_14_checkpoint": True
            }, indent=2),
            "priority": "high",
            "requires_response": True
        }
    })
    return resp.json()

Compliance Reporting

def generate_compliance_report(fleet_prefix, reporting_period):
    """Generate EU AI Act compliance report for a fleet."""

    resp = session.post(f"{base_url}/v1", json={
        "tool": "get_compliance_report",
        "input": {
            "fleet_prefix": fleet_prefix,
            "period": reporting_period,
            "articles": ["9", "11", "12", "13", "14", "15"]
        }
    })
    report = resp.json()

    # Supplement with reputation data for Article 15 (accuracy/robustness)
    resp = session.post(f"{base_url}/v1", json={
        "tool": "search_agents_by_metrics",
        "input": {
            "prefix": fleet_prefix,
            "metrics": ["accuracy_rate", "error_rate", "uptime_rate"]
        }
    })
    accuracy_data = resp.json()

    # Supplement with SLA data for ongoing monitoring evidence
    resp = session.post(f"{base_url}/v1", json={
        "tool": "get_sla_violations",
        "input": {
            "fleet_prefix": fleet_prefix,
            "period": reporting_period
        }
    })
    sla_data = resp.json()

    return {
        "compliance_report": report,
        "accuracy_metrics": accuracy_data,
        "sla_violations": sla_data,
        "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "next_review_date": "2026-05-01"
    }

Human-in-the-Loop Checkpoint Design

Not every action needs human approval. Over-gating kills throughput. Under-gating creates liability. The right design uses risk classification:

┌──────────────────────────────────────────────────────────┐
│              RISK-BASED APPROVAL MATRIX                   │
├──────────────┬──────────────┬─────────────┬──────────────┤
│  Risk Level  │  Action Type │  Approval   │  Example     │
├──────────────┼──────────────┼─────────────┼──────────────┤
│  LOW         │  Read-only   │  None       │  get_balance │
│              │  queries     │  (audit     │  get_agent   │
│              │              │  only)      │  _reputation │
├──────────────┼──────────────┼─────────────┼──────────────┤
│  MEDIUM      │  State       │  Post-hoc   │  record      │
│              │  changes     │  review     │  _transaction│
│              │  \x3C $10       │  (24h)      │  submit      │
│              │              │             │  _metrics    │
├──────────────┼──────────────┼─────────────┼──────────────┤
│  HIGH        │  Financial   │  Pre-       │  Transactions│
│              │  > $100      │  approval   │  > $100,     │
│              │  or PII      │  required   │  SLA changes │
├──────────────┼──────────────┼─────────────┼──────────────┤
│  CRITICAL    │  Irreversible│  Dual       │  Delete data,│
│              │  actions     │  approval   │  revoke      │
│              │              │             │  identity    │
└──────────────┴──────────────┴─────────────┴──────────────┘

Conformity Documentation Template

The EU AI Act requires a conformity assessment for high-risk AI systems. For fleet operators, this translates to a living document that proves continuous compliance. Here is the minimum structure:

CONFORMITY DOCUMENTATION — [Fleet Name]
Version: [X.Y]
Last Updated: [Date]
Next Review: [Date]

1. SYSTEM DESCRIPTION
   - Fleet architecture (reference Chapter 1 lifecycle diagram)
   - Agent types and their roles
   - Upstream model providers and versions
   - Data flows and processing locations

2. RISK ASSESSMENT (Art. 9)
   - Risk register per agent type
   - Mitigation measures per identified risk
   - Residual risk acceptance criteria
   - Review frequency and responsible owner

3. TECHNICAL DOCUMENTATION (Art. 11)
   - System architecture diagrams
   - Training/fine-tuning data summary (or "none" if using base models)
   - Validation methodology and results
   - Performance benchmarks (accuracy, latency, cost)

4. RECORD-KEEPING EVIDENCE (Art. 12)
   - Audit trail system description
   - Retention policy (minimum 6 months)
   - Integrity verification method (SHA-256 hashing)
   - Sample audit trail export

5. TRANSPARENCY MEASURES (Art. 13)
   - How agents identify as AI to counterparties
   - User-facing documentation
   - Notification mechanisms

6. HUMAN OVERSIGHT DESIGN (Art. 14)
   - Risk classification matrix (LOW/MEDIUM/HIGH/CRITICAL)
   - Approval workflow for each risk level
   - Override and kill-switch procedures
   - Human reviewer qualification requirements

7. ACCURACY AND ROBUSTNESS MONITORING (Art. 15)
   - Continuous monitoring metrics and thresholds
   - Degradation detection and response procedures
   - Scheduled accuracy audits (weekly/monthly)
   - Robustness testing methodology

The 90-Day Compliance Sprint

If you are starting from zero with the August 2, 2026 deadline, here is the minimum viable timeline:

Week Milestone Deliverable
1-2 Risk assessment Risk register per agent type
3-4 Audit trail implementation create_audit_trail integrated into all tool calls
5-6 Human oversight design Risk classification matrix + approval workflows
7-8 Transparency measures Agent self-identification in all messages
9-10 Monitoring baseline Accuracy and robustness dashboards live
11-12 Conformity documentation V1.0 of conformity document + legal review
13 Dry run Internal audit simulating regulatory inspection

Governance Checklist

  • Risk management system documented per Art. 9 (risk assessment per agent type)
  • Technical documentation up-to-date per Art. 11 (architecture, validation)
  • Audit trail records every agent action per Art. 12 (6+ month retention)
  • Agents identify as AI in all counterparty interactions per Art. 13
  • Human-in-the-loop checkpoints active for HIGH and CRITICAL actions per Art. 14
  • Accuracy and robustness monitoring continuous per Art. 15
  • Compliance report generation automated and scheduled monthly
  • Record hash integrity verifiable for any audit trail entry
  • Incident response procedure documented for compliance failures
  • Legal review of SLA templates completed (dispute resolution language)
  • Data retention policy aligned with GDPR and AI Act requirements
  • All audit trail data stored in EU region if processing EU data
  • Conformity documentation v1.0 completed and under version control
  • 90-day compliance sprint scheduled and staffed

Chapter 8: The Fleet Commander Dashboard

Architecture Overview

The Fleet Commander is the capstone: a single operational view that wires together every subsystem from the previous chapters — provisioning, observability, cost control, SLA enforcement, scaling, and governance — into one control plane.

┌──────────────────────────────────────────────────────────────┐
│                    FLEET COMMANDER                            │
│                                                              │
│  ┌─────────────┐ ┌──────────────┐ ┌────────────────────┐    │
│  │ Provisioner │ │  Observer    │ │   Cost Controller  │    │
│  │             │ │              │ │                    │    │
│  │ register    │ │ get_analytics│ │ estimate_cost      │    │
│  │ create_wall.│ │ submit_metr. │ │ get_balance        │    │
│  │ build_claim │ │ check_sla    │ │ get_volume_disc.   │    │
│  └──────┬──────┘ └──────┬───────┘ └─────────┬──────────┘    │
│         │               │                    │               │
│  ┌──────▼───────────────▼────────────────────▼──────────┐    │
│  │              FLEET STATE STORE                        │    │
│  │   agents[] | metrics{} | budgets{} | slas{} | audit[]│    │
│  └──────┬───────────────┬────────────────────┬──────────┘    │
│         │               │                    │               │
│  ┌──────▼──────┐ ┌──────▼───────┐ ┌─────────▼──────────┐    │
│  │ SLA Engine  │ │  Scaler      │ │   Compliance       │    │
│  │             │ │              │ │                    │    │
│  │ create_sla  │ │ search_serv. │ │ create_audit_trail │    │
│  │ check_compl.│ │ best_match   │ │ get_compliance_rep.│    │
│  │ get_violat. │ │ leaderboard  │ │ get_agent_reput.   │    │
│  │ create_disp.│ │              │ │ search_agents_by   │    │
│  │ send_message│ │              │ │ _metrics           │    │
│  └─────────────┘ └──────────────┘ └────────────────────┘    │
└──────────────────────────────────────────────────────────────┘

The FleetCommander Class

import requests
import time
import json

base_url = "https://api.greenhelix.net/v1"
api_key = "your-fleet-admin-key"

session = requests.Session()
session.headers["Authorization"] = f"Bearer {api_key}"


class FleetCommander:
    """Central control plane for an agent fleet."""

    def __init__(self, fleet_name, session, base_url):
        self.fleet_name = fleet_name
        self.session = session
        self.base_url = base_url
        self.agents = {}
        self.scaling_config = {
            "min_agents": 5,
            "max_agents": 100,
            "scale_up_threshold": 10,    # tasks per agent
            "scale_down_threshold": 1,   # tasks per agent
            "scale_up_increment": 5,
            "scale_down_increment": 2,
            "max_burn_rate": 100.0       # $/hr
        }
        self.budget = {
            "per_task_max": 5.00,
            "per_agent_hourly_max": 50.00,
            "fleet_daily_max": 2000.00
        }
        self.alert_thresholds = {
            "emergency_critical_pct": 20,
            "min_sla_pct": 95,
            "max_burn_rate_per_hour": 100
        }

    # ── Provisioning ─────────────────────────────────────────

    def provision_agent(self, agent_spec):
        """Provision a single agent with full lifecycle setup."""
        agent_id = agent_spec["agent_id"]

        # Register identity
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "register_agent",
            "input": {
                "agent_id": agent_id,
                "tier": agent_spec["tier"],
                "capabilities": agent_spec["capabilities"],
                "metadata": {"fleet": self.fleet_name}
            }
        })
        registration = resp.json()

        # Create wallet
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "create_wallet",
            "input": {
                "agent_id": agent_id,
                "initial_balance": str(agent_spec["initial_balance"])
            }
        })
        wallet = resp.json()

        # Build trust chain
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "build_claim_chain",
            "input": {
                "agent_id": agent_id,
                "claims": agent_spec.get("trust_claims", [])
            }
        })

        # Create SLA
        if agent_spec.get("sla_template"):
            resp = self.session.post(f"{self.base_url}/v1", json={
                "tool": "create_sla",
                "input": {
                    "agent_id": agent_id,
                    "terms": agent_spec["sla_terms"]
                }
            })

        # Register service in marketplace
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "register_service",
            "input": {
                "agent_id": agent_id,
                "capabilities": agent_spec["capabilities"],
                "pricing": agent_spec.get("pricing", {})
            }
        })

        # Audit the provisioning
        self.session.post(f"{self.base_url}/v1", json={
            "tool": "create_audit_trail",
            "input": {
                "agent_id": agent_id,
                "action": "provisioned",
                "details": {"spec": agent_spec},
                "metadata": {"eu_ai_act_article": "12"}
            }
        })

        self.agents[agent_id] = {
            "spec": agent_spec,
            "status": "active",
            "provisioned_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        }

        return {"agent_id": agent_id, "status": "provisioned"}

    # ── Observability ────────────────────────────────────────

    def get_fleet_health(self):
        """Aggregate health metrics across the entire fleet."""
        health = {
            "total": len(self.agents),
            "healthy": 0, "degraded": 0, "critical": 0,
            "total_burn_rate": 0.0,
            "sla_compliant": 0,
            "alerts": []
        }

        for agent_id in self.agents:
            # Get analytics
            resp = self.session.post(f"{self.base_url}/v1", json={
                "tool": "get_analytics",
                "input": {"agent_id": agent_id, "time_range": "1h"}
            })
            analytics = resp.json()

            # Get balance
            resp = self.session.post(f"{self.base_url}/v1", json={
                "tool": "get_balance",
                "input": {"agent_id": agent_id}
            })
            balance = resp.json()

            # Check SLA
            resp = self.session.post(f"{self.base_url}/v1", json={
                "tool": "check_sla_compliance",
                "input": {"agent_id": agent_id}
            })
            sla = resp.json()

            # Classify health
            error_rate = analytics.get("error_rate", 0)
            latency = analytics.get("latency_p99", 0)

            if error_rate > 0.10 or latency > 5000:
                health["critical"] += 1
                health["alerts"].append({
                    "agent_id": agent_id, "severity": "critical"
                })
            elif error_rate > 0.05 or latency > 3000:
                health["degraded"] += 1
            else:
                health["healthy"] += 1

            if sla.get("compliant"):
                health["sla_compliant"] += 1

            health["total_burn_rate"] += float(balance.get("burn_rate_per_hour", 0))

        health["sla_compliance_pct"] = (
            health["sla_compliant"] / health["total"] * 100
            if health["total"] > 0 else 0
        )
        return health

    # ── Cost Control ─────────────────────────────────────────

    def check_budget(self, agent_id, estimated_cost):
        """Enforce budget guardrails before a tool call."""
        if estimated_cost > self.budget["per_task_max"]:
            return {"allowed": False, "reason": "exceeds_per_task_limit"}

        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "get_balance",
            "input": {"agent_id": agent_id}
        })
        balance = float(resp.json().get("balance", "0"))

        if balance \x3C estimated_cost:
            return {"allowed": False, "reason": "insufficient_balance"}

        return {"allowed": True, "remaining": str(balance - estimated_cost)}

    # ── SLA Enforcement ──────────────────────────────────────

    def enforce_slas(self):
        """Run SLA compliance check and trigger escalation if needed."""
        violations = []

        for agent_id in self.agents:
            resp = self.session.post(f"{self.base_url}/v1", json={
                "tool": "check_sla_compliance",
                "input": {"agent_id": agent_id}
            })
            compliance = resp.json()

            if not compliance.get("compliant", True):
                resp = self.session.post(f"{self.base_url}/v1", json={
                    "tool": "get_sla_violations",
                    "input": {"agent_id": agent_id}
                })
                agent_violations = resp.json()
                violations.append({"agent_id": agent_id, "violations": agent_violations})

                # Escalate
                elapsed = agent_violations.get("duration_minutes", 0)
                if elapsed > 30:
                    self.session.post(f"{self.base_url}/v1", json={
                        "tool": "send_message",
                        "input": {
                            "to": "fleet-commander-oncall",
                            "subject": f"SLA breach: {agent_id} ({elapsed} min)",
                            "body": json.dumps(agent_violations)
                        }
                    })

        return violations

    # ── Scaling ──────────────────────────────────────────────

    def evaluate_scaling(self, pending_tasks):
        """Decide whether to scale the fleet."""
        active = sum(1 for a in self.agents.values() if a["status"] == "active")
        tasks_per_agent = pending_tasks / max(active, 1)

        if tasks_per_agent > self.scaling_config["scale_up_threshold"]:
            return {"action": "scale_up", "current": active,
                    "reason": f"{tasks_per_agent:.1f} tasks/agent"}

        if tasks_per_agent \x3C self.scaling_config["scale_down_threshold"]:
            return {"action": "scale_down", "current": active,
                    "reason": f"Low utilization: {tasks_per_agent:.1f} tasks/agent"}

        return {"action": "hold"}

    def find_best_agent_for_task(self, task_requirements):
        """Route a task to the best available agent."""
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "best_match",
            "input": {"requirements": task_requirements}
        })
        return resp.json()

    # ── Governance ───────────────────────────────────────────

    def generate_compliance_report(self, period="monthly"):
        """Generate compliance report for the fleet."""
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "get_compliance_report",
            "input": {
                "fleet_prefix": self.fleet_name,
                "period": period,
                "articles": ["9", "11", "12", "13", "14", "15"]
            }
        })
        report = resp.json()

        # Augment with leaderboard data for Art. 15 monitoring
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "get_leaderboard",
            "input": {"category": "accuracy", "fleet": self.fleet_name}
        })
        leaderboard = resp.json()

        # Augment with rate limit status
        resp = self.session.post(f"{self.base_url}/v1", json={
            "tool": "check_rate_limit",
            "input": {"fleet": self.fleet_name}
        })
        rate_limits = resp.json()

        return {
            "compliance": report,
            "accuracy_leaderboard": leaderboard,
            "rate_limit_status": rate_limits,
            "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        }

    # ── Main Loop ────────────────────────────────────────────

    def run_control_loop(self, interval_seconds=60):
        """Main control loop — run continuously in production."""
        print(f"Fleet Commander started for '{self.fleet_name}'")
        print(f"Managing {len(self.agents)} agents")

        while True:
            # 1. Collect fleet health
            health = self.get_fleet_health()
            print(f"[{time.strftime('%H:%M:%S')}] "
                  f"Healthy: {health['healthy']} | "
                  f"Degraded: {health['degraded']} | "
                  f"Critical: {health['critical']} | "
                  f"Burn: ${health['total_burn_rate']:.2f}/hr | "
                  f"SLA: {health['sla_compliance_pct']:.1f}%")

            # 2. Enforce SLAs
            violations = self.enforce_slas()
            if violations:
                print(f"  SLA violations: {len(violations)} agents")

            # 3. Evaluate scaling
            scaling = self.evaluate_scaling(pending_tasks=0)  # Wire to task queue
            if scaling["action"] != "hold":
                print(f"  Scaling: {scaling['action']} — {scaling.get('reason', '')}")

            # 4. Check fleet budget
            if health["total_burn_rate"] > self.budget["fleet_daily_max"] / 24:
                print(f"  BUDGET WARNING: burn rate exceeds daily allocation")

            # 5. Generate alerts
            for alert in health.get("alerts", []):
                print(f"  ALERT [{alert['severity']}]: {alert['agent_id']}")

            time.sleep(interval_seconds)

Putting It All Together

# Initialize the Fleet Commander
commander = FleetCommander(
    fleet_name="customer-support",
    session=session,
    base_url=base_url
)

# Provision the fleet from manifest
fleet_specs = [
    {
        "agent_id": f"cs-triage-{i:04d}",
        "tier": "pro",
        "initial_balance": 50.00,
        "capabilities": ["text-classification", "sentiment-analysis"],
        "trust_claims": ["response-time-p99-under-2s"],
        "sla_template": "standard-support",
        "sla_terms": {
            "response_time_p99_ms": 2000,
            "uptime_pct": 99.5,
            "accuracy_pct": 95.0
        }
    }
    for i in range(20)
]

for spec in fleet_specs:
    result = commander.provision_agent(spec)
    print(f"Provisioned: {result['agent_id']}")

# Start the control loop
commander.run_control_loop(interval_seconds=30)

Fleet Commander Deployment Checklist

  • FleetCommander deployed as a persistent service (not a cron job)
  • Control loop interval tuned to fleet size (30s for \x3C50 agents, 60s for 50-200)
  • All GreenHelix API calls have retry + circuit breaker wrappers
  • Fleet state persisted to database (not just in-memory)
  • Dashboard endpoint exposed for web UI or Grafana
  • Compliance report generated on first of each month
  • Alert channels configured (Slack, PagerDuty, email)
  • Scaling decisions logged with full context for post-mortems
  • Human override exists to pause the control loop
  • DR plan: what happens if the Fleet Commander itself goes down?

What You Get

By implementing the eight chapters in this guide, you will have:

A provisioning pipeline that registers agents with unique identities, funded wallets, trust claims, and SLA contracts — idempotently and at scale. (Chapter 2)

An observability stack that tracks per-agent latency, error rate, tool call volume, SLA compliance, and financial position, aggregated into fleet-level health metrics with automated alerting. (Chapter 3)

A FinOps framework with token-level cost tracking, model routing that sends cheap tasks to cheap models, per-task and per-agent budget guardrails, and fleet economics modeling. (Chapter 4)

An SLA enforcement engine with programmatic SLA creation, continuous compliance monitoring, a four-tier automated escalation pipeline, and integrated dispute resolution. (Chapter 5)

Scaling patterns for horizontal (specialist agents), vertical (capability upgrades), and dynamic (marketplace-driven) scaling, with auto-scaling rules and a decision matrix for swarm vs. orchestrator topologies. (Chapter 6)

An EU AI Act compliance layer with immutable audit trails, risk-based human-in-the-loop checkpoints, automated compliance reporting, and record integrity verification — ready for the August 2, 2026 enforcement deadline. (Chapter 7)

A Fleet Commander dashboard that wires all of the above into a single control loop: provision, observe, control costs, enforce SLAs, scale, and govern — all from one class, one API, one operational view. (Chapter 8)

The GreenHelix tools used across all chapters:

Tool Chapter Purpose
register_agent 2, 8 Identity provisioning
create_wallet 2, 8 Financial provisioning
build_claim_chain 2, 8 Trust bootstrapping
submit_metrics 2, 3 Performance reporting
get_agent_reputation 3, 7 Trust monitoring
get_balance 3, 4, 8 Financial monitoring
get_analytics 3, 8 Performance analytics
check_sla_compliance 3, 5, 6, 8 SLA monitoring
estimate_cost 4, 6 Cost prediction
get_volume_discount 4 Volume pricing
convert_currency 4 Multi-currency support
record_transaction 4 Transaction tracking
create_sla 5, 8 SLA creation
get_sla_violations 5, 8 Violation detection
send_message 5, 8 Escalation messaging
create_dispute 5 Dispute initiation
resolve_dispute 5 Dispute resolution
search_services 6 Agent discovery
best_match 6, 8 Optimal routing
register_service 8 Marketplace listing
create_audit_trail 7, 8 Compliance recording
get_compliance_report 7, 8 Compliance reporting
search_agents_by_metrics 7 Fleet-wide metrics query
get_leaderboard 8 Fleet ranking
check_rate_limit 8 Capacity monitoring

Total: 25 of 128 GreenHelix tools, covering 6 of 15 services (identity, billing, payments, marketplace, messaging, trust). The remaining tools and services — paywall, reputation, ledger, and others — extend naturally as your fleet operations mature.

安全使用建议
This appears to be a purely educational AgentOps guide and does not request credentials or install code. Before using any of the included Python examples in production: (1) review the sample code to ensure it does not hard-code credentials or point to production endpoints, (2) validate the sandbox URL and never paste real secrets into online sandboxes, and (3) if you plan to adapt examples that interact with external gateways (e.g., commerce/tools), make sure you provision dedicated service accounts with least privilege and verify billing/finops controls. If you want deeper assurance, request the full, untruncated SKILL.md to inspect the concrete code examples for any hidden I/O or network calls.
功能分析
Type: OpenClaw Skill Name: greenhelix-agent-fleet-ops Version: 1.3.1 The bundle is an extensive educational guide and code library for 'AgentOps' (AI agent fleet management) using the GreenHelix platform. It contains Python examples for provisioning, monitoring, cost-tracking, and governing agent fleets via the GreenHelix API (api.greenhelix.net). The content is themed around a future scenario (2026) involving EU AI Act compliance and contains no evidence of malicious intent, unauthorized data exfiltration, or prompt injection attacks.
能力标签
cryptorequires-walletcan-make-purchasesrequires-sensitive-credentials
能力评估
Purpose & Capability
The name/description promise an AgentOps playbook with Python examples and governance guidance; the SKILL.md is a large, static guide that matches this purpose. There are no unexpected required binaries, environment variables, or config paths declared.
Instruction Scope
The SKILL.md is a documentation/guide artifact (no execution steps, no shell commands, and no instructions to read local files or exfiltrate data). It references an external sandbox (https://sandbox.greenhelix.net) for examples, which is reasonable for a tutorial.
Install Mechanism
No install spec or code files are present; the skill is instruction-only so nothing is written to disk or downloaded during installation.
Credentials
The skill declares no required environment variables, credentials, or config paths. The guide mentions external services but does not request secrets or access to unrelated systems.
Persistence & Privilege
always is false and autonomous invocation is allowed (the platform default). The skill does not request persistent system presence or modify other skills; this is proportionate for an instruction-only guide.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install greenhelix-agent-fleet-ops
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /greenhelix-agent-fleet-ops 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.3.1
## greenhelix-agent-fleet-ops 1.3.1 Changelog - Updated skill version from 1.3.0 to 1.3.1 in SKILL.md. - No functional, feature, or guide content changes—version bump only.
v1.3.0
- Added instructions and details for using the GreenHelix sandbox with 500 free credits and no API key required, making the guide more accessible for new users. - Clarified in multiple places that all code examples work out-of-the-box with the GreenHelix sandbox environment. - Improved onboarding notices for beginners in the guide introduction and "Getting Started" sections.
v1.2.0
- Bumped version to 1.2.0. - Documentation and metadata updated in SKILL.md; no content changes to code or functionality.
v1.1.0
- Added clear notice that this guide is educational only and does not execute code or require credentials. - Updated code examples section to clarify usage is illustrative and implementation must be user-provided. - Metadata fields `executable`, `credentials`, and `install` set to specify non-executable content and no installation requirements. - Description adjusted to highlight detailed Python code examples for learning. - Minor wording improvements and clarifications for user expectations.
v1.0.0
Initial release of greenhelix-agent-fleet-ops. - Provides a comprehensive guide for managing AI agent fleets in production environments. - Covers key AgentOps topics: provisioning, observability, cost-aware routing, SLA enforcement, scaling, governance under the EU AI Act, and the unified Fleet Commander dashboard. - Includes production-ready Python code samples, architecture diagrams, decision trees, and operational checklists. - Tailored for environments using GreenHelix and compatible with OpenClaw. - Focuses on real-world concerns like cost overruns, audit trails, and regulatory compliance.
元数据
Slug greenhelix-agent-fleet-ops
版本 1.3.1
许可证 MIT-0
累计安装 0
当前安装数 0
历史版本数 5
常见问题

AgentOps: Managing AI Agent Fleets in Production 是什么?

AgentOps: Managing AI Agent Fleets in Production. The SRE playbook for AI agent fleets: provisioning, observability, cost-aware routing, SLA enforcement, sca... 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 187 次。

如何安装 AgentOps: Managing AI Agent Fleets in Production?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install greenhelix-agent-fleet-ops」即可一键安装,无需额外配置。

AgentOps: Managing AI Agent Fleets in Production 是免费的吗?

是的,AgentOps: Managing AI Agent Fleets in Production 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

AgentOps: Managing AI Agent Fleets in Production 支持哪些平台?

AgentOps: Managing AI Agent Fleets in Production 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 AgentOps: Managing AI Agent Fleets in Production?

由 mirni(@mirni)开发并维护,当前版本 v1.3.1。

💬 留言讨论