Chapter 34

Skill Composition Patterns: Pipeline and DAG Orchestration

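A single Skill is like a Lego brick: focused in function but limited in what it can do alone. The real power comes from combining several Skills into a workflow that can complete a complex task. This chapter examines the Skill composition patterns of Hermes Agent, from the simplest sequential Pipeline to complex DAG (directed acyclic graph) orchestration, and uses a complete "research + writing + publishing" case study to show how to build a production-grade workflow.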


34.1 Overview of Composition Patterns

Hermes supports four Skill composition patterns, ranging from a simple linear chain to workflows with arbitrary dependencies:

┌───────────────────────────────────────────────────────────────────┐
│                    Skill composition patterns                     │
├────────────────┬────────────────┬────────────────┬────────────────┤
│ Sequential     │ Conditional    │ Parallel       │ DAG            │
│ Pipeline       │ branching      │ execution      │ orchestration  │
│ A → B → C      │ if/else routing│ A ∥ B → C      │ acyclic graph  │
│ serial, simple │ dynamic routing│ concurrent,    │ complex        │
│                │                │ faster         │ dependencies   │
└────────────────┴────────────────┴────────────────┴────────────────┘

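Choosing a pattern

┌─────────────────────────────────┬───────────────────────┬────────────────────────────────────────┐
│ Scenario                        │ Recommended pattern   │ Reason                                 │
├─────────────────────────────────┼───────────────────────┼────────────────────────────────────────┤
│ Data-processing pipeline        │ Sequential Pipeline   │ Steps strictly depend on each other    │
│ Processing content by category  │ Conditional branching │ Requires dynamic routing               │
│ Concurrent multi-source fetch   │ Parallel execution    │ Steps are independent; speed matters   │
│ Complex research + writing flow │ DAG orchestration     │ Mixed serial/parallel, precise control │
└─────────────────────────────────┴───────────────────────┴────────────────────────────────────────┘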

34.2 Sequential Pipeline Pattern (A→B→C)

The sequential Pipeline is the most basic composition pattern: the output of each Skill becomes the input of the next, forming a data-processing chain.

Basic architecture

Input data
   ↓
┌────────┐     ┌────────┐     ┌────────┐
│Skill A │ ──→ │Skill B │ ──→ │Skill C │
│(fetch) │     │(clean) │     │(store) │
└────────┘     └────────┘     └────────┘
                                  ↓
                               Output
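The essence of the pattern, each Skill's output becoming the next Skill's input, fits in a few lines. The following is a minimal, self-contained sketch rather than Hermes code: it assumes a Skill can be modeled as an async function, and `fetch`, `clean` and `store` are hypothetical stand-ins for the three Skills in the diagram.

```python
import asyncio
import re
from typing import Any, Awaitable, Callable, List

# Assumption: a Skill is modeled as "async function from input to output"
Skill = Callable[[Any], Awaitable[Any]]


async def run_sequential(skills: List[Skill], data: Any) -> Any:
    """Feed each Skill's output into the next one (A -> B -> C)."""
    for skill in skills:
        data = await skill(data)
    return data


# Hypothetical stand-ins for the fetch / clean / store Skills
async def fetch(url: str) -> str:
    return f"<html><script>x</script><p>content of {url}</p></html>"


async def clean(html: str) -> str:
    # Crude cleaning for illustration: drop <script> blocks, then all other tags
    html = re.sub(r"<script>.*?</script>", "", html, flags=re.S)
    return re.sub(r"<[^>]+>", "", html).strip()


async def store(text: str) -> dict:
    return {"stored": True, "length": len(text)}


result = asyncio.run(run_sequential([fetch, clean, store], "https://example.com"))
print(result)  # {'stored': True, 'length': 30}
```

In the engine shown later in this section, the YAML `input_mapping` replaces this loop's implicit "pass the whole value along", so a step can pick individual fields from any earlier step.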

Declarative YAML configuration

# pipeline_config.yaml
name: "data-processing-pipeline"
version: "1.0.0"
description: "Web page fetch, clean, and store pipeline"

pipeline:
  type: sequential
  steps:
    - id: fetch
      skill: web-fetcher-skill
      version: "^2.0.0"
      config:
        timeout: 30
        retry: 3
        user_agent: "HermesBot/1.0"
      input_mapping:
        url: "$.pipeline_input.url"
      
    - id: clean
      skill: html-cleaner-skill
      version: "^1.5.0"
      config:
        remove_scripts: true
        extract_main_content: true
      input_mapping:
        raw_html: "$.steps.fetch.output.html"
        
    - id: store
      skill: database-writer-skill
      version: "^3.0.0"
      config:
        table: "scraped_content"
        batch_size: 100
      input_mapping:
        content: "$.steps.clean.output.text"
        metadata: "$.steps.fetch.output.metadata"

  on_failure:
    strategy: "stop"    # stop / continue / retry
    notify: "[email protected]"
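The `input_mapping` values above are JSONPath-style strings. The sketch below shows one way such paths could be resolved against a context dictionary. It is an illustration, not the Hermes resolver: it supports only dot-separated field access (no wildcards, filters or array indices), and the context layout (`pipeline_input`, `steps.<id>.output`) is an assumption modeled on the paths in the YAML.

```python
from typing import Any, Dict


def resolve_path(path: str, context: Dict[str, Any]) -> Any:
    """Resolve a dot-only JSONPath such as "$.steps.fetch.output.html"."""
    if not path.startswith("$."):
        raise ValueError(f"path must start with '$.': {path}")
    value: Any = context
    for name in path[2:].split("."):
        if not isinstance(value, dict) or name not in value:
            raise KeyError(f"cannot resolve '{name}' in {path}")
        value = value[name]
    return value


def resolve_mapping(mapping: Dict[str, str], context: Dict[str, Any]) -> Dict[str, Any]:
    """Build a step's input dict from its input_mapping."""
    return {key: resolve_path(path, context) for key, path in mapping.items()}


# Hypothetical context after the fetch and clean steps have run
context = {
    "pipeline_input": {"url": "https://example.com"},
    "steps": {
        "fetch": {"output": {"html": "<p>hi</p>", "metadata": {"status": 200}}},
        "clean": {"output": {"text": "hi"}},
    },
}

store_input = resolve_mapping(
    {"content": "$.steps.clean.output.text", "metadata": "$.steps.fetch.output.metadata"},
    context,
)
print(store_input)  # {'content': 'hi', 'metadata': {'status': 200}}
```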

Python implementation: the Pipeline execution engine

# hermes/orchestration/pipeline.py
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

@dataclass
class StepResult:
    """Result of executing a single step"""
    step_id: str
    success: bool
    output: Any
    error: Optional[Exception] = None
    duration_ms: float = 0.0
    retry_count: int = 0

@dataclass
class PipelineContext:
    """Pipeline execution context; carries data between steps"""
    pipeline_input: Any
    steps: Dict[str, StepResult] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_step_output(self, step_id: str) -> Any:
        result = self.steps.get(step_id)
        if result is None:
            raise KeyError(f"Step '{step_id}' has not run or does not exist")
        if not result.success:
            raise RuntimeError(f"Step '{step_id}' failed; its output is unavailable")
        return result.output


class SequentialPipeline:
    """Sequential Pipeline execution engine"""

    def __init__(
        self,
        steps: List[Dict],
        on_failure: str = "stop",    # stop | continue (per-step retries run first either way)
        max_retries: int = 3
    ):
        self.steps = steps
        self.on_failure = on_failure
        self.max_retries = max_retries

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)

        for step_config in self.steps:
            step_id = step_config["id"]
            skill_name = step_config["skill"]

            logger.info(f"[Pipeline] Starting step: {step_id} ({skill_name})")
            result = await self._execute_step_with_retry(step_config, ctx)
            ctx.steps[step_id] = result

            if not result.success:
                logger.error(
                    f"[Pipeline] Step {step_id} failed: {result.error}, "
                    f"took {result.duration_ms:.1f}ms"
                )
                if self.on_failure == "stop":
                    raise PipelineExecutionError(
                        f"Pipeline aborted at step '{step_id}': {result.error}"
                    ) from result.error
                # on_failure == "continue": keep the failed result and move on;
                # any later step that maps this step's output will fail in turn
            else:
                logger.info(
                    f"[Pipeline] Step {step_id} succeeded, "
                    f"took {result.duration_ms:.1f}ms"
                )

        return ctx

    async def _execute_step_with_retry(
        self,
        step_config: Dict,
        ctx: PipelineContext
    ) -> StepResult:
        step_id = step_config["id"]
        max_retries = step_config.get("retry", self.max_retries)

        # Resolve the input mapping once, outside the retry loop: a missing or
        # failed upstream output is a wiring problem that retrying cannot fix
        try:
            step_input = self._resolve_inputs(
                step_config.get("input_mapping", {}), ctx
            )
        except Exception as e:
            return StepResult(step_id=step_id, success=False, output=None, error=e)

        for attempt in range(max_retries + 1):
            start = time.perf_counter()
            try:
                # Load and run the Skill
                skill = await self._load_skill(
                    step_config["skill"],
                    step_config.get("version", "*")
                )
                output = await skill.execute(
                    step_input,
                    config=step_config.get("config", {})
                )
                return StepResult(
                    step_id=step_id,
                    success=True,
                    output=output,
                    duration_ms=(time.perf_counter() - start) * 1000,
                    retry_count=attempt
                )

            except Exception as e:
                duration = (time.perf_counter() - start) * 1000
                if attempt < max_retries:
                    wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                    logger.warning(
                        f"Step {step_id} failed on attempt {attempt + 1}; "
                        f"retrying in {wait}s: {e}"
                    )
                    await asyncio.sleep(wait)
                else:
                    return StepResult(
                        step_id=step_id,
                        success=False,
                        output=None,
                        error=e,
                        duration_ms=duration,
                        retry_count=attempt
                    )

    def _resolve_inputs(self, mapping: Dict, ctx: PipelineContext) -> Dict:
        """Resolve JSONPath-style input mappings (dot-separated fields only)"""
        resolved = {}
        for key, path in mapping.items():
            parts = path.split(".")
            if path.startswith("$.pipeline_input"):
                value = ctx.pipeline_input
                fields = parts[2:]          # "$.pipeline_input.url" -> ["url"]
            elif path.startswith("$.steps."):
                value = ctx.get_step_output(parts[2])
                fields = parts[4:]          # "$.steps.fetch.output.html" -> ["html"]
            else:
                raise ValueError(f"Unsupported input path: {path}")
            # Walk nested fields one level at a time (dicts or plain objects)
            for name in fields:
                value = value[name] if isinstance(value, dict) else getattr(value, name)
            resolved[key] = value
        return resolved

    async def _load_skill(self, name: str, version: str):
        from hermes.skill.registry import SkillRegistry
        return await SkillRegistry.get(name, version)


class PipelineExecutionError(Exception):
    pass

34.3 Conditional Branching Pattern

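Conditional branching selects the execution path at run time from data produced earlier in the workflow (for example, a classifier's output). It is the key building block for workflows that make decisions.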

Architecture diagram

              ┌────────────┐
              │  Skill A   │
              │(classifier)│
              └─────┬──────┘
                    │
         ┌──────────┴──────────┐
         │ route on category   │
         └──────┬──────────────┘
                │
    ┌───────────┼───────────┐
    ↓           ↓           ↓
┌───────┐  ┌───────┐  ┌───────┐
│Path A │  │Path B │  │Path C │
│(news) │  │(tech) │  │(other)│
└───┬───┘  └───┬───┘  └───┬───┘
    └──────────┴──────────┘
                │
         ┌──────┴─────┐
         │ merge node │
         │ (format)   │
         └────────────┘

YAML configuration and Python implementation

# conditional_pipeline.yaml
pipeline:
  type: conditional
  steps:
    - id: classify
      skill: content-classifier-skill
      
    - id: route
      type: switch
      condition: "$.steps.classify.output.category"
      cases:
        "news":
          - id: process_news
            skill: news-processor-skill
        "technical":
          - id: process_tech
            skill: tech-analyzer-skill
        "default":
          - id: process_generic
            skill: generic-processor-skill
      
    - id: format
      skill: output-formatter-skill
      depends_on: ["route"]   # wait for the selected branch to finish

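# hermes/orchestration/conditional.py
import logging
from typing import Callable, Dict, List

from hermes.orchestration.pipeline import (
    PipelineContext,
    PipelineExecutionError,
    SequentialPipeline,
)

logger = logging.getLogger(__name__)


class ConditionalStep:
    """Conditional branching step"""

    def __init__(
        self,
        condition: Callable[[PipelineContext], str],
        branches: Dict[str, List[Dict]],
        default_branch: str = "default"
    ):
        self.condition = condition
        self.branches = branches
        self.default_branch = default_branch

    async def execute(self, ctx: PipelineContext) -> PipelineContext:
        # Evaluate the condition,
        # e.g. lambda c: c.get_step_output("classify")["category"]
        branch_key = self.condition(ctx)

        # Pick the matching branch, falling back to the default branch
        chosen = branch_key if branch_key in self.branches else self.default_branch
        branch_steps = self.branches.get(chosen, [])
        if not branch_steps:
            raise ValueError(f"No branch found for: {branch_key}")

        logger.info(f"[Conditional] Routing '{branch_key}' to branch: {chosen}")
        ctx.metadata["selected_branch"] = chosen

        # Run the branch inside the *existing* context, so its steps can read
        # earlier outputs (such as classify) and later steps can read theirs
        runner = SequentialPipeline(branch_steps)
        for step_config in branch_steps:
            result = await runner._execute_step_with_retry(step_config, ctx)
            ctx.steps[step_config["id"]] = result
            if not result.success:
                raise PipelineExecutionError(
                    f"Branch '{chosen}' failed at step "
                    f"'{step_config['id']}': {result.error}"
                )
        return ctx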
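Stripped of the engine plumbing, a `switch` step is a lookup from the condition value to a branch, with `"default"` as the fallback. The self-contained sketch below illustrates only that routing logic; the keyword classifier and the three branch handlers are hypothetical stand-ins for the Skills named in the YAML.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[str], Awaitable[str]]


async def classify(text: str) -> Dict[str, str]:
    # Hypothetical keyword classifier standing in for content-classifier-skill
    lowered = text.lower()
    if "breaking" in lowered:
        return {"category": "news"}
    if "python" in lowered:
        return {"category": "technical"}
    return {"category": "other"}


async def process_news(text: str) -> str:
    return f"[news] {text}"


async def process_tech(text: str) -> str:
    return f"[tech] {text}"


async def process_generic(text: str) -> str:
    return f"[generic] {text}"


CASES: Dict[str, Handler] = {
    "news": process_news,
    "technical": process_tech,
    "default": process_generic,
}


async def route(text: str) -> str:
    category = (await classify(text))["category"]
    handler = CASES.get(category, CASES["default"])  # unknown keys fall back to default
    return await handler(text)


for sample in ["Breaking: market opens", "Python 3.13 released", "A quiet afternoon"]:
    print(asyncio.run(route(sample)))
# [news] Breaking: market opens
# [tech] Python 3.13 released
# [generic] A quiet afternoon
```

Because the fallback is resolved in the lookup itself, an unexpected category such as "other" still lands on a branch instead of stopping the workflow.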

34.4 Parallel Execution Pattern

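When several steps are independent of one another, running them in parallel cuts the total wall-clock time from roughly the sum of the step durations to roughly the duration of the slowest step, as long as the concurrency limit is not the bottleneck.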

Parallel execution architecture

             ┌─────────────────┐
             │   Input data    │
             └────────┬────────┘
                      │ fan-out
       ┌──────────────┼──────────────┐
       ↓              ↓              ↓
 ┌────────────┐ ┌────────────┐ ┌────────────┐
 │  Engine A  │ │  Engine B  │ │  Engine C  │
 │  (Google)  │ │   (Bing)   │ │(DuckDuckGo)│
 └─────┬──────┘ └─────┬──────┘ └─────┬──────┘
       └──────────────┼──────────────┘
                      │ fan-in
             ┌────────┴────────┐
             │  result merger  │
             └─────────────────┘
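A quick way to see the time saving: three simulated searches that each take about 0.2 s finish in roughly 0.2 s when run concurrently, versus roughly 0.6 s when the concurrency limit forces them through one at a time. The sketch uses `asyncio.gather` with an `asyncio.Semaphore` cap, the same mechanism as `max_concurrency` in the executor that follows; the engine names and delays are made up for illustration.

```python
import asyncio
import time
from typing import List


async def fake_search(engine: str, delay: float, sem: asyncio.Semaphore) -> str:
    async with sem:                  # at most max_concurrency searches run at once
        await asyncio.sleep(delay)   # simulated network latency
        return f"{engine}: results"


async def run_parallel(max_concurrency: int) -> List[str]:
    # Create the semaphore inside the running loop
    sem = asyncio.Semaphore(max_concurrency)
    engines = ["google", "bing", "duckduckgo"]
    # gather returns results in the order the coroutines were passed
    return await asyncio.gather(*(fake_search(e, 0.2, sem) for e in engines))


for limit in (3, 1):
    start = time.perf_counter()
    results = asyncio.run(run_parallel(limit))
    elapsed = time.perf_counter() - start
    print(f"max_concurrency={limit}: {elapsed:.2f}s, {results}")
```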

Python implementation: the parallel execution engine

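# hermes/orchestration/parallel.py
import asyncio
import time
from typing import Dict, List, Optional

from hermes.orchestration.pipeline import PipelineContext, SequentialPipeline, StepResult


class ParallelExecutor:
    """Parallel Skill executor"""

    def __init__(
        self,
        max_concurrency: int = 10,
        timeout: float = 60.0
    ):
        self.max_concurrency = max_concurrency
        self.timeout = timeout
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def execute_all(
        self,
        steps: List[Dict],
        ctx: PipelineContext,
        fail_fast: bool = True
    ) -> Dict[str, StepResult]:
        """Run all steps concurrently and return a {step_id: StepResult} dict"""
        tasks = {
            step["id"]: asyncio.create_task(
                self._execute_with_semaphore(step, ctx)
            )
            for step in steps
        }
        if not tasks:
            return {}

        # fail_fast: return as soon as any task raises; otherwise wait for all
        return_when = asyncio.FIRST_EXCEPTION if fail_fast else asyncio.ALL_COMPLETED
        done, pending = await asyncio.wait(
            tasks.values(), timeout=self.timeout, return_when=return_when
        )

        # Cancel whatever is still running (siblings of a failure, or timed-out
        # tasks) and let the cancellations settle
        for task in pending:
            task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

        results: Dict[str, StepResult] = {}
        for step_id, task in tasks.items():
            if task not in done:
                continue
            error = task.exception()
            if error is None:
                results[step_id] = task.result()
            elif fail_fast:
                raise error
            else:
                results[step_id] = StepResult(
                    step_id=step_id,
                    success=False,
                    output=None,
                    error=error
                )

        if pending:
            raise TimeoutError(f"Parallel execution timed out (>{self.timeout}s)")
        return results

    async def _execute_with_semaphore(self, step, ctx):
        async with self._semaphore:
            return await self._execute_step(step, ctx)

    async def _execute_step(
        self,
        step: Dict,
        ctx: PipelineContext,
        extra_input: Optional[Dict] = None
    ) -> StepResult:
        """Run one step. Sketch: reuses SequentialPipeline's input resolution and
        Skill loading from 34.2; exceptions propagate to execute_all."""
        helper = SequentialPipeline([step])
        step_input = helper._resolve_inputs(step.get("input_mapping", {}), ctx)
        step_input.update(extra_input or {})
        skill = await helper._load_skill(step["skill"], step.get("version", "*"))
        start = time.perf_counter()
        output = await skill.execute(step_input, config=step.get("config", {}))
        return StepResult(
            step_id=step["id"],
            success=True,
            output=output,
            duration_ms=(time.perf_counter() - start) * 1000
        )

    async def execute_map_reduce(
        self,
        map_steps: List[Dict],
        reduce_step: Dict,
        ctx: PipelineContext
    ) -> StepResult:
        """Map-Reduce: run the Map steps in parallel, then Reduce their outputs"""
        # Map phase: tolerate individual failures so Reduce sees partial results
        map_results = await self.execute_all(map_steps, ctx, fail_fast=False)

        # Build the Reduce input from the successful Map outputs only
        reduce_input = {
            step_id: result.output
            for step_id, result in map_results.items()
            if result.success
        }

        # Reduce phase
        return await self._execute_step(reduce_step, ctx, extra_input=reduce_input)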
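The Map-Reduce path only works if the Map phase tolerates individual failures. The self-contained sketch below isolates that behavior with `asyncio.gather(..., return_exceptions=True)`: failed searches are reported, and the Reduce step merges whatever succeeded. The `search` function and the simulated Bing outage are hypothetical.

```python
import asyncio
from typing import Dict, List


async def search(engine: str, query: str) -> List[str]:
    # Hypothetical search Skill; "bing" fails to simulate an outage
    if engine == "bing":
        raise ConnectionError(f"{engine} unavailable")
    return [f"{engine}:{query}:1", f"{engine}:{query}:2"]


async def map_reduce(engines: List[str], query: str) -> Dict[str, object]:
    # Map: run all searches concurrently; exceptions are returned, not raised
    outputs = await asyncio.gather(
        *(search(e, query) for e in engines), return_exceptions=True
    )
    pairs = list(zip(engines, outputs))
    succeeded = {e: out for e, out in pairs if not isinstance(out, BaseException)}
    failed = [e for e, out in pairs if isinstance(out, BaseException)]
    # Reduce: merge and de-duplicate the successful result lists
    merged = sorted({item for items in succeeded.values() for item in items})
    return {"results": merged, "failed": failed}


print(asyncio.run(map_reduce(["google", "bing", "duckduckgo"], "llm")))
# {'results': ['duckduckgo:llm:1', 'duckduckgo:llm:2', 'google:llm:1', 'google:llm:2'], 'failed': ['bing']}
```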

34.5 DAG Orchestration: Directed Acyclic Graphs

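A DAG (Directed Acyclic Graph) is the most expressive of the four patterns. It can describe arbitrary dependency relationships, and because each node starts as soon as all of its dependencies have finished, it extracts as much parallelism as the dependency structure allows.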

Core DAG concepts

Node = one Skill execution step
Edge = a data dependency (A→B means B needs A's output)

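The DAG scheduler:
1. Finds every node with in-degree 0 (no dependencies) and starts them in parallel
2. When a node finishes, checks whether all dependencies of each of its successors have finished
3. If so, starts that successor immediately
4. Repeats until no node is running or ready (every node has finished, or the remaining ones are blocked by a failure)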
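The scheduling rule above is essentially Kahn's topological-sort algorithm. Grouping nodes by the round in which they become ready gives the "waves" that can run in parallel (assuming every node takes the same time), and any node that never reaches in-degree 0 reveals a cycle. A self-contained sketch over a plain `{node: [dependencies]}` dict; that input format is an assumption for illustration, not the Hermes config object.

```python
from typing import Dict, List


def execution_waves(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Kahn's algorithm grouped by level: each wave depends only on earlier waves."""
    for node, parents in deps.items():
        missing = set(parents) - set(deps)
        if missing:
            raise ValueError(f"{node} depends on unknown nodes: {sorted(missing)}")

    indegree = {node: len(set(parents)) for node, parents in deps.items()}
    successors: Dict[str, List[str]] = {node: [] for node in deps}
    for node, parents in deps.items():
        for parent in set(parents):
            successors[parent].append(node)

    waves: List[List[str]] = []
    ready = sorted(n for n, d in indegree.items() if d == 0)
    while ready:
        waves.append(ready)
        next_ready = []
        for node in ready:
            for succ in successors[node]:
                indegree[succ] -= 1          # one dependency of succ has finished
                if indegree[succ] == 0:
                    next_ready.append(succ)
        ready = sorted(next_ready)

    # Nodes still holding a positive in-degree sit on (or behind) a cycle
    if sum(len(w) for w in waves) != len(deps):
        stuck = sorted(n for n, d in indegree.items() if d > 0)
        raise ValueError(f"cycle detected among: {stuck}")
    return waves


print(execution_waves({"a": [], "b": [], "c": ["a", "b"], "d": ["c"], "e": ["a"]}))
# [['a', 'b'], ['c', 'e'], ['d']]
```

A check like this could run once when a DAG is loaded, so a miswired `depends_on` fails immediately instead of leaving nodes that never start.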

DAG execution engine implementation

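# hermes/orchestration/dag.py
import asyncio
import logging
import time
from collections import defaultdict, deque
from typing import Any, Dict, Set

from hermes.orchestration.pipeline import PipelineContext, SequentialPipeline, StepResult

logger = logging.getLogger(__name__)


class DAGOrchestrator:
    """
    DAG orchestration engine

    Supports:
    - automatic parallel execution of nodes without dependencies
    - starting each node as soon as its dependencies have completed
    - skipping every node downstream of a failed node
    - full execution-state tracking
    """

    def __init__(self, dag_config: Dict):
        # Accept either the whole YAML document or just its "dag" section
        steps = dag_config.get("dag", dag_config)["steps"]
        self.steps = {s["id"]: s for s in steps}
        self.dependencies = self._parse_dependencies()
        self.reverse_deps = self._build_reverse_deps()

    def _parse_dependencies(self) -> Dict[str, Set[str]]:
        """Parse each node's dependency set, rejecting unknown step ids"""
        deps = {}
        for step_id, step in self.steps.items():
            deps[step_id] = set(step.get("depends_on", []))
            unknown = deps[step_id].difference(self.steps)
            if unknown:
                raise ValueError(f"Step '{step_id}' depends on unknown steps: {unknown}")
        return deps

    def _build_reverse_deps(self) -> Dict[str, Set[str]]:
        """Build the reverse dependency graph (node -> set of its successors)"""
        reverse = defaultdict(set)
        for step_id, deps in self.dependencies.items():
            for dep in deps:
                reverse[dep].add(step_id)
        return dict(reverse)

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)
        started_at = time.perf_counter()

        # Execution state
        completed: Set[str] = set()
        failed: Set[str] = set()
        skipped: Set[str] = set()
        in_progress: Dict[asyncio.Task, str] = {}   # task -> step_id

        # Initially runnable nodes: those with no dependencies
        ready = {sid for sid, deps in self.dependencies.items() if not deps}

        while ready or in_progress:
            # Start every ready node
            for step_id in ready:
                task = asyncio.create_task(self._execute_step(step_id, ctx))
                in_progress[task] = step_id
            ready.clear()

            # Wait until at least one running node finishes
            done_tasks, _ = await asyncio.wait(
                in_progress.keys(),
                return_when=asyncio.FIRST_COMPLETED
            )

            for task in done_tasks:
                step_id = in_progress.pop(task)
                try:
                    ctx.steps[step_id] = task.result()
                    completed.add(step_id)

                    # A successor is ready once all of its dependencies completed
                    for successor in self.reverse_deps.get(step_id, set()):
                        if self._is_ready(successor, completed, failed):
                            ready.add(successor)

                except Exception as e:
                    failed.add(step_id)
                    logger.error(f"DAG node {step_id} failed: {e}")
                    skipped |= self._downstream_of(step_id)

        ctx.metadata["total_duration_ms"] = (time.perf_counter() - started_at) * 1000

        if failed:
            raise DAGExecutionError(
                f"DAG execution failed; failed nodes: {failed}, skipped: {skipped}"
            )
        never_ran = set(self.steps) - completed
        if never_ran:
            # Without failures this only happens when depends_on forms a cycle
            raise DAGExecutionError(f"Nodes never became ready (cycle?): {never_ran}")
        return ctx

    def _is_ready(self, step_id: str, completed: Set[str], failed: Set[str]) -> bool:
        """Check whether all dependencies of a node have completed"""
        deps = self.dependencies[step_id]
        if deps & failed:
            return False  # a dependency failed, so this node must not start
        return deps <= completed

    def _downstream_of(self, failed_id: str) -> Set[str]:
        """Collect every node downstream of a failed node (BFS over reverse_deps).
        None of them can have started, since each transitively waits for the
        failed node, so they are only recorded as skipped."""
        seen: Set[str] = set()
        queue = deque(self.reverse_deps.get(failed_id, set()))
        while queue:
            node = queue.popleft()
            if node not in seen:
                seen.add(node)
                queue.extend(self.reverse_deps.get(node, set()))
        for node in seen:
            logger.warning(f"Skipping {node} because {failed_id} failed")
        return seen

    async def _execute_step(self, step_id: str, ctx: PipelineContext) -> StepResult:
        """Run one node. Sketch: maps "$.inputs.*" onto the pipeline input and
        reuses SequentialPipeline's resolver and Skill loader from 34.2."""
        step = self.steps[step_id]
        helper = SequentialPipeline([step])
        mapping = {
            key: path.replace("$.inputs", "$.pipeline_input", 1)
            for key, path in step.get("inputs", {}).items()
        }
        step_input = helper._resolve_inputs(mapping, ctx)
        skill = await helper._load_skill(step["skill"], step.get("version", "*"))
        start = time.perf_counter()
        output = await skill.execute(step_input, config=step.get("config", {}))
        return StepResult(
            step_id=step_id,
            success=True,
            output=output,
            duration_ms=(time.perf_counter() - start) * 1000
        )


class DAGExecutionError(Exception):
    pass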

34.6 Case Study: Building a Three-Stage "Research + Writing + Publishing" Skill Pipeline

Workflow architecture

              Research + writing + publishing workflow

  Stage 1: research (parallel)
  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
  │  web-search  │  │ arxiv-search │  │ news-fetcher │
  │ (web search) │  │(paper search)│  │ (news fetch) │
  └──────┬───────┘  └──────┬───────┘  └──────┬───────┘
         └─────────────────┼─────────────────┘
                           ↓
            ┌─────────────────────────────┐
            │    research-synthesizer     │
            │ (merges the three results)  │
            └──────────────┬──────────────┘
  Stage 2: writing (serial)│
                           ↓
            ┌─────────────────────────────┐
            │      outline-generator      │
            │     (generate outline)      │
            └──────────────┬──────────────┘
                           ↓
            ┌─────────────────────────────┐
            │       content-writer        │
            │      (write the body)       │
            └──────────────┬──────────────┘
                           ↓
            ┌─────────────────────────────┐
            │       quality-checker       │
            │      (quality review)       │
            └──────────────┬──────────────┘
  Stage 3: publish (serial)│
                           ↓
            ┌─────────────────────────────┐
            │        seo-optimizer        │
            │     (SEO optimization)      │
            └──────────────┬──────────────┘
                           ↓
            ┌─────────────────────────────┐
            │         cms-poster          │
            │      (publish to CMS)       │
            └──────────────┬──────────────┘
                           ↓ (article URL)
            ┌─────────────────────────────┐
            │        social-share         │
            │   (share on social media)   │
            └─────────────────────────────┘

Full DAG configuration

# research_write_publish.yaml
name: "research-write-publish"
version: "1.0.0"
description: "AI-assisted end-to-end content creation pipeline"

inputs:
  topic: string
  target_audience: string
  publish_platforms: list

dag:
  steps:
    # ===== Stage 1: parallel research =====
    - id: web_search
      skill: web-search-skill
      version: "^2.0.0"
      config:
        engines: ["google", "bing"]
        max_results: 20
        language: "zh-CN"
      inputs:
        query: "$.inputs.topic"

    - id: arxiv_search
      skill: arxiv-search-skill
      version: "^1.0.0"
      config:
        max_papers: 10
        sort_by: "relevance"
      inputs:
        query: "$.inputs.topic"

    - id: news_fetch
      skill: news-fetcher-skill
      version: "^1.2.0"
      config:
        days_back: 30
        min_relevance: 0.7
      inputs:
        topic: "$.inputs.topic"

    # Synthesize the research results (depends on the three parallel searches)
    - id: synthesize
      skill: research-synthesizer-skill
      version: "^2.0.0"
      depends_on: ["web_search", "arxiv_search", "news_fetch"]
      inputs:
        web_results:   "$.steps.web_search.output"
        arxiv_results: "$.steps.arxiv_search.output"
        news_results:  "$.steps.news_fetch.output"
        topic:         "$.inputs.topic"

    # ===== Stage 2: serial writing =====
    - id: outline
      skill: outline-generator-skill
      version: "^1.0.0"
      depends_on: ["synthesize"]
      config:
        max_sections: 8
        style: "detailed"
      inputs:
        research_summary: "$.steps.synthesize.output.summary"
        audience: "$.inputs.target_audience"

    - id: write
      skill: content-writer-skill
      version: "^3.0.0"
      depends_on: ["outline"]
      config:
        model: "hermes-pro"
        tone: "professional"
        min_words: 2000
      inputs:
        outline: "$.steps.outline.output.sections"
        research: "$.steps.synthesize.output.full_report"

    - id: quality_check
      skill: quality-checker-skill
      version: "^1.5.0"
      depends_on: ["write"]
      config:
        checks: ["grammar", "factual", "plagiarism", "seo-basics"]
        min_score: 0.85
      inputs:
        content: "$.steps.write.output.article"

    # ===== Stage 3: optimize and publish =====
    - id: seo_optimize
      skill: seo-optimizer-skill
      version: "^1.0.0"
      depends_on: ["quality_check"]
      config:
        target_keywords_count: 5
        meta_description_length: 160
      inputs:
        article: "$.steps.quality_check.output.approved_content"
        topic:   "$.inputs.topic"

    - id: publish_cms
      skill: cms-publisher-skill
      version: "^2.0.0"
      depends_on: ["seo_optimize"]
      config:
        cms: "wordpress"
        status: "draft"
      inputs:
        content:      "$.steps.seo_optimize.output.article"
        seo_metadata: "$.steps.seo_optimize.output.metadata"

    - id: social_share
      skill: social-share-skill
      version: "^1.0.0"
      # Also waits for publish_cms, because it shares the URL that step returns
      depends_on: ["seo_optimize", "publish_cms"]
      inputs:
        platforms: "$.inputs.publish_platforms"
        title:     "$.steps.seo_optimize.output.title"
        summary:   "$.steps.seo_optimize.output.description"
        url:       "$.steps.publish_cms.output.url"
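Because almost every step in this workflow sits on a single chain, the failure semantics matter. The sketch below computes which steps can no longer run when a given step fails, using the `depends_on` edges of this YAML transcribed by hand into a dict (with `publish_cms` listed as a dependency of `social_share`, since that step consumes its URL). It is a plain breadth-first walk over the reversed edges, written for illustration rather than taken from Hermes.

```python
from collections import deque
from typing import Dict, List, Set

# depends_on edges of research-write-publish, transcribed from the YAML
DEPENDS_ON: Dict[str, List[str]] = {
    "web_search": [],
    "arxiv_search": [],
    "news_fetch": [],
    "synthesize": ["web_search", "arxiv_search", "news_fetch"],
    "outline": ["synthesize"],
    "write": ["outline"],
    "quality_check": ["write"],
    "seo_optimize": ["quality_check"],
    "publish_cms": ["seo_optimize"],
    "social_share": ["seo_optimize", "publish_cms"],
}


def blocked_by(failed: str, depends_on: Dict[str, List[str]]) -> Set[str]:
    """Return every step that transitively depends on `failed`."""
    successors: Dict[str, List[str]] = {node: [] for node in depends_on}
    for node, parents in depends_on.items():
        for parent in parents:
            successors[parent].append(node)

    blocked: Set[str] = set()
    queue = deque(successors[failed])
    while queue:
        node = queue.popleft()
        if node not in blocked:
            blocked.add(node)
            queue.extend(successors[node])
    return blocked


print(sorted(blocked_by("arxiv_search", DEPENDS_ON)))
# ['outline', 'publish_cms', 'quality_check', 'seo_optimize', 'social_share', 'synthesize', 'write']
print(sorted(blocked_by("publish_cms", DEPENDS_ON)))
# ['social_share']
```

So a single failed search leaves only the other two searches able to finish.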

Running the workflow

# run_pipeline.py
import asyncio

from hermes.orchestration.dag import DAGOrchestrator
from hermes.orchestration.loader import load_dag_config


async def main():
    # Load the DAG configuration
    config = load_dag_config("research_write_publish.yaml")

    # Create the orchestrator
    orchestrator = DAGOrchestrator(config)

    # Execute the workflow
    result = await orchestrator.execute({
        "topic": "Applications of large language models in medical diagnosis",
        "target_audience": "Medical practitioners and AI researchers",
        "publish_platforms": ["twitter", "linkedin"]
    })

    # Report the final result (publish_cms is configured with status "draft")
    print("Workflow finished; the article was saved to the CMS as a draft.")
    print(f"CMS URL: {result.steps['publish_cms'].output['url']}")
    print(f"Total time: {result.metadata.get('total_duration_ms', 0):.1f}ms")


if __name__ == "__main__":
    asyncio.run(main())

34.7 Error Propagation and Compensation Mechanisms

Comparison of error-handling strategies

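┌───────────────┬─────────────────────────────────────────────┬───────────────────────────────────────┐
│ Strategy      │ Description                                 │ Use case                              │
├───────────────┼─────────────────────────────────────────────┼───────────────────────────────────────┤
│ Fail-Fast     │ Stop the whole Pipeline at the first error  │ Strict data-consistency requirements  │
│ Continue      │ Log the error, skip the step, continue      │ Non-critical steps may fail           │
│ Retry         │ Retry with a backoff policy                 │ Network timeouts, transient errors    │
│ Compensate    │ Undo completed steps via compensation       │ Transactional workflows               │
│ Circuit-Break │ Open a breaker to stop cascading failures   │ High load, unstable external services │
└───────────────┴─────────────────────────────────────────────┴───────────────────────────────────────┘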
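The table lists circuit breaking but the chapter does not implement it, so here is a minimal sketch of the classic three-state breaker (closed, open, half-open). The class name, thresholds and injectable clock are assumptions for illustration, not a Hermes API. It is synchronous for brevity; the same state machine can wrap an awaited Skill call, though a production version would also need task safety.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the breaker is open."""


class CircuitBreaker:
    """Opens after `failure_threshold` consecutive failures; once
    `reset_timeout` seconds have passed, trial calls are allowed (half-open)."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"
        return "open"

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("circuit open; call rejected without contacting the dependency")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # Trip the breaker, or re-open it after a failed half-open trial
            if self.state == "half-open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        # Any success closes the breaker and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result


now = [0.0]  # fake clock so the example runs instantly
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])


def flaky():
    raise ConnectionError("upstream down")


for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
print(breaker.state)                              # open
now[0] = 10.0
print(breaker.state)                              # half-open
print(breaker.call(lambda: "ok"), breaker.state)  # ok closed
```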

Implementing the Saga compensation pattern

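# hermes/orchestration/saga.py
import logging
from typing import Any, Awaitable, Callable, List, Tuple

logger = logging.getLogger(__name__)


class SagaStep:
    """A Saga transaction step with a forward action and a compensating action"""

    def __init__(
        self,
        name: str,
        execute: Callable[[Any], Awaitable[Any]],
        compensate: Callable[[Any], Awaitable[None]]
    ):
        self.name = name
        self.execute = execute          # async (data) -> new data
        self.compensate = compensate    # async (output of this step) -> None


class SagaOrchestrator:
    """
    Saga transaction orchestrator

    If step N fails, the compensating actions of steps 1..N-1 run automatically
    in reverse order. Suited to workflows that need transactional consistency
    (for example: take payment + send email + update inventory).
    """

    def __init__(self, steps: List[SagaStep]):
        self.steps = steps

    async def execute(self, initial_data: Any) -> Any:
        data = initial_data
        # (step, output) pairs, kept per call so concurrent runs share no state;
        # each compensation receives the output it has to undo
        completed: List[Tuple[SagaStep, Any]] = []

        for step in self.steps:
            try:
                logger.info(f"[Saga] Executing: {step.name}")
                data = await step.execute(data)
                completed.append((step, data))

            except Exception as e:
                logger.error(f"[Saga] Step {step.name} failed: {e}; compensating...")
                await self._compensate(completed)
                raise SagaRollbackError(
                    f"Saga failed at {step.name}; compensation was run for all completed steps"
                ) from e

        return data

    async def _compensate(self, completed: List[Tuple[SagaStep, Any]]) -> None:
        """Run compensating actions in reverse order"""
        for step, output in reversed(completed):
            try:
                logger.info(f"[Saga] Compensating: {step.name}")
                await step.compensate(output)
            except Exception as e:
                # A failed compensation needs an alert, but keep compensating the rest
                logger.critical(
                    f"[Saga] Compensation of {step.name} failed "
                    f"(manual intervention required): {e}"
                )


class SagaRollbackError(Exception):
    pass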

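Chapter summary

This chapter covered four composition patterns for Hermes Skills:

  1. Sequential Pipeline (A→B→C): simple and direct; suited to processing chains with a clear direction of data flow
  2. Conditional branching: routes on runtime data, letting a workflow make decisions
  3. Parallel execution: runs independent steps concurrently with asyncio, so the total time approaches that of the slowest step rather than the sum of all steps
  4. DAG orchestration: the most general pattern; it starts each node as soon as its dependencies finish and supports complex dependency relationships

The "research + writing + publishing" case study showed how to combine several of these patterns into a production-grade content-creation workflow.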

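Discussion questions

  1. During DAG execution, how would you implement progress reporting so that users can see the status of every node in real time?
  2. If a step runs far longer than expected (for example, web_search takes 5 minutes), how would you design the timeout and circuit-breaking mechanisms?
  3. Should the condition function of a conditional branch be allowed to call an LLM? What problems would that introduce?
  4. Design a "Skill Pipeline template marketplace" where users can share their workflow configurations. How would you design the template format and its version management?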