Chapter 34

Skill Composition Patterns: Pipeline and DAG Orchestration

A single Skill is like one Lego brick: focused, but limited in scope. Real power emerges when multiple Skills are combined into workflows capable of accomplishing complex tasks. This chapter explores Hermes Agent's Skill composition patterns, from simple sequential pipelines to sophisticated DAG (Directed Acyclic Graph) orchestration, with a complete "research + write + publish" case study demonstrating production-grade workflow construction.


34.1 Composition Pattern Overview

Hermes supports four Skill composition patterns, ranging from simple linear chains to arbitrary dependency graphs:

┌────────────────────────────────────────────────────────┐
│               Skill Composition Patterns               │
├────────────────┬──────────────┬───────────┬────────────┤
│  Sequential    │  Conditional │  Parallel │  DAG       │
│  Pipeline      │  Branch      │  Execution│  Orchestr. │
│  A → B → C     │  if/else     │  A∥B → C  │  Graph     │
│  Simple, serial│  Dynamic     │  Speed    │  Complex   │
└────────────────┴──────────────┴───────────┴────────────┘

Pattern Selection Guide

Scenario                      Recommended Pattern    Reason
Data processing pipeline      Sequential Pipeline    Strict step-by-step dependency
Content-based routing         Conditional Branch     Dynamic path selection
Multi-source data fetching    Parallel Execution     No inter-step dependency
Research + write + publish    DAG Orchestration      Mixed serial/parallel

34.2 Sequential Pipeline Pattern (A→B→C)

The sequential pipeline is the most fundamental composition pattern: each Skill's output becomes the next Skill's input, forming a data processing chain.

YAML Configuration

# pipeline_config.yaml
name: "data-processing-pipeline"
pipeline:
  type: sequential
  steps:
    - id: fetch
      skill: web-fetcher-skill
      version: "^2.0.0"
      config:
        timeout: 30
        retry: 3
      input_mapping:
        url: "$.pipeline_input.url"
      
    - id: clean
      skill: html-cleaner-skill
      version: "^1.5.0"
      input_mapping:
        raw_html: "$.steps.fetch.output.html"
        
    - id: store
      skill: database-writer-skill
      version: "^3.0.0"
      input_mapping:
        content: "$.steps.clean.output.text"

  on_failure:
    strategy: "stop"    # stop | continue | retry
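Each `input_mapping` value is a JSONPath-like expression evaluated against the pipeline's state: the original input under `pipeline_input` and each finished step's output under `steps.<id>.output`. The engine's own resolver (`_resolve_inputs`) is not shown in this chapter, so the sketch below is a hypothetical minimal version. It supports only plain dotted paths such as `$.steps.fetch.output.html` and assumes the state has been flattened into nested dictionaries; real JSONPath features (wildcards, filters, slices) are out of scope.

```python
from typing import Any, Dict


def resolve_path(path: str, root: Dict[str, Any]) -> Any:
    """Resolve a dotted path like '$.steps.fetch.output.html' (hypothetical helper)."""
    if not path.startswith("$."):
        raise ValueError(f"Path must start with '$.': {path!r}")
    node: Any = root
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"Cannot resolve {path!r}: missing key {key!r}")
        node = node[key]
    return node


def resolve_inputs(mapping: Dict[str, str], root: Dict[str, Any]) -> Dict[str, Any]:
    """Build a step's input dict from its input_mapping section."""
    return {name: resolve_path(path, root) for name, path in mapping.items()}


# State shaped like the pipeline after the 'fetch' step has finished
state = {
    "pipeline_input": {"url": "https://example.com"},
    "steps": {"fetch": {"output": {"html": "<p>Hello</p>"}}},
}
print(resolve_inputs({"raw_html": "$.steps.fetch.output.html"}, state))
# {'raw_html': '<p>Hello</p>'}
```

A missing key raises immediately, which is what makes a step fail (and, in the engine below, be retried) when an upstream step has not produced the expected output.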

Python Implementation: Pipeline Engine

# hermes/orchestration/pipeline.py
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class PipelineExecutionError(Exception):
    """Raised when a step fails and the failure strategy is "stop"."""


@dataclass
class StepResult:
    step_id: str
    success: bool
    output: Any
    error: Optional[Exception] = None
    duration_ms: float = 0.0
    retry_count: int = 0


@dataclass
class PipelineContext:
    pipeline_input: Any
    steps: Dict[str, StepResult] = field(default_factory=dict)

    def get_step_output(self, step_id: str) -> Any:
        result = self.steps.get(step_id)
        if result is None or not result.success:
            raise RuntimeError(f"Step '{step_id}' not completed successfully")
        return result.output


class SequentialPipeline:
    def __init__(self, steps: List[Dict], on_failure: str = "stop", max_retries: int = 3):
        self.steps = steps
        self.on_failure = on_failure      # "stop" or "continue"; retries are per step
        self.max_retries = max_retries    # default when a step sets no "retry"

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)

        for step_config in self.steps:
            step_id = step_config["id"]
            result = await self._execute_with_retry(step_config, ctx)
            ctx.steps[step_id] = result

            if not result.success:
                if self.on_failure == "stop":
                    raise PipelineExecutionError(
                        f"Pipeline aborted at step '{step_id}': {result.error}"
                    ) from result.error
                # "continue" mode: log and proceed; steps that read this
                # step's output will fail when they try to resolve it
                logger.warning("Step '%s' failed, continuing: %s", step_id, result.error)
        return ctx

    async def _execute_with_retry(self, step_config: Dict, ctx: PipelineContext) -> StepResult:
        step_id = step_config["id"]
        step_options = step_config.get("config", {})
        # The YAML above sets "retry" inside each step's "config" block
        max_retries = step_options.get("retry", self.max_retries)

        for attempt in range(max_retries + 1):
            start = time.monotonic()
            try:
                step_input = self._resolve_inputs(step_config.get("input_mapping", {}), ctx)
                skill = await self._load_skill(step_config["skill"], step_config.get("version", "*"))
                output = await skill.execute(step_input, config=step_options)
                return StepResult(
                    step_id=step_id, success=True, output=output,
                    duration_ms=(time.monotonic() - start) * 1000, retry_count=attempt,
                )
            except Exception as e:
                if attempt < max_retries:
                    logger.info("Step '%s' attempt %d failed: %s", step_id, attempt + 1, e)
                    await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
                else:
                    return StepResult(
                        step_id=step_id, success=False, output=None, error=e,
                        duration_ms=(time.monotonic() - start) * 1000, retry_count=attempt,
                    )

    # Not shown in this chapter: JSONPath-style input resolution and
    # version-aware Skill loading from the Skill registry.
    def _resolve_inputs(self, mapping: Dict[str, str], ctx: PipelineContext) -> Dict[str, Any]:
        raise NotImplementedError

    async def _load_skill(self, name: str, version_spec: str) -> Any:
        raise NotImplementedError
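Because `_execute_with_retry` sleeps `2 ** attempt` seconds between attempts, the retry budget has a direct latency cost. The hypothetical helpers below (not part of Hermes) compute the backoff schedule and the worst-case time a step can take before it is declared failed. They assume every attempt runs for the full `timeout` from the step's `config`; note that the engine only passes `timeout` to the Skill and does not enforce it itself.

```python
from typing import List


def backoff_delays(max_retries: int) -> List[int]:
    """Sleep durations (seconds) between attempts: 2**0, 2**1, ..."""
    return [2 ** attempt for attempt in range(max_retries)]


def worst_case_seconds(timeout: float, max_retries: int) -> float:
    """Every attempt times out: (max_retries + 1) attempts plus all backoff sleeps."""
    return (max_retries + 1) * timeout + sum(backoff_delays(max_retries))


# The 'fetch' step from pipeline_config.yaml: timeout 30 s, retry 3
print(backoff_delays(3))          # [1, 2, 4]
print(worst_case_seconds(30, 3))  # 127
```

Four attempts plus 7 s of backoff add up to just over two minutes for a single step, which matters if such a step runs inside a parallel stage with its own, shorter timeout (60 s by default for the `ParallelExecutor` in 34.4).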

34.3 Conditional Branch Pattern

Conditional branching routes execution based on runtime data, such as the category a classifier Skill returns, so a single workflow can handle different kinds of input appropriately.

YAML Configuration

pipeline:
  type: conditional
  steps:
    - id: classify
      skill: content-classifier-skill
      
    - id: route
      type: switch
      condition: "$.steps.classify.output.category"
      cases:
        "news":
          - id: process_news
            skill: news-processor-skill
        "technical":
          - id: process_tech
            skill: tech-analyzer-skill
        "default":
          - id: process_generic
            skill: generic-processor-skill
      
    - id: format
      skill: output-formatter-skill
      depends_on: ["route"]
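
Python Implementation: Conditional Step

class ConditionalStep:
    def __init__(self, condition, branches, default_branch="default"):
        self.condition = condition            # callable: ctx -> case key
        self.branches = branches              # case key -> list of step configs
        self.default_branch = default_branch

    async def execute(self, ctx):
        branch_key = self.condition(ctx)
        branch_steps = self.branches.get(
            branch_key, self.branches.get(self.default_branch, [])
        )
        if not branch_steps:
            raise ValueError(f"No branch found for key: {branch_key}")

        # Run the branch on the shared context so its steps can read earlier
        # outputs (e.g. classify) and later steps (e.g. format) can read its
        # outputs; a fresh SequentialPipeline.execute() would start empty.
        runner = SequentialPipeline(branch_steps)
        for step_config in branch_steps:
            result = await runner._execute_with_retry(step_config, ctx)
            ctx.steps[step_config["id"]] = result
            if not result.success:
                raise RuntimeError(
                    f"Branch '{branch_key}' failed at '{step_config['id']}': {result.error}"
                ) from result.error
        return ctx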
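To see the routing end to end without the Skill loader, the following self-contained sketch replaces each Skill with a plain async function. All function names are hypothetical stand-ins for the Skills in the YAML above, and the classifier is a trivial keyword rule. The flow mirrors the `switch` step: classify, look up the case by category, fall back to `default`, then format.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[str], Awaitable[str]]


async def classify(text: str) -> str:
    # Stand-in for content-classifier-skill
    lowered = text.lower()
    if "breaking" in lowered:
        return "news"
    if "api" in lowered:
        return "technical"
    return "other"


async def process_news(text: str) -> str:
    return f"[news] {text}"


async def process_tech(text: str) -> str:
    return f"[tech] {text}"


async def process_generic(text: str) -> str:
    return f"[generic] {text}"


CASES: Dict[str, Handler] = {
    "news": process_news,
    "technical": process_tech,
    "default": process_generic,
}


async def run_conditional(text: str) -> str:
    category = await classify(text)
    # Unknown categories fall through to the "default" case
    handler = CASES.get(category, CASES["default"])
    processed = await handler(text)
    return processed.upper()  # stand-in for output-formatter-skill


print(asyncio.run(run_conditional("Breaking: new model released")))
# [NEWS] BREAKING: NEW MODEL RELEASED
print(asyncio.run(run_conditional("Weekend recipes")))
# [GENERIC] WEEKEND RECIPES
```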

34.4 Parallel Execution Pattern

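When multiple steps are independent of each other, running them concurrently cuts total time from the sum of their durations to roughly the duration of the slowest one (for I/O-bound Skills, and subject to the concurrency limit).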

# hermes/orchestration/parallel.py
import asyncio
from typing import Dict, List

from hermes.orchestration.pipeline import StepResult


class ParallelExecutor:
    def __init__(self, max_concurrency: int = 10, timeout: float = 60.0):
        self.max_concurrency = max_concurrency
        self.timeout = timeout
        # Limits how many steps run at the same time
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def execute_all(
        self, steps: List[Dict], ctx, fail_fast: bool = True
    ) -> Dict[str, StepResult]:
        if not steps:
            return {}
        tasks = {
            step["id"]: asyncio.create_task(self._execute_with_semaphore(step, ctx))
            for step in steps
        }

        # fail_fast: return as soon as any step raises; otherwise wait for all
        done, pending = await asyncio.wait(
            tasks.values(),
            timeout=self.timeout,
            return_when=asyncio.FIRST_EXCEPTION if fail_fast else asyncio.ALL_COMPLETED,
        )

        # Cancel whatever is still running (timeout or fail-fast abort) and
        # let the cancellations finish before returning or raising
        for t in pending:
            t.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        results: Dict[str, StepResult] = {}
        for step_id, task in tasks.items():
            if task not in done:
                continue
            error = task.exception()
            if error is None:
                results[step_id] = task.result()
            elif fail_fast:
                raise error
            else:
                results[step_id] = StepResult(
                    step_id=step_id, success=False, output=None, error=error
                )

        if pending:
            raise TimeoutError(f"Parallel execution timed out (>{self.timeout}s)")
        return results

    async def _execute_with_semaphore(self, step, ctx):
        async with self._semaphore:
            return await self._execute_step(step, ctx)

    async def _execute_step(self, step, ctx) -> StepResult:
        # Runs a single Skill (e.g. with the retry logic from 34.2); not shown
        raise NotImplementedError
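The semaphore is what keeps `max_concurrency` honest. This stand-alone sketch (hypothetical names, with `asyncio.sleep` standing in for I/O-bound Skill calls) counts how many steps run at once. With five 50 ms steps and a limit of 2, the peak never exceeds 2, and wall-clock time is about three rounds (roughly 150 ms) instead of five sequential steps (roughly 250 ms).

```python
import asyncio
from typing import List


async def run_bounded(durations: List[float], max_concurrency: int) -> int:
    """Run one fake step per duration, at most max_concurrency at a time.

    Returns the peak number of steps that were running simultaneously.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    running = 0
    peak = 0

    async def fake_step(seconds: float) -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(seconds)  # stands in for a Skill call
            running -= 1

    await asyncio.gather(*(fake_step(d) for d in durations))
    return peak


print(asyncio.run(run_bounded([0.05] * 5, max_concurrency=2)))  # 2
```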

34.5 DAG Orchestration

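DAG (Directed Acyclic Graph) orchestration is the most general pattern: it supports any dependency structure that contains no cycles, and it starts each step as soon as all of its dependencies have completed, so independent branches run in parallel without being declared as such.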

DAG Scheduler Logic

1. Find all nodes with zero in-degree (no dependencies) → launch them in parallel
2. When a node completes, check whether any successor now has all of its dependencies satisfied
3. If so, launch that successor immediately
4. Repeat until all nodes are complete
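Steps 1 to 4 are a form of Kahn's topological sort. The sketch below is a hypothetical helper, not part of the `DAGOrchestrator` that follows: it runs the same in-degree bookkeeping offline to group steps into "waves" whose members could run concurrently, and uses that bookkeeping to reject unknown dependencies and cycles up front. A scheduler loop that only follows steps 1 to 4 would otherwise leave nodes in a cycle silently unscheduled. (The real scheduler is event-driven and does not wait for a whole wave to finish; waves are just a convenient static view.)

```python
from typing import Dict, List, Set


def execution_waves(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes so every node depends only on nodes in earlier waves.

    Raises ValueError on unknown dependencies or cycles (Kahn's algorithm).
    """
    known = set(deps)
    for node, node_deps in deps.items():
        unknown = node_deps - known
        if unknown:
            raise ValueError(f"{node!r} depends on unknown steps {sorted(unknown)}")

    in_degree = {node: len(node_deps) for node, node_deps in deps.items()}
    successors: Dict[str, List[str]] = {node: [] for node in deps}
    for node, node_deps in deps.items():
        for dep in node_deps:
            successors[dep].append(node)

    waves: List[List[str]] = []
    current = sorted(n for n, d in in_degree.items() if d == 0)
    scheduled = 0
    while current:
        waves.append(current)
        scheduled += len(current)
        nxt = []
        for node in current:
            for succ in successors[node]:
                in_degree[succ] -= 1          # one more dependency satisfied
                if in_degree[succ] == 0:
                    nxt.append(succ)
        current = sorted(nxt)

    if scheduled != len(deps):
        raise ValueError("Dependency cycle detected")
    return waves


deps = {
    "web_search": set(), "arxiv_search": set(), "news_fetch": set(),
    "synthesize": {"web_search", "arxiv_search", "news_fetch"},
    "outline": {"synthesize"},
}
print(execution_waves(deps))
# [['arxiv_search', 'news_fetch', 'web_search'], ['synthesize'], ['outline']]
```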

DAG Engine Implementation

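# hermes/orchestration/dag.py
import asyncio
import logging
from collections import defaultdict
from typing import Any, Dict, Set

from hermes.orchestration.pipeline import PipelineContext, StepResult

logger = logging.getLogger(__name__)


class DAGExecutionError(Exception):
    """Raised when DAG nodes fail, are skipped because of a failure, or are blocked by a cycle."""


class DAGOrchestrator:
    def __init__(self, dag_config: Dict):
        self.steps = {s["id"]: s for s in dag_config["steps"]}
        self.dependencies = {
            sid: set(s.get("depends_on", []))
            for sid, s in self.steps.items()
        }
        for sid, deps in self.dependencies.items():
            unknown = deps - set(self.steps)
            if unknown:
                raise ValueError(f"Step '{sid}' depends on unknown steps: {sorted(unknown)}")
        self.reverse_deps = self._build_reverse_deps()

    def _build_reverse_deps(self) -> Dict[str, Set[str]]:
        reverse = defaultdict(set)
        for step_id, deps in self.dependencies.items():
            for dep in deps:
                reverse[dep].add(step_id)
        return dict(reverse)

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)
        completed: Set[str] = set()
        failed: Set[str] = set()
        in_progress: Dict[asyncio.Task, str] = {}   # task -> step_id

        # Step 1: every node without dependencies can start immediately
        ready = {sid for sid, deps in self.dependencies.items() if not deps}

        while ready or in_progress:
            for step_id in ready:
                task = asyncio.create_task(self._execute_step(step_id, ctx))
                in_progress[task] = step_id
            ready.clear()

            done_tasks, _ = await asyncio.wait(
                in_progress.keys(), return_when=asyncio.FIRST_COMPLETED
            )

            for task in done_tasks:
                step_id = in_progress.pop(task)
                try:
                    result = task.result()
                except Exception as e:
                    result = StepResult(step_id=step_id, success=False, output=None, error=e)
                ctx.steps[step_id] = result

                if result.success:
                    completed.add(step_id)
                    # Steps 2-3: launch successors whose dependencies are all done
                    for successor in self.reverse_deps.get(step_id, set()):
                        if self._is_ready(successor, completed):
                            ready.add(successor)
                else:
                    logger.error("DAG node '%s' failed: %s", step_id, result.error)
                    failed.add(step_id)
                    self._skip_descendants(step_id, failed)

        # Nodes that neither ran nor were skipped are blocked by a cycle
        not_run = set(self.steps) - completed - failed
        if not_run:
            raise DAGExecutionError(f"Dependency cycle: never scheduled {sorted(not_run)}")
        if failed:
            raise DAGExecutionError(f"DAG failed at nodes: {sorted(failed)}")
        return ctx

    def _is_ready(self, step_id: str, completed: Set[str]) -> bool:
        return self.dependencies[step_id] <= completed

    def _skip_descendants(self, failed_id: str, failed: Set[str]) -> None:
        # Descendants of a failed node can never become ready (an ancestor
        # will never complete), so none of them has started. Mark every
        # transitive descendant as failed so the final error reports it.
        queue = list(self.reverse_deps.get(failed_id, set()))
        while queue:
            node = queue.pop()
            if node not in failed:
                failed.add(node)
                queue.extend(self.reverse_deps.get(node, set()))

    async def _execute_step(self, step_id: str, ctx: PipelineContext) -> StepResult:
        # Runs one Skill, resolving its inputs from ctx; not shown in this chapter
        raise NotImplementedError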
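Because the orchestrator creates a task for every ready node (it has no concurrency cap) and starts each node the moment its last dependency finishes, its wall-clock time, ignoring scheduling overhead, is the DAG's critical path: the longest chain of dependent step durations. The hypothetical helper below computes it, which gives a quick estimate of how much a DAG gains over running the same steps one after another. The durations in the example are made up for illustration.

```python
from functools import lru_cache
from typing import Dict, Set


def critical_path_ms(deps: Dict[str, Set[str]], durations: Dict[str, float]) -> float:
    """Earliest finish time of the whole DAG with unlimited concurrency.

    A node finishes at its own duration plus the latest finish time among
    its dependencies; the DAG finishes when its slowest path does.
    Assumes deps is acyclic.
    """
    @lru_cache(maxsize=None)
    def finish(node: str) -> float:
        start = max((finish(d) for d in deps[node]), default=0.0)
        return start + durations[node]

    return max((finish(n) for n in deps), default=0.0)


# Hypothetical durations: a and b run in parallel, c waits for both
deps = {"a": set(), "b": set(), "c": {"a", "b"}}
durations = {"a": 300, "b": 500, "c": 200}
print(critical_path_ms(deps, durations))  # 700.0 (vs. 1000 if run sequentially)
```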

34.6 Case Study: Research + Write + Publish Pipeline

DAG Configuration

# research_write_publish.yaml
name: "research-write-publish"

dag:
  steps:
    # Phase 1: Parallel Research
    - id: web_search
      skill: web-search-skill
      inputs:
        query: "$.inputs.topic"

    - id: arxiv_search
      skill: arxiv-search-skill
      inputs:
        query: "$.inputs.topic"

    - id: news_fetch
      skill: news-fetcher-skill
      inputs:
        topic: "$.inputs.topic"

    # Fan-in: starts only after all three research steps complete
    - id: synthesize
      skill: research-synthesizer-skill
      depends_on: ["web_search", "arxiv_search", "news_fetch"]
      inputs:
        web_results:   "$.steps.web_search.output"
        arxiv_results: "$.steps.arxiv_search.output"
        news_results:  "$.steps.news_fetch.output"

    # Phase 2: Sequential Writing
    - id: outline
      skill: outline-generator-skill
      depends_on: ["synthesize"]
      inputs:
        research_summary: "$.steps.synthesize.output.summary"

    - id: write
      skill: content-writer-skill
      depends_on: ["outline"]
      inputs:
        outline: "$.steps.outline.output.sections"

    - id: quality_check
      skill: quality-checker-skill
      depends_on: ["write"]
      config:
        checks: ["grammar", "factual", "plagiarism"]
        min_score: 0.85

    # Phase 3: Parallel Publishing
    - id: seo_optimize
      skill: seo-optimizer-skill
      depends_on: ["quality_check"]

    - id: publish_cms
      skill: cms-publisher-skill
      depends_on: ["seo_optimize"]

    - id: social_share
      skill: social-share-skill
      depends_on: ["seo_optimize"]
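One mistake a scheduler cannot prevent is a step that reads another step's output (`$.steps.<id>...`) without that step being upstream of it in `depends_on`: the reader may start before the output exists, which surfaces only as an intermittent runtime error. A static check over the parsed configuration catches this early. The sketch below is hypothetical (Hermes may or may not ship a similar validator); it takes the step list as plain dictionaries, as a YAML loader would produce them, reads the `inputs` key used in this case study, and accepts transitive dependencies.

```python
import re
from typing import Dict, List, Set

STEP_REF = re.compile(r"\$\.steps\.(\w+)")


def ancestors(step_id: str, deps: Dict[str, List[str]]) -> Set[str]:
    """All steps that step_id depends on, directly or transitively."""
    seen: Set[str] = set()
    stack = list(deps.get(step_id, []))
    while stack:
        dep = stack.pop()
        if dep not in seen:
            seen.add(dep)
            stack.extend(deps.get(dep, []))
    return seen


def undeclared_refs(steps: List[dict]) -> List[str]:
    """Report '$.steps.X' references to steps that are not upstream of the reader."""
    deps = {s["id"]: s.get("depends_on", []) for s in steps}
    problems = []
    for step in steps:
        upstream = ancestors(step["id"], deps)
        for expr in step.get("inputs", {}).values():
            for ref in STEP_REF.findall(expr):
                if ref not in upstream:
                    problems.append(f"{step['id']} reads {ref} without depending on it")
    return problems


steps = [
    {"id": "web_search", "inputs": {"query": "$.inputs.topic"}},
    {"id": "synthesize", "depends_on": ["web_search"],
     "inputs": {"web_results": "$.steps.web_search.output"}},
    # Missing depends_on: this step could start before 'synthesize' finishes
    {"id": "outline", "inputs": {"research_summary": "$.steps.synthesize.output.summary"}},
]
print(undeclared_refs(steps))
# ['outline reads synthesize without depending on it']
```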

Running the Workflow

import asyncio
from hermes.orchestration.dag import DAGOrchestrator
from hermes.orchestration.loader import load_dag_config

async def main():
    config = load_dag_config("research_write_publish.yaml")
    orchestrator = DAGOrchestrator(config)
    
    result = await orchestrator.execute({
        "topic": "LLMs in medical diagnostics",
        "target_audience": "healthcare professionals and AI researchers",
        "publish_platforms": ["twitter", "linkedin"]
    })
    
    print(f"Published! CMS URL: {result.steps['publish_cms'].output['url']}")

if __name__ == "__main__":
    asyncio.run(main())

34.7 Error Propagation and Compensation

Error Strategy Comparison

Strategy            Description                                Best For
Fail-Fast           Stop the entire pipeline on first error    Strict data consistency
Continue            Log error, skip failed step                Non-critical optional steps
Retry               Retry with exponential backoff             Network timeouts, transient errors
Compensate (Saga)   Undo completed steps in reverse order      Transactional workflows
Circuit-Break       Open circuit after N failures              Unstable external dependencies
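Circuit-Break is the only strategy in the table without code elsewhere in this chapter. Below is a minimal sketch that assumes nothing about Hermes's own implementation (`CircuitBreaker` and `CircuitOpenError` are hypothetical names): it counts consecutive failures, rejects calls outright while the circuit is open, and lets a single trial call through once `reset_timeout` has elapsed (the "half-open" state). The clock is injectable so the behavior can be demonstrated without real waiting.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional


class CircuitOpenError(Exception):
    """Raised instead of calling the dependency while the circuit is open."""


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures; after
    `reset_timeout` seconds one trial call is let through (half-open)."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock                     # injectable for tests
        self.failures = 0                      # consecutive failures
        self.opened_at: Optional[float] = None

    async def call(self, fn: Callable[[], Awaitable[Any]]) -> Any:
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open: call rejected")
            self.opened_at = None              # half-open: allow a trial call
        try:
            result = await fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()  # (re)open the circuit
            raise
        self.failures = 0                      # success closes the circuit
        return result


# Demo with a fake clock so no real waiting is needed
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])


async def flaky() -> str:
    raise ConnectionError("upstream down")


async def healthy() -> str:
    return "ok"


async def demo() -> None:
    for _ in range(2):
        try:
            await breaker.call(flaky)
        except ConnectionError:
            pass
    try:
        await breaker.call(healthy)
    except CircuitOpenError:
        print("open")                          # rejected without calling healthy()
    now[0] = 11.0                              # reset_timeout has elapsed
    print(await breaker.call(healthy))         # half-open trial succeeds: ok


asyncio.run(demo())
```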

Saga Compensation Pattern

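import logging
from typing import Any, List, Protocol

logger = logging.getLogger(__name__)


class SagaStep(Protocol):
    """Interface assumed for each step: a forward action plus its undo."""
    name: str

    async def execute(self, data: Any) -> Any: ...

    async def compensate(self) -> None: ...


class SagaRollbackError(Exception):
    pass


class SagaOrchestrator:
    """
    If step N fails, automatically executes compensation
    for steps 1 through N-1 in reverse order.
    """
    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    async def execute(self, initial_data: Any) -> Any:
        data = initial_data
        completed: List[SagaStep] = []   # per run, so reruns start clean
        for step in self.steps:
            try:
                data = await step.execute(data)
                completed.append(step)
            except Exception as e:
                not_undone = await self._compensate(completed)
                status = ("all completed steps rolled back" if not not_undone
                          else f"compensation FAILED for {not_undone}")
                raise SagaRollbackError(f"Saga failed at {step.name}; {status}") from e
        return data

    async def _compensate(self, completed: List[SagaStep]) -> List[str]:
        failures = []
        for step in reversed(completed):
            try:
                await step.compensate()
            except Exception as e:
                # Critical alert: manual intervention needed
                logger.critical("Compensation failed for %s: %s", step.name, e)
                failures.append(step.name)
        return failures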

Chapter Summary

This chapter covered all four Hermes Skill composition patterns:

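  1. Sequential Pipeline (A→B→C): Simple and direct; ideal for strict data-flow chains, with per-step retry logic
  2. Conditional Branch: Runtime routing lets a workflow choose its path based on intermediate results
  3. Parallel Execution: asyncio-based concurrency cuts wall-clock time for independent steps from the sum of their durations to roughly that of the slowest step
  4. DAG Orchestration: The most general pattern; each node starts as soon as its dependencies complete, so independent branches of a complex dependency graph run in parallel automatically

The "Research + Write + Publish" case study showed how a single DAG combines parallel fan-out (research, publishing) with a sequential core (outline, writing, quality check) in a production-style content workflow, and Section 34.7 covered the error-handling strategies such a workflow needs.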

Review Questions

  1. How would you implement a real-time progress reporting feature so users can see each DAG node's execution status live?
  2. If a step significantly exceeds its expected duration (e.g., web_search takes 5 minutes), how would you design timeout and circuit-breaking mechanisms?
  3. Should conditional branch functions be allowed to call an LLM? What problems could this introduce?
  4. Design a "Skill Pipeline Template Marketplace" where users share workflow configurations. How would you handle template versioning and compatibility?