← 返回 Skills 市场
alirezarezvani

agent-workflow-designer

作者 Alireza Rezvani · GitHub ↗ · v1.0.0 · MIT-0
cross-platform ⚠ suspicious
355
总下载
0
收藏
3
当前安装
2
版本数
在 OpenClaw 中安装
/install agent-workflow-designer
功能描述
Agent Workflow Designer
使用说明 (SKILL.md)

Agent Workflow Designer

Tier: POWERFUL
Category: Engineering
Domain: Multi-Agent Systems / AI Orchestration


Overview

Design production-grade multi-agent orchestration systems. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven, consensus), platform-specific implementations, handoff protocols, state management, error recovery, context window budgeting, and cost optimization.


Core Capabilities

  • Pattern selection guide for any orchestration requirement
  • Handoff protocol templates (structured context passing)
  • State management patterns for multi-agent workflows
  • Error recovery and retry strategies
  • Context window budget management
  • Cost optimization strategies per platform
  • Platform-specific configs: Claude Code Agent Teams, OpenClaw, CrewAI, AutoGen

When to Use

  • Building a multi-step AI pipeline that exceeds one agent's context capacity
  • Parallelizing research, generation, or analysis tasks for speed
  • Creating specialist agents with defined roles and handoff contracts
  • Designing fault-tolerant AI workflows for production

Pattern Selection Guide

Is the task sequential (each step needs previous output)?
  YES → Sequential Pipeline
  NO  → Can tasks run in parallel?
          YES → Parallel Fan-out/Fan-in
          NO  → Is there a hierarchy of decisions?
                  YES → Hierarchical Delegation
                  NO  → Is it event-triggered?
                          YES → Event-Driven
                          NO  → Need consensus/validation?
                                  YES → Consensus Pattern

Pattern 1: Sequential Pipeline

Use when: Each step depends on the previous output. Research → Draft → Review → Polish.

# sequential_pipeline.py
from dataclasses import dataclass
from typing import Callable, Any
import anthropic

@dataclass
class PipelineStage:
    name: "str"
    system_prompt: str
    input_key: str      # what to take from state
    output_key: str     # what to write to state
    model: str = "claude-3-5-sonnet-20241022"
    max_tokens: int = 2048

class SequentialPipeline:
    def __init__(self, stages: list[PipelineStage]):
        self.stages = stages
        self.client = anthropic.Anthropic()
    
    def run(self, initial_input: str) -> dict:
        state = {"input": initial_input}
        
        for stage in self.stages:
            print(f"[{stage.name}] Processing...")
            
            stage_input = state.get(stage.input_key, "")
            
            response = self.client.messages.create(
                model=stage.model,
                max_tokens=stage.max_tokens,
                system=stage.system_prompt,
                messages=[{"role": "user", "content": stage_input}],
            )
            
            state[stage.output_key] = response.content[0].text
            state[f"{stage.name}_tokens"] = response.usage.input_tokens + response.usage.output_tokens
            
            print(f"[{stage.name}] Done. Tokens: {state[f'{stage.name}_tokens']}")
        
        return state

# Example: Blog post pipeline
pipeline = SequentialPipeline([
    PipelineStage(
        name="researcher",
        system_prompt="You are a research specialist. Given a topic, produce a structured research brief with: key facts, statistics, expert perspectives, and controversy points.",
        input_key="input",
        output_key="research",
    ),
    PipelineStage(
        name="writer",
        system_prompt="You are a senior content writer. Using the research provided, write a compelling 800-word blog post with a clear hook, 3 main sections, and a strong CTA.",
        input_key="research",
        output_key="draft",
    ),
    PipelineStage(
        name="editor",
        system_prompt="You are a copy editor. Review the draft for: clarity, flow, grammar, and SEO. Return the improved version only, no commentary.",
        input_key="draft",
        output_key="final",
    ),
])

Pattern 2: Parallel Fan-out / Fan-in

Use when: Independent tasks that can run concurrently. Research 5 competitors simultaneously.

# parallel_fanout.py
import asyncio
import anthropic
from typing import Any

