Chapter 34: Skill Composition Patterns: Pipeline and DAG Orchestration
A single Skill is like one Lego brick—focused but limited in scope. Real power emerges when multiple Skills are combined into workflows capable of accomplishing complex tasks. This chapter explores Hermes Agent's Skill composition patterns, from simple sequential pipelines to sophisticated DAG (Directed Acyclic Graph) orchestration, with a complete "research + write + publish" case study demonstrating production-grade workflow construction.
34.1 Composition Pattern Overview
Hermes supports four Skill composition patterns, ranging from a simple serial chain to a full dependency graph:
┌────────────────────────────────────────────────────────┐
│               Skill Composition Patterns               │
├────────────────┬──────────────┬───────────┬────────────┤
│ Sequential     │ Conditional  │ Parallel  │ DAG        │
│ Pipeline       │ Branch       │ Execution │ Orchestr.  │
│ A → B → C      │ if/else      │ A ∥ B → C │ Graph      │
│ Simple, serial │ Dynamic      │ Speed     │ Complex    │
└────────────────┴──────────────┴───────────┴────────────┘
Pattern Selection Guide
| Scenario | Recommended Pattern | Reason |
|---|---|---|
| Data processing pipeline | Sequential Pipeline | Strict step-by-step dependency |
| Content-based routing | Conditional Branch | Dynamic path selection |
| Multi-source data fetching | Parallel Execution | No inter-step dependency |
| Research + write + publish | DAG Orchestration | Mixed serial/parallel |
34.2 Sequential Pipeline Pattern (A→B→C)
The sequential pipeline is the most fundamental composition pattern: each Skill's output becomes the next Skill's input, forming a data processing chain.
YAML Configuration
# pipeline_config.yaml
name: "data-processing-pipeline"
pipeline:
  type: sequential
  steps:
    - id: fetch
      skill: web-fetcher-skill
      version: "^2.0.0"
      config:
        timeout: 30
        retry: 3
      input_mapping:
        url: "$.pipeline_input.url"
    - id: clean
      skill: html-cleaner-skill
      version: "^1.5.0"
      input_mapping:
        raw_html: "$.steps.fetch.output.html"
    - id: store
      skill: database-writer-skill
      version: "^3.0.0"
      input_mapping:
        content: "$.steps.clean.output.text"
  on_failure:
    strategy: "stop"  # stop | continue | retry
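The `input_mapping` entries above use `$.`-prefixed dotted paths to pull values out of the pipeline input and earlier step outputs. The following is a minimal sketch of how such paths could be resolved against a plain nested dictionary. The helper names `resolve_path` and `resolve_inputs` and the shape of `context` are assumptions made for illustration, not the Hermes implementation, and the sketch handles only simple dotted keys (no array indices or wildcards).

```python
from typing import Any, Dict


def resolve_path(path: str, root: Dict[str, Any]) -> Any:
    """Resolve a dotted path such as "$.steps.fetch.output.html" against a nested dict."""
    if not path.startswith("$."):
        raise ValueError(f"Path must start with '$.': {path!r}")
    current: Any = root
    for key in path[2:].split("."):
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"Cannot resolve {key!r} in path {path!r}")
        current = current[key]
    return current


def resolve_inputs(input_mapping: Dict[str, str], root: Dict[str, Any]) -> Dict[str, Any]:
    """Build a step's input dict by resolving every mapped path."""
    return {name: resolve_path(path, root) for name, path in input_mapping.items()}


# Hypothetical context snapshot after the "fetch" step has finished.
context = {
    "pipeline_input": {"url": "https://example.com"},
    "steps": {"fetch": {"output": {"html": "<p>Hello</p>"}}},
}

clean_input = resolve_inputs({"raw_html": "$.steps.fetch.output.html"}, context)
print(clean_input)  # {'raw_html': '<p>Hello</p>'}
```

Failing loudly on an unresolvable path, rather than passing `None` downstream, keeps a typo in the configuration from surfacing as a confusing error inside a later Skill.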
Python Implementation: Pipeline Engine
# hermes/orchestration/pipeline.py
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class PipelineExecutionError(Exception):
    """Raised when a step fails and on_failure is "stop"."""


@dataclass
class StepResult:
    step_id: str
    success: bool
    output: Any
    error: Optional[Exception] = None
    duration_ms: float = 0.0
    retry_count: int = 0


@dataclass
class PipelineContext:
    pipeline_input: Any
    steps: Dict[str, StepResult] = field(default_factory=dict)

    def get_step_output(self, step_id: str) -> Any:
        result = self.steps.get(step_id)
        if result is None or not result.success:
            raise RuntimeError(f"Step '{step_id}' not completed successfully")
        return result.output


class SequentialPipeline:
    # _resolve_inputs and _load_skill are not shown in this excerpt.

    def __init__(self, steps: List[Dict], on_failure: str = "stop", max_retries: int = 3):
        self.steps = steps
        self.on_failure = on_failure
        self.max_retries = max_retries

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)
        for step_config in self.steps:
            step_id = step_config["id"]
            result = await self._execute_with_retry(step_config, ctx)
            ctx.steps[step_id] = result
            if not result.success:
                if self.on_failure == "stop":
                    raise PipelineExecutionError(
                        f"Pipeline aborted at step '{step_id}': {result.error}"
                    ) from result.error
                # "continue" mode: log and proceed
                logger.warning("Step '%s' failed, continuing: %s", step_id, result.error)
        return ctx

    async def _execute_with_retry(self, step_config: Dict, ctx: PipelineContext) -> StepResult:
        step_id = step_config["id"]
        config = step_config.get("config", {})
        # The YAML example sets `retry` under `config`; fall back to the pipeline default.
        max_retries = config.get("retry", self.max_retries)
        for attempt in range(max_retries + 1):
            start = time.perf_counter()
            try:
                step_input = self._resolve_inputs(step_config.get("input_mapping", {}), ctx)
                skill = await self._load_skill(step_config["skill"], step_config.get("version", "*"))
                output = await skill.execute(step_input, config=config)
                return StepResult(
                    step_id=step_id, success=True, output=output,
                    duration_ms=(time.perf_counter() - start) * 1000, retry_count=attempt
                )
            except Exception as e:
                if attempt < max_retries:
                    # Exponential backoff: wait 1s, 2s, 4s, ... before the next attempt.
                    await asyncio.sleep(2 ** attempt)
                else:
                    return StepResult(
                        step_id=step_id, success=False, output=None, error=e,
                        duration_ms=(time.perf_counter() - start) * 1000, retry_count=attempt
                    )
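The retry loop in `_execute_with_retry` sleeps for `2 ** attempt` seconds between attempts. The sketch below makes that schedule explicit and adds two common refinements: an upper bound on the delay and random jitter, so that many pipelines retrying the same dependency do not all wake at once. The function names `backoff_delays` and `with_jitter` and the `base` and `cap` parameters are illustrative assumptions, not part of the engine above.

```python
import random
from typing import List


def backoff_delays(max_retries: int, base: float = 1.0, cap: float = 30.0) -> List[float]:
    """Delay before each retry: base * 2**attempt seconds, never more than `cap`."""
    return [min(cap, base * 2 ** attempt) for attempt in range(max_retries)]


def with_jitter(delay: float, rng: random.Random) -> float:
    """Full jitter: pick a uniformly random delay between 0 and `delay`."""
    return rng.uniform(0.0, delay)


# With three retries the schedule matches the engine's 2 ** attempt sleeps.
print(backoff_delays(3))  # [1.0, 2.0, 4.0]

# A longer schedule shows the cap taking effect on the last delay.
print(backoff_delays(6))  # [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]

rng = random.Random(0)
jittered = [with_jitter(d, rng) for d in backoff_delays(3)]
```

Without a cap, a step configured with ten retries would wait more than eight minutes before its final attempt, which is rarely what a pipeline author intends.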
34.3 Conditional Branch Pattern
Conditional branching enables dynamic routing based on runtime data—the key to building intelligent workflows.
YAML Configuration
pipeline:
  type: conditional
  steps:
    - id: classify
      skill: content-classifier-skill
    - id: route
      type: switch
      condition: "$.steps.classify.output.category"
      cases:
        "news":
          - id: process_news
            skill: news-processor-skill
        "technical":
          - id: process_tech
            skill: tech-analyzer-skill
        "default":
          - id: process_generic
            skill: generic-processor-skill
    - id: format
      skill: output-formatter-skill
      depends_on: ["route"]
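Python Implementation: Conditional Step
class ConditionalStep:
    """Runs the branch selected by `condition(ctx)`, falling back to `default_branch`."""

    def __init__(self, condition, branches, default_branch="default"):
        self.condition = condition  # callable: PipelineContext -> branch key
        self.branches = branches  # branch key -> list of step configs
        self.default_branch = default_branch

    async def execute(self, ctx):
        branch_key = self.condition(ctx)
        branch_steps = self.branches.get(
            branch_key, self.branches.get(self.default_branch, [])
        )
        if not branch_steps:
            raise ValueError(f"No branch found for key: {branch_key}")
        # The branch runs as its own SequentialPipeline with a fresh context, so
        # branch steps see the original pipeline input but not earlier step outputs.
        branch_pipeline = SequentialPipeline(branch_steps)
        return await branch_pipeline.execute(ctx.pipeline_input)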
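The YAML expresses the routing condition as a path string, while `ConditionalStep` expects a callable. One way to bridge the two is sketched below, assuming the context is available as a nested dictionary. The helpers `condition_from_path` and `select_branch` are hypothetical names introduced for this example.

```python
from typing import Any, Callable, Dict, List

Branch = List[Dict[str, str]]


def condition_from_path(path: str) -> Callable[[Dict[str, Any]], Any]:
    """Turn "$.steps.classify.output.category" into a function of the context dict."""
    if not path.startswith("$."):
        raise ValueError(f"Path must start with '$.': {path!r}")
    keys = path[2:].split(".")

    def condition(context: Dict[str, Any]) -> Any:
        value: Any = context
        for key in keys:
            value = value[key]
        return value

    return condition


def select_branch(cases: Dict[str, Branch], key: str, default: str = "default") -> Branch:
    """Return the steps for `key`, or the default branch when the key has no case."""
    if key in cases:
        return cases[key]
    if default in cases:
        return cases[default]
    raise ValueError(f"No branch found for key: {key}")


# The three cases from the YAML configuration above.
cases: Dict[str, Branch] = {
    "news": [{"id": "process_news", "skill": "news-processor-skill"}],
    "technical": [{"id": "process_tech", "skill": "tech-analyzer-skill"}],
    "default": [{"id": "process_generic", "skill": "generic-processor-skill"}],
}

condition = condition_from_path("$.steps.classify.output.category")
context = {"steps": {"classify": {"output": {"category": "technical"}}}}

chosen = select_branch(cases, condition(context))
print(chosen[0]["skill"])  # tech-analyzer-skill
```

A category the classifier invents at runtime, such as "sports", falls through to the default branch instead of raising, which is why the configuration should usually include one.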
34.4 Parallel Execution Pattern
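When multiple steps are independent of each other, running them in parallel brings the total time down from the sum of the step durations toward the duration of the slowest step, subject to the concurrency limit.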
# hermes/orchestration/parallel.py
import asyncio
from typing import Dict, List

from hermes.orchestration.pipeline import StepResult


class ParallelExecutor:
    # _execute_step (run one step and return a StepResult) is not shown in this excerpt.

    def __init__(self, max_concurrency: int = 10, timeout: float = 60.0):
        self.max_concurrency = max_concurrency
        self.timeout = timeout
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def execute_all(
        self, steps: List[Dict], ctx, fail_fast: bool = True
    ) -> Dict[str, StepResult]:
        tasks = {
            step["id"]: asyncio.create_task(
                self._execute_with_semaphore(step, ctx)
            )
            for step in steps
        }
        try:
            # return_exceptions=True makes gather wait for every task and
            # return exceptions as values instead of raising the first one.
            results_list = await asyncio.wait_for(
                asyncio.gather(*tasks.values(), return_exceptions=True),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            for t in tasks.values():
                t.cancel()
            raise TimeoutError(f"Parallel execution timed out (>{self.timeout}s)")

        results = {}
        for step_id, result in zip(tasks.keys(), results_list):
            if isinstance(result, BaseException):
                if fail_fast:
                    # Raised only after all tasks have finished; sibling tasks
                    # are not cancelled early.
                    raise result
                results[step_id] = StepResult(
                    step_id=step_id, success=False, output=None, error=result
                )
            else:
                results[step_id] = result
        return results

    async def _execute_with_semaphore(self, step, ctx):
        # The semaphore caps how many steps run at the same time.
        async with self._semaphore:
            return await self._execute_step(step, ctx)
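Because `execute_all` gathers with `return_exceptions=True`, its `fail_fast` flag raises the first error only after every task has finished. If the goal is to stop sibling work as soon as one step fails, `asyncio.wait` with `return_when=asyncio.FIRST_EXCEPTION` is one option. The sketch below is a standalone illustration of that alternative, not the Hermes executor; `run_fail_fast` and the demo coroutines are hypothetical.

```python
import asyncio
from typing import Any, Callable, Coroutine, Dict, List, Tuple

Job = Callable[[], Coroutine[Any, Any, Any]]


async def run_fail_fast(jobs: Dict[str, Job]) -> Dict[str, Any]:
    """Run all jobs concurrently; on the first failure cancel the rest and re-raise."""
    tasks = {asyncio.create_task(job()): name for name, job in jobs.items()}
    done, pending = await asyncio.wait(list(tasks), return_when=asyncio.FIRST_EXCEPTION)
    errors = [t.exception() for t in done if not t.cancelled() and t.exception()]
    if errors:
        for task in pending:
            task.cancel()
        # Let the cancelled tasks finish their cleanup before propagating the error.
        await asyncio.gather(*pending, return_exceptions=True)
        raise errors[0]
    return {tasks[t]: t.result() for t in done}


async def demo() -> Tuple[str, List[str]]:
    cancelled: List[str] = []

    async def slow() -> str:
        try:
            await asyncio.sleep(10)
            return "slow done"
        except asyncio.CancelledError:
            cancelled.append("slow")
            raise

    async def broken() -> str:
        await asyncio.sleep(0.01)
        raise RuntimeError("source unavailable")

    try:
        await run_fail_fast({"slow": slow, "broken": broken})
    except RuntimeError as exc:
        return str(exc), cancelled
    return "no error", cancelled


print(asyncio.run(demo()))  # ('source unavailable', ['slow'])
```

The demo returns in roughly ten milliseconds rather than ten seconds, because the slow job is cancelled as soon as the broken one raises.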
34.5 DAG Orchestration
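DAG (Directed Acyclic Graph) orchestration is the most general of the four patterns: it supports arbitrary acyclic dependencies and starts each node as soon as all of its dependencies have completed, so independent branches run in parallel without being declared as such.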
DAG Scheduler Logic
1. Find all nodes with zero in-degree (no dependencies) → launch in parallel
2. When a node completes, check if any successor now has all deps satisfied
3. If yes, launch that successor immediately
4. Repeat until all nodes are complete
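The steps above can be sketched synchronously by grouping nodes into "waves", where each wave contains the nodes whose dependencies were all satisfied by earlier waves. This is a simplification for illustration: a real scheduler launches each successor the moment its own dependencies finish rather than waiting for the whole wave. The function name `execution_waves` is an assumption of this example.

```python
from typing import Dict, List, Set


def execution_waves(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into waves; every node in a wave depends only on earlier waves."""
    remaining = {node: set(deps) for node, deps in dependencies.items()}
    waves: List[List[str]] = []
    while remaining:
        # Step 1: nodes with no unsatisfied dependencies are ready to launch.
        ready = sorted(node for node, deps in remaining.items() if not deps)
        if not ready:
            # Nothing can start: the graph has a cycle or names an unknown node.
            raise ValueError(f"Cycle or unknown dependency among: {sorted(remaining)}")
        waves.append(ready)
        # Steps 2-3: mark the finished nodes as satisfied for their successors.
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


deps = {"a": set(), "b": set(), "c": {"a", "b"}, "d": {"c"}}
print(execution_waves(deps))  # [['a', 'b'], ['c'], ['d']]
```

The number of waves is the length of the longest dependency chain, which gives a lower bound on how many sequential stages the workflow needs regardless of how much concurrency is available.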
DAG Engine Implementation
# hermes/orchestration/dag.py
import asyncio
import logging
from collections import defaultdict, deque
from typing import Any, Dict, Set

from hermes.orchestration.pipeline import PipelineContext

logger = logging.getLogger(__name__)


class DAGExecutionError(Exception):
    """Raised when one or more DAG nodes fail or are skipped."""


class DAGOrchestrator:
    # _execute_step (run one node's Skill and return a StepResult) is not shown in this excerpt.

    def __init__(self, dag_config: Dict):
        self.steps = {s["id"]: s for s in dag_config["steps"]}
        self.dependencies = {
            sid: set(s.get("depends_on", []))
            for sid, s in self.steps.items()
        }
        self.reverse_deps = self._build_reverse_deps()

    def _build_reverse_deps(self) -> Dict[str, Set[str]]:
        """Map each node to the set of nodes that depend on it."""
        reverse = defaultdict(set)
        for step_id, deps in self.dependencies.items():
            for dep in deps:
                reverse[dep].add(step_id)
        return dict(reverse)

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)
        completed: Set[str] = set()
        failed: Set[str] = set()
        in_progress: Dict[str, asyncio.Task] = {}
        # Nodes without dependencies can start immediately.
        ready = {sid for sid, deps in self.dependencies.items() if not deps}
        while ready or in_progress:
            for step_id in ready:
                in_progress[step_id] = asyncio.create_task(
                    self._execute_step(step_id, ctx)
                )
            ready.clear()
            done_tasks, _ = await asyncio.wait(
                in_progress.values(), return_when=asyncio.FIRST_COMPLETED
            )
            for task in done_tasks:
                step_id = next(sid for sid, t in in_progress.items() if t is task)
                del in_progress[step_id]
                try:
                    ctx.steps[step_id] = task.result()
                except (Exception, asyncio.CancelledError) as e:
                    logger.error("DAG node '%s' failed: %r", step_id, e)
                    failed.add(step_id)
                    self._cascade_cancel(step_id, in_progress, failed)
                    continue
                completed.add(step_id)
                # Launch any successor whose dependencies are now all complete.
                for successor in self.reverse_deps.get(step_id, set()):
                    if self._is_ready(successor, completed, failed):
                        ready.add(successor)
        if failed:
            raise DAGExecutionError(f"DAG failed at nodes: {failed}")
        return ctx

    def _is_ready(self, step_id, completed, failed):
        deps = self.dependencies[step_id]
        return (
            not any(d in failed for d in deps) and
            all(d in completed for d in deps)
        )

    def _cascade_cancel(self, failed_id, in_progress, failed):
        """Mark every transitive successor of a failed node as failed."""
        queue = deque(self.reverse_deps.get(failed_id, set()))
        while queue:
            node = queue.popleft()
            if node in failed:
                continue  # already handled via another path
            if node in in_progress:
                in_progress[node].cancel()
            failed.add(node)
            queue.extend(self.reverse_deps.get(node, set()))
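The engine above does not validate its input graph. If every node has at least one dependency (for example because of a cycle), the initial `ready` set is empty, the loop never runs, and `execute` returns an empty context without raising. A pre-flight check can catch this before any Skill runs. The sketch below uses the standard library's `graphlib.TopologicalSorter` (Python 3.9+); the function name `validate_dag` is an assumption of this example.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def validate_dag(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return one valid execution order, or raise ValueError for a malformed graph."""
    known = set(dependencies)
    for node, deps in dependencies.items():
        unknown = deps - known
        if unknown:
            raise ValueError(f"Step {node!r} depends on unknown steps: {sorted(unknown)}")
    try:
        # static_order() yields nodes so that dependencies come before dependents.
        return list(TopologicalSorter(dependencies).static_order())
    except CycleError as exc:
        # graphlib stores the offending cycle as the second element of exc.args.
        raise ValueError(f"Dependency cycle detected: {exc.args[1]}") from exc


print(validate_dag({"fetch": set(), "clean": {"fetch"}, "store": {"clean"}}))
# ['fetch', 'clean', 'store']
```

Running this check in the orchestrator's constructor would turn a silent no-op into a configuration error at load time.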
34.6 Case Study: Research + Write + Publish Pipeline
DAG Configuration
name: "research-write-publish"
dag:
  steps:
    # Phase 1: Parallel Research
    - id: web_search
      skill: web-search-skill
      inputs:
        query: "$.inputs.topic"
    - id: arxiv_search
      skill: arxiv-search-skill
      inputs:
        query: "$.inputs.topic"
    - id: news_fetch
      skill: news-fetcher-skill
      inputs:
        topic: "$.inputs.topic"
    - id: synthesize
      skill: research-synthesizer-skill
      depends_on: ["web_search", "arxiv_search", "news_fetch"]
      inputs:
        web_results: "$.steps.web_search.output"
        arxiv_results: "$.steps.arxiv_search.output"
        news_results: "$.steps.news_fetch.output"
    # Phase 2: Sequential Writing
    - id: outline
      skill: outline-generator-skill
      depends_on: ["synthesize"]
      inputs:
        research_summary: "$.steps.synthesize.output.summary"
    - id: write
      skill: content-writer-skill
      depends_on: ["outline"]
      inputs:
        outline: "$.steps.outline.output.sections"
    - id: quality_check
      skill: quality-checker-skill
      depends_on: ["write"]
      config:
        checks: ["grammar", "factual", "plagiarism"]
        min_score: 0.85
    # Phase 3: Parallel Publishing
    - id: seo_optimize
      skill: seo-optimizer-skill
      depends_on: ["quality_check"]
    - id: publish_cms
      skill: cms-publisher-skill
      depends_on: ["seo_optimize"]
    - id: social_share
      skill: social-share-skill
      depends_on: ["seo_optimize"]
Running the Workflow
import asyncio

from hermes.orchestration.dag import DAGOrchestrator
from hermes.orchestration.loader import load_dag_config


async def main():
    config = load_dag_config("research_write_publish.yaml")
    orchestrator = DAGOrchestrator(config)
    result = await orchestrator.execute({
        "topic": "LLMs in medical diagnostics",
        "target_audience": "healthcare professionals and AI researchers",
        "publish_platforms": ["twitter", "linkedin"]
    })
    print(f"Published! CMS URL: {result.steps['publish_cms'].output['url']}")


asyncio.run(main())
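Before wiring in real Skills, the dependency structure of this case study can be exercised with stub coroutines. The dry run below copies the `depends_on` relationships from the DAG configuration and schedules them with `graphlib.TopologicalSorter` and `asyncio` from the standard library. It is a self-contained sketch rather than the Hermes orchestrator, and `stub_skill` and `dry_run` are hypothetical names.

```python
import asyncio
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# depends_on relationships copied from the case-study configuration.
DEPENDENCIES: Dict[str, Set[str]] = {
    "web_search": set(),
    "arxiv_search": set(),
    "news_fetch": set(),
    "synthesize": {"web_search", "arxiv_search", "news_fetch"},
    "outline": {"synthesize"},
    "write": {"outline"},
    "quality_check": {"write"},
    "seo_optimize": {"quality_check"},
    "publish_cms": {"seo_optimize"},
    "social_share": {"seo_optimize"},
}


async def stub_skill(step_id: str, log: List[str]) -> str:
    """Stand-in for a real Skill: yield to the event loop, then record completion."""
    await asyncio.sleep(0)
    log.append(step_id)
    return step_id


async def dry_run(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Run stubs in dependency order and return the order in which they completed."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the graph has a cycle
    log: List[str] = []
    running: Set[asyncio.Task] = set()
    while sorter.is_active():
        # Launch every node whose dependencies have all been marked done.
        for step_id in sorter.get_ready():
            running.add(asyncio.create_task(stub_skill(step_id, log)))
        done, running = await asyncio.wait(running, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            sorter.done(task.result())
    return log


order = asyncio.run(dry_run(DEPENDENCIES))
print(order)
```

The three research steps always finish before `synthesize`, and the two publishing steps always come last; the order within each of those groups may vary from run to run.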
34.7 Error Propagation and Compensation
Error Strategy Comparison
| Strategy | Description | Best For |
|---|---|---|
| Fail-Fast | Stop the entire pipeline on first error | Strict data consistency |
| Continue | Log error, skip failed step | Non-critical optional steps |
| Retry | Retry with exponential backoff | Network timeouts, transient errors |
| Compensate (Saga) | Undo completed steps in reverse order | Transactional workflows |
| Circuit-Break | Open circuit after N failures | Unstable external dependencies |
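The Circuit-Break row can be illustrated with a small synchronous breaker: after a threshold of consecutive failures it rejects calls outright for a cool-down period, then lets a single trial call through. This is a minimal sketch under stated assumptions; the class `CircuitBreaker`, the exception `CircuitOpenError`, and the parameters `failure_threshold`, `reset_after`, and `clock` are all hypothetical names, not a Hermes API.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 3,
        reset_after: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self._clock = clock  # injectable so the behaviour can be tested without sleeping
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_after:
                raise CircuitOpenError("circuit is open; call rejected")
            # Cool-down elapsed: allow one trial call (half-open state).
            # A single further failure reopens the circuit.
            self._opened_at = None
            self._failures = self.failure_threshold - 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        self._failures = 0  # any success closes the circuit fully
        return result
```

Wrapping calls to an unstable external dependency in `breaker.call(...)` means that once the dependency is known to be down, later steps fail immediately instead of each waiting for its own timeout.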
Saga Compensation Pattern
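import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class SagaRollbackError(Exception):
    """Raised after a failed step has triggered compensation."""


class SagaOrchestrator:
    """
    If step N fails, automatically executes compensation
    for steps 1 through N-1 in reverse order.

    SagaStep is not shown here; it is expected to expose `name`,
    `execute(data)`, and `compensate()`.
    """

    def __init__(self, steps: List["SagaStep"]):
        self.steps = steps
        self.completed: List["SagaStep"] = []

    async def execute(self, initial_data: Any) -> Any:
        self.completed = []  # reset so the orchestrator can be reused
        data = initial_data
        for step in self.steps:
            try:
                data = await step.execute(data)
                self.completed.append(step)
            except Exception as e:
                await self._compensate()
                raise SagaRollbackError(
                    f"Saga failed at {step.name}; compensation was attempted for completed steps"
                ) from e
        return data

    async def _compensate(self):
        for step in reversed(self.completed):
            try:
                await step.compensate()
            except Exception as e:
                # Keep compensating the remaining steps, but log a critical
                # alert: manual intervention is needed for this one.
                logger.critical(f"Compensation failed for {step.name}: {e}")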
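The orchestrator above relies on a `SagaStep` type that the chapter does not define. The self-contained sketch below shows one possible shape for it, pairing each action with its undo, and demonstrates the reverse-order compensation with a compact runner. `SagaStep` as written here, `run_saga`, and the step names are assumptions for illustration only.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, List


@dataclass
class SagaStep:
    """Hypothetical step type: a forward action paired with its compensation."""
    name: str
    action: Callable[[Any], Awaitable[Any]]
    undo: Callable[[], Awaitable[None]]

    async def execute(self, data: Any) -> Any:
        return await self.action(data)

    async def compensate(self) -> None:
        await self.undo()


async def run_saga(steps: List[SagaStep], data: Any) -> Any:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: List[SagaStep] = []
    for step in steps:
        try:
            data = await step.execute(data)
        except Exception:
            for finished in reversed(completed):
                await finished.compensate()
            raise
        completed.append(step)
    return data


async def demo() -> List[str]:
    events: List[str] = []

    def make_step(name: str, fail: bool = False) -> SagaStep:
        async def action(data: Any) -> Any:
            if fail:
                raise RuntimeError(f"{name} failed")
            events.append(f"do:{name}")
            return data

        async def undo() -> None:
            events.append(f"undo:{name}")

        return SagaStep(name, action, undo)

    steps = [make_step("reserve"), make_step("charge"), make_step("publish", fail=True)]
    try:
        await run_saga(steps, {})
    except RuntimeError:
        pass
    return events


print(asyncio.run(demo()))
# ['do:reserve', 'do:charge', 'undo:charge', 'undo:reserve']
```

The failed step itself is not compensated, since it never completed; only the steps that succeeded before it are undone, most recent first.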
Chapter Summary
This chapter covered all four Hermes Skill composition patterns:
- Sequential Pipeline (A→B→C): Simple, direct, ideal for strict data flow chains with retry logic
- Conditional Branch: Runtime routing enables intelligent workflow decisions
- Parallel Execution: asyncio-powered concurrency reduces total wall-clock time toward that of the slowest step
- DAG Orchestration: The most general pattern; it starts each node as soon as its dependencies complete, so independent branches of a complex dependency graph run in parallel
The "Research + Write + Publish" case study showed how parallel and sequential stages combine in a single DAG to form a content creation workflow, and the final section covered the error-handling strategies such a workflow needs in production.
Review Questions
- How would you implement a real-time progress reporting feature so users can see each DAG node's execution status live?
- If a step significantly exceeds its expected duration (e.g., web_search takes 5 minutes), how would you design timeout and circuit-breaking mechanisms?
- Should conditional branch functions be allowed to call an LLM? What problems could this introduce?
- Design a "Skill Pipeline Template Marketplace" where users share workflow configurations. How would you handle template versioning and compatibility?