async def run_agent(client, task_name: "str-system-str-user-str-model-str"claude-3-5-sonnet-20241022") -> dict:
    """Single async agent call"""
    loop = asyncio.get_event_loop()
    
    def _call():
        return client.messages.create(
            model=model,
            max_tokens=2048,
            system=system,
            messages=[{"role": "user", "content": user}],
        )
    
    response = await loop.run_in_executor(None, _call)
    return {
        "task": task_name,
        "output": response.content[0].text,
        "tokens": response.usage.input_tokens + response.usage.output_tokens,
    }

async def parallel_research(competitors: list[str], research_type: str) -> dict:
    """Fan-out: research all competitors in parallel. Fan-in: synthesize results."""
    client = anthropic.Anthropic()
    
    # FAN-OUT: spawn parallel agent calls
    tasks = [
        run_agent(
            client,
            task_name=competitor,
            system=f"You are a competitive intelligence analyst. Research {competitor} and provide: pricing, key features, target market, and known weaknesses.",
            user=f"Analyze {competitor} for comparison with our product in the {research_type} market.",
        )
        for competitor in competitors
    ]
    
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Handle failures gracefully
    successful = [r for r in results if not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]
    
    if failed:
        print(f"Warning: {len(failed)} research tasks failed: {failed}")
    
    # FAN-IN: synthesize
    combined_research = "\
\
".join([
        f"## {r['task']}\
{r['output']}" for r in successful
    ])
    
    synthesis = await run_agent(
        client,
        task_name="synthesizer",
        system="You are a strategic analyst. Synthesize competitor research into a concise comparison matrix and strategic recommendations.",
        user=f"Synthesize these competitor analyses:\
\
{combined_research}",
        model="claude-3-5-sonnet-20241022",
    )
    
    return {
        "individual_analyses": successful,
        "synthesis": synthesis["output"],
        "total_tokens": sum(r["tokens"] for r in successful) + synthesis["tokens"],
    }

Pattern 3: Hierarchical Delegation

Use when: Complex tasks with subtask discovery. Orchestrator breaks down work, delegates to specialists.

# hierarchical_delegation.py
import json
import anthropic

ORCHESTRATOR_SYSTEM = """You are an orchestration agent. Your job is to:
1. Analyze the user's request
2. Break it into subtasks
3. Assign each to the appropriate specialist agent
4. Collect results and synthesize

Available specialists:
- researcher: finds facts, data, and information
- writer: creates content and documents  
- coder: writes and reviews code
- analyst: analyzes data and produces insights

Respond with a JSON plan:
{
  "subtasks": [
    {"id": "1", "agent": "researcher", "task": "...", "depends_on": []},
    {"id": "2", "agent": "writer", "task": "...", "depends_on": ["1"]}
  ]
}"""

SPECIALIST_SYSTEMS = {
    "researcher": "You are a research specialist. Find accurate, relevant information and cite sources when possible.",
    "writer": "You are a professional writer. Create clear, engaging content in the requested format.",
    "coder": "You are a senior software engineer. Write clean, well-commented code with error handling.",
    "analyst": "You are a data analyst. Provide structured analysis with evidence-backed conclusions.",
}

class HierarchicalOrchestrator:
    def __init__(self):
        self.client = anthropic.Anthropic()
    
    def run(self, user_request: str) -> str:
        # 1. Orchestrator creates plan
        plan_response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            system=ORCHESTRATOR_SYSTEM,
            messages=[{"role": "user", "content": user_request}],
        )
        
        plan = json.loads(plan_response.content[0].text)
        results = {}
        
        # 2. Execute subtasks respecting dependencies
        for subtask in self._topological_sort(plan["subtasks"]):
            context = self._build_context(subtask, results)
            specialist = SPECIALIST_SYSTEMS[subtask["agent"]]
            
            result = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=2048,
                system=specialist,
                messages=[{"role": "user", "content": f"{context}\
\
Task: {subtask['task']}"}],
            )
            results[subtask["id"]] = result.content[0].text
        
        # 3. Final synthesis
        all_results = "\
\
".join([f"### {k}\
{v}" for k, v in results.items()])
        synthesis = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=2048,
            system="Synthesize the specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Original request: {user_request}\
\
Specialist outputs:\
{all_results}"}],
        )
        return synthesis.content[0].text
    
    def _build_context(self, subtask: dict, results: dict) -> str:
        if not subtask.get("depends_on"):
            return ""
        deps = [f"Output from task {dep}:\
{results[dep]}" for dep in subtask["depends_on"] if dep in results]
        return "Previous results:\
" + "\
\
".join(deps) if deps else ""
    
    def _topological_sort(self, subtasks: list) -> list:
        # Simple ordered execution respecting depends_on
        ordered, remaining = [], list(subtasks)
        completed = set()
        while remaining:
            for task in remaining:
                if all(dep in completed for dep in task.get("depends_on", [])):
                    ordered.append(task)
                    completed.add(task["id"])
                    remaining.remove(task)
                    break
        return ordered

Handoff Protocol Template

# Standard handoff context format — use between all agents
@dataclass
class AgentHandoff:
    """Structured context passed between agents in a workflow."""
    task_id: str
    workflow_id: str
    step_number: int
    total_steps: int
    
    # What was done
    previous_agent: str
    previous_output: str
    artifacts: dict  # {"filename": "content"} for any files produced
    
    # What to do next
    current_agent: str
    current_task: str
    constraints: list[str]  # hard rules for this step
    
    # Metadata
    context_budget_remaining: int  # tokens left for this agent
    cost_so_far_usd: float
    
    def to_prompt(self) -> str:
        return f"""
# Agent Handoff — Step {self.step_number}/{self.total_steps}

## Your Task
{self.current_task}

## Constraints
{chr(10).join(f'- {c}' for c in self.constraints)}

## Context from Previous Step ({self.previous_agent})
{self.previous_output[:2000]}{"... [truncated]" if len(self.previous_output) > 2000 else ""}

## Context Budget
You have approximately {self.context_budget_remaining} tokens remaining. Be concise.
"""

Error Recovery Patterns

import time
from functools import wraps

def with_retry(max_attempts=3, backoff_seconds=2, fallback_model=None):
    """Decorator for agent calls with exponential backoff and model fallback."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_attempts):
                try:
                    return fn(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    if attempt \x3C max_attempts - 1:
                        wait = backoff_seconds * (2 ** attempt)
                        print(f"Attempt {attempt+1} failed: {e}. Retrying in {wait}s...")
                        time.sleep(wait)
                        
                        # Fall back to cheaper/faster model on rate limit
                        if fallback_model and "rate_limit" in str(e).lower():
                            kwargs["model"] = fallback_model
            raise last_error
        return wrapper
    return decorator

@with_retry(max_attempts=3, fallback_model="claude-3-haiku-20240307")
def call_agent(model, system, user):
    ...

Context Window Budgeting

# Budget context across a multi-step pipeline
# Rule: never let any step consume more than 60% of remaining budget

CONTEXT_LIMITS = {
    "claude-3-5-sonnet-20241022": 200_000,
    "gpt-4o": 128_000,
}

class ContextBudget:
    def __init__(self, model: str, reserve_pct: float = 0.2):
        total = CONTEXT_LIMITS.get(model, 128_000)
        self.total = total
        self.reserve = int(total * reserve_pct)  # keep 20% as buffer
        self.used = 0
    
    @property
    def remaining(self):
        return self.total - self.reserve - self.used
    
    def allocate(self, step_name: "str-requested-int-int"
        allocated = min(requested, int(self.remaining * 0.6))  # max 60% of remaining
        print(f"[Budget] {step_name}: allocated {allocated:,} tokens (remaining: {self.remaining:,})")
        return allocated
    
    def consume(self, tokens_used: int):
        self.used += tokens_used

def truncate_to_budget(text: str, token_budget: int, chars_per_token: float = 4.0) -> str:
    """Rough truncation — use tiktoken for precision."""
    char_budget = int(token_budget * chars_per_token)
    if len(text) \x3C= char_budget:
        return text
    return text[:char_budget] + "\
\
[... truncated to fit context budget ...]"

Cost Optimization Strategies

Strategy Savings Tradeoff
Use Haiku for routing/classification 85-90% Slightly less nuanced judgment
Cache repeated system prompts 50-90% Requires prompt caching setup
Truncate intermediate outputs 20-40% May lose detail in handoffs
Batch similar tasks 50% Latency increases
Use Sonnet for most, Opus for final step only 60-70% Final quality may improve
Short-circuit on confidence threshold 30-50% Need confidence scoring

Common Pitfalls

  • Circular dependencies — agents calling each other in loops; enforce DAG structure at design time
  • Context bleed — passing entire previous output to every step; summarize or extract only what's needed
  • No timeout — a stuck agent blocks the whole pipeline; always set max_tokens and wall-clock timeouts
  • Silent failures — agent returns plausible but wrong output; add validation steps for critical paths
  • Ignoring cost — 10 parallel Opus calls is $0.50 per workflow; model selection is a cost decision
  • Over-orchestration — if a single prompt can do it, it should; only add agents when genuinely needed
安全使用建议
This skill’s documentation contains real-looking code that calls external LLM APIs (Anthropic/Claude) and refers to other orchestration platforms, but it does not declare any required API keys or installation steps and several code samples are truncated or malformed. Before installing or enabling: 1) ask the publisher to list required environment variables (e.g., ANTHROPIC_API_KEY) and to explain where data is sent; 2) request working, complete example code and explicit guidance about credential/scoping best practices; 3) if you must run it, use isolated/test environment and only provide least-privilege, scoped API keys (not broad account keys); 4) prefer skills that declare their dependencies and do not silently call external services. Because of the mismatches and incomplete examples, proceed cautiously or treat this as a draft until clarified.
功能分析
Type: OpenClaw Skill Name: agent-workflow-designer Version: 1.0.0 The skill bundle is an educational resource providing templates and Python code for designing multi-agent orchestration patterns such as sequential pipelines and hierarchical delegation. It includes utility functions for context budgeting and error recovery using the Anthropic API. No indicators of data exfiltration, malicious execution, or harmful prompt injection were found; minor syntax artifacts in the code snippets (e.g., in SKILL.md) appear to be malformed template placeholders rather than intentional obfuscation.
能力评估
Purpose & Capability
The SKILL.md content matches the stated purpose (multi-agent workflow design) and includes concrete patterns and platform-specific guidance. However, the examples explicitly instantiate an Anthropic client and reference Claude models and other platforms (OpenClaw, CrewAI, AutoGen) while the skill metadata declares no required environment variables, credentials, or installs. This mismatch (external API usage without declared credentials or prerequisites) is incoherent.
Instruction Scope
The runtime instructions and code samples direct the agent to make network/API calls (e.g., anthropic.Anthropic client usage, model calls, token accounting) and to synthesize results. The SKILL.md does not instruct reading unrelated local files, but it omits how credentials are provided and contains truncated/malformed code fragments (syntax errors and a truncated section) that make actual behavior unclear. The agent would be expected to send user data to third-party APIs but the guide doesn't declare or constrain that.
Install Mechanism
There is no install spec and no files beyond SKILL.md, so nothing will be written to disk or installed automatically by the registry. Instruction-only is low risk in terms of installation surprises.
Credentials
The examples require access to Anthropic/Claude APIs (implying an API key or similar credential) and potentially other platform-specific credentials, but the skill's metadata lists no required environment variables or primary credential. Requiring zero env vars while showing code that calls external services is disproportionate and ambiguous — it fails to state what secrets would be needed and how they should be scoped/managed.
Persistence & Privilege
The skill is not always-enabled and does not request elevated platform privileges. Autonomous invocation is allowed by default (noted), which is normal; there is no indication the skill modifies other skills or global agent settings.
如何使用
  1. 确保已安装 OpenClaw(本地或 Docker 部署)
  2. 在对话框中输入安装命令:/install agent-workflow-designer
  3. 安装完成后,直接呼叫该 Skill 的名称或使用 /agent-workflow-designer 触发
  4. 根据 Skill 的参数说明提供必要输入,即可获得结构化输出
版本历史
v1.0.0
Initial publish
v2.1.1
v2.1.1: optimization, reference splits
元数据
Slug agent-workflow-designer
版本 1.0.0
许可证 MIT-0
累计安装 3
当前安装数 3
历史版本数 2
常见问题

agent-workflow-designer 是什么?

Agent Workflow Designer. 它是一个面向 Claude Code / OpenClaw 的 AI Agent Skill 插件,目前累计下载 355 次。

如何安装 agent-workflow-designer?

在 OpenClaw 或 Claude Code 对话框中运行命令「/install agent-workflow-designer」即可一键安装,无需额外配置。

agent-workflow-designer 是免费的吗?

是的,agent-workflow-designer 完全免费,采用 MIT-0 许可证,可自由下载、安装和使用。

agent-workflow-designer 支持哪些平台?

agent-workflow-designer 跨平台运行,可在任意部署了 OpenClaw / Claude Code 的环境中使用(cross-platform)。

谁开发了 agent-workflow-designer?

由 Alireza Rezvani(@alirezarezvani)开发并维护,当前版本 v1.0.0。

💬 留言讨论