Chapter 34
Skill Composition Patterns: Pipeline and DAG Orchestration
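A single Skill is like one Lego brick: focused on one job, but limited in what it can do alone. The real power comes from combining several Skills into a workflow that can complete complex tasks. This chapter examines the Skill composition patterns in Hermes Agent, from the simplest sequential Pipeline to DAG (directed acyclic graph) orchestration, and uses a complete "research + writing + publishing" case study to show how a production-grade workflow is assembled.
34.1 Overview of Composition Patterns
Hermes supports four Skill composition patterns, ranging from simple linear flows to workflows with complex dependencies: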
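┌──────────────────────────────────────────────────────────────────────────────────────────┐
│                                Skill composition patterns                                │
├─────────────────────┬─────────────────────┬─────────────────────┬────────────────────────┤
│ Sequential Pipeline │ Conditional branch  │ Parallel execution  │ DAG orchestration      │
│ A → B → C           │ if/else routing     │ A ∥ B → C           │ Directed acyclic graph │
│ Serial, simple      │ Dynamic routing     │ Concurrency speedup │ Complex dependencies   │
└─────────────────────┴─────────────────────┴─────────────────────┴────────────────────────┘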
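Pattern selection guide
| Scenario | Recommended pattern | Reason |
|---|---|---|
| Data-processing pipeline | Sequential Pipeline | Each step strictly depends on the previous one |
| Handling content by category | Conditional branch | Requires dynamic routing |
| Fetching from multiple data sources concurrently | Parallel execution | Steps are independent; speed is the goal |
| Complex research + writing flow | DAG orchestration | Mixes serial and parallel stages and needs precise control |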
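34.2 Sequential Pipeline Pattern (A→B→C)
The sequential Pipeline is the most basic composition pattern: each Skill's output becomes the next Skill's input, forming a data-processing chain.
Basic architecture
 Input data
     ↓
┌─────────┐     ┌─────────┐     ┌─────────┐
│ Skill A │ ──→ │ Skill B │ ──→ │ Skill C │
│ (fetch) │     │ (clean) │     │ (store) │
└─────────┘     └─────────┘     └─────────┘
                                     ↓
                               Output result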
Declarative YAML configuration
# pipeline_config.yaml
name: "data-processing-pipeline"
version: "1.0.0"
description: "Web page fetch, clean, and store pipeline"
pipeline:
  type: sequential
  steps:
    - id: fetch
      skill: web-fetcher-skill
      version: "^2.0.0"
      config:
        timeout: 30
        retry: 3
        user_agent: "HermesBot/1.0"
      input_mapping:
        url: "$.pipeline_input.url"
    - id: clean
      skill: html-cleaner-skill
      version: "^1.5.0"
      config:
        remove_scripts: true
        extract_main_content: true
      input_mapping:
        raw_html: "$.steps.fetch.output.html"
    - id: store
      skill: database-writer-skill
      version: "^3.0.0"
      config:
        table: "scraped_content"
        batch_size: 100
      input_mapping:
        content: "$.steps.clean.output.text"
        metadata: "$.steps.fetch.output.metadata"
  on_failure:
    strategy: "stop"  # stop / continue / retry
    notify: "[email protected]"
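The `version` fields above use caret constraints such as `^2.0.0`. The chapter does not say how Hermes resolves them, so the following is only a sketch that assumes npm-style caret semantics (the leftmost non-zero component is held fixed). The helper names `parse_version` and `satisfies_caret` are hypothetical and are not part of any Hermes API.

```python
from typing import Tuple


def parse_version(text: str) -> Tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' into a tuple of ints."""
    major, minor, patch = (int(part) for part in text.split("."))
    return major, minor, patch


def satisfies_caret(version: str, constraint: str) -> bool:
    """Return True if `version` falls inside the caret range `constraint`.

    Assumption: npm-style semantics, e.g. ^2.0.0 means >=2.0.0 and <3.0.0.
    """
    if not constraint.startswith("^"):
        raise ValueError("only caret constraints are handled in this sketch")
    base = parse_version(constraint[1:])
    candidate = parse_version(version)
    if candidate < base:
        return False
    # The upper bound bumps the leftmost non-zero component of the base.
    if base[0] > 0:
        upper = (base[0] + 1, 0, 0)
    elif base[1] > 0:
        upper = (0, base[1] + 1, 0)
    else:
        upper = (0, 0, base[2] + 1)
    return candidate < upper
```

Under these assumptions, `satisfies_caret("2.3.1", "^2.0.0")` accepts a compatible minor release, while `satisfies_caret("3.0.0", "^2.0.0")` rejects the next major version.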
Python implementation: the Pipeline execution engine
# hermes/orchestration/pipeline.py
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Callable

logger = logging.getLogger(__name__)


@dataclass
class StepResult:
    """Result of executing a single step"""
    step_id: str
    success: bool
    output: Any
    error: Optional[Exception] = None
    duration_ms: float = 0.0
    retry_count: int = 0


@dataclass
class PipelineContext:
    """Pipeline execution context; carries data between steps"""
    pipeline_input: Any
    steps: Dict[str, StepResult] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def get_step_output(self, step_id: str) -> Any:
        result = self.steps.get(step_id)
        if result is None:
            raise KeyError(f"Step '{step_id}' has not run yet or does not exist")
        if not result.success:
            raise RuntimeError(f"Step '{step_id}' failed; its output is unavailable")
        return result.output


class SequentialPipeline:
    """Sequential Pipeline execution engine"""

    def __init__(
        self,
        steps: List[Dict],
        on_failure: str = "stop",  # stop | continue | retry
        max_retries: int = 3
    ):
        self.steps = steps
        self.on_failure = on_failure
        self.max_retries = max_retries

    async def execute(self, pipeline_input: Any) -> PipelineContext:
        ctx = PipelineContext(pipeline_input=pipeline_input)
        for step_config in self.steps:
            step_id = step_config["id"]
            skill_name = step_config["skill"]
            logger.info(f"[Pipeline] Starting step: {step_id} ({skill_name})")
            result = await self._execute_step_with_retry(step_config, ctx)
            ctx.steps[step_id] = result
            if not result.success:
                logger.error(
                    f"[Pipeline] Step {step_id} failed: {result.error}, "
                    f"took {result.duration_ms:.1f}ms"
                )
                if self.on_failure == "stop":
                    raise PipelineExecutionError(
                        f"Pipeline aborted at step '{step_id}': {result.error}"
                    )
                # on_failure == "continue": the error is recorded and execution goes on
            else:
                logger.info(
                    f"[Pipeline] Step {step_id} succeeded, "
                    f"took {result.duration_ms:.1f}ms"
                )
        return ctx

    async def _execute_step_with_retry(
        self,
        step_config: Dict,
        ctx: PipelineContext
    ) -> StepResult:
        step_id = step_config["id"]
        max_retries = step_config.get("retry", self.max_retries)
        for attempt in range(max_retries + 1):
            start = time.time()
            try:
                # Resolve the input mapping
                step_input = self._resolve_inputs(
                    step_config.get("input_mapping", {}), ctx
                )
                # Load and run the Skill
                skill = await self._load_skill(
                    step_config["skill"],
                    step_config.get("version", "*")
                )
                output = await skill.execute(
                    step_input,
                    config=step_config.get("config", {})
                )
                duration = (time.time() - start) * 1000
                return StepResult(
                    step_id=step_id,
                    success=True,
                    output=output,
                    duration_ms=duration,
                    retry_count=attempt
                )
            except Exception as e:
                duration = (time.time() - start) * 1000
                if attempt < max_retries:
                    wait = 2 ** attempt  # exponential backoff: 1s, 2s, 4s, ...
                    logger.warning(
                        f"Step {step_id} failed on attempt {attempt+1}, "
                        f"retrying in {wait}s: {e}"
                    )
                    await asyncio.sleep(wait)
                else:
                    return StepResult(
                        step_id=step_id,
                        success=False,
                        output=None,
                        error=e,
                        duration_ms=duration,
                        retry_count=attempt
                    )

    def _resolve_inputs(self, mapping: Dict, ctx: PipelineContext) -> Dict:
        """Resolve JSONPath-style input mappings"""
        resolved = {}
        for key, path in mapping.items():
            if path.startswith("$.pipeline_input."):
                name = path[len("$.pipeline_input."):]
                source = ctx.pipeline_input
                # Accept either a dict or an object with attributes
                resolved[key] = (
                    source.get(name) if isinstance(source, dict)
                    else getattr(source, name, None)
                )
            elif path.startswith("$.steps."):
                # "$.steps.<id>.output.<field>" splits into ["$", "steps", <id>, "output", <field>]
                parts = path.split(".")
                step_id = parts[2]
                output_field = ".".join(parts[4:]) if len(parts) > 4 else None
                output = ctx.get_step_output(step_id)
                resolved[key] = output.get(output_field) if output_field else output
        return resolved

    async def _load_skill(self, name: str, version: str):
        from hermes.skill.registry import SkillRegistry
        return await SkillRegistry.get(name, version)


class PipelineExecutionError(Exception):
    pass
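To make the input-mapping idea concrete, here is a minimal, self-contained sketch of resolving a `$.a.b.c` path against nested dictionaries. It is not the Hermes resolver: `resolve_path` and the sample `context` are hypothetical, and unlike `_resolve_inputs` above it walks every segment and raises on a missing key instead of returning `None`.

```python
from typing import Any, Dict


def resolve_path(path: str, root: Dict[str, Any]) -> Any:
    """Walk a '$.a.b.c' path through nested dictionaries."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path!r}")
    current: Any = root
    for key in path[2:].split("."):
        if not isinstance(current, dict) or key not in current:
            raise KeyError(f"{path!r}: missing segment {key!r}")
        current = current[key]
    return current


# Hypothetical context shaped like the YAML paths used in this section.
context = {
    "pipeline_input": {"url": "https://example.com"},
    "steps": {
        "fetch": {"output": {"html": "<p>hi</p>", "metadata": {"status": 200}}},
    },
}

html = resolve_path("$.steps.fetch.output.html", context)
```

Failing loudly on a missing segment is a design choice: a typo in a mapping surfaces at the step that reads it, rather than as a `None` value several steps later.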
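34.3 Conditional Branch Pattern
Conditional branching selects an execution path at run time based on the data, which is what lets a workflow make decisions.
Architecture diagram
          ┌──────────────┐
          │   Skill A    │
          │ (classifier) │
          └──────┬───────┘
                 │
     ┌───────────┴───────────┐
     │ Route on the category │
     └───────────┬───────────┘
                 │
     ┌───────────┼───────────┐
     ↓           ↓           ↓
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Path A  │ │ Path B  │ │ Path C  │
│ (news)  │ │ (tech)  │ │ (other) │
└────┬────┘ └────┬────┘ └────┬────┘
     └───────────┼───────────┘
                 │
          ┌──────┴──────┐
          │ Join node   │
          │ (formatter) │
          └─────────────┘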
YAML configuration and Python implementation
# conditional_pipeline.yaml
pipeline:
  type: conditional
  steps:
    - id: classify
      skill: content-classifier-skill
    - id: route
      type: switch
      condition: "$.steps.classify.output.category"
      cases:
        "news":
          - id: process_news
            skill: news-processor-skill
        "technical":
          - id: process_tech
            skill: tech-analyzer-skill
        "default":
          - id: process_generic
            skill: generic-processor-skill
    - id: format
      skill: output-formatter-skill
      depends_on: ["route"]  # wait for the branches to converge
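# hermes/orchestration/conditional.py
import logging
from typing import Any, Callable, Dict, List

from hermes.orchestration.pipeline import PipelineContext, SequentialPipeline

logger = logging.getLogger(__name__)


class ConditionalStep:
    """Conditional branch step"""

    def __init__(
        self,
        condition: Callable[[Any], str],
        branches: Dict[str, List],
        default_branch: str = "default"
    ):
        self.condition = condition
        self.branches = branches
        self.default_branch = default_branch

    async def execute(self, ctx: 'PipelineContext') -> 'PipelineContext':
        # Evaluate the condition
        branch_key = self.condition(ctx)
        # Pick the branch, falling back to the default
        branch_steps = self.branches.get(
            branch_key,
            self.branches.get(self.default_branch, [])
        )
        if not branch_steps:
            raise ValueError(f"Branch not found: {branch_key}")
        logger.info(f"[Conditional] Routing to branch: {branch_key}")
        # Run the selected branch. Note that it runs in a fresh context, so its
        # input mappings can only read pipeline_input, not earlier step outputs.
        branch_pipeline = SequentialPipeline(branch_steps)
        branch_ctx = await branch_pipeline.execute(ctx.pipeline_input)
        # Merge the branch results back so that earlier steps are not lost
        ctx.steps.update(branch_ctx.steps)
        return ctx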
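The routing rule itself (look up the category, fall back to `default`) can be illustrated without any Hermes classes. The sketch below is a toy under stated assumptions: the handler functions and the `ROUTES` table are hypothetical stand-ins for the news, technical, and generic processor Skills.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Handler = Callable[[str], Awaitable[str]]


async def handle_news(text: str) -> str:
    return f"news:{text}"


async def handle_technical(text: str) -> str:
    return f"technical:{text}"


async def handle_generic(text: str) -> str:
    return f"generic:{text}"


# Category -> handler, mirroring the `cases` block in the YAML above.
ROUTES: Dict[str, Handler] = {
    "news": handle_news,
    "technical": handle_technical,
    "default": handle_generic,
}


async def route(category: str, text: str) -> str:
    """Dispatch to the handler for `category`, or to the default handler."""
    handler = ROUTES.get(category, ROUTES["default"])
    return await handler(text)
```

An unknown category such as `"sports"` lands on the generic handler, which is the same fallback behavior `ConditionalStep` implements with `default_branch`.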
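34.4 Parallel Execution Pattern
When several steps are independent of one another, running them in parallel can shorten the total time considerably: the wall-clock cost approaches that of the slowest step rather than the sum of all steps.
Parallel execution architecture
             ┌─────────────────┐
             │   Input data    │
             └────────┬────────┘
                      │ fan-out
       ┌──────────────┼──────────────┐
       ↓              ↓              ↓
┌────────────┐ ┌────────────┐ ┌────────────┐
│  Engine A  │ │  Engine B  │ │  Engine C  │
│  (Google)  │ │   (Bing)   │ │(DuckDuckGo)│
└──────┬─────┘ └──────┬─────┘ └──────┬─────┘
       └──────────────┼──────────────┘
                      │ fan-in
             ┌────────┴────────┐
             │ Result merger   │
             └─────────────────┘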
Python implementation: the parallel execution engine
# hermes/orchestration/parallel.py
import asyncio
from typing import Any, Dict, List, Optional

from hermes.orchestration.pipeline import PipelineContext, StepResult


class ParallelExecutor:
    """Runs Skills in parallel"""

    # Note: _execute_step(step, ctx, extra_input=None) is not shown in this chapter;
    # it is assumed to load the Skill, run it, and return a StepResult.

    def __init__(
        self,
        max_concurrency: int = 10,
        timeout: float = 60.0
    ):
        self.max_concurrency = max_concurrency
        self.timeout = timeout
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def execute_all(
        self,
        steps: List[Dict],
        ctx: 'PipelineContext',
        fail_fast: bool = True
    ) -> Dict[str, StepResult]:
        """Run all steps in parallel and return a dict of results"""
        tasks = {
            step["id"]: asyncio.create_task(
                self._execute_with_semaphore(step, ctx)
            )
            for step in steps
        }
        results = {}
        try:
            # With fail_fast, gather raises on the first failure instead of
            # waiting for every task and returning exceptions as values.
            done = await asyncio.wait_for(
                asyncio.gather(*tasks.values(), return_exceptions=not fail_fast),
                timeout=self.timeout
            )
        except asyncio.TimeoutError:
            for task in tasks.values():
                task.cancel()
            raise TimeoutError(f"Parallel execution timed out (>{self.timeout}s)")
        except Exception:
            # fail_fast: cancel the tasks that are still running, then re-raise
            for task in tasks.values():
                task.cancel()
            raise
        for step_id, result in zip(tasks.keys(), done):
            if isinstance(result, Exception):
                results[step_id] = StepResult(
                    step_id=step_id,
                    success=False,
                    output=None,
                    error=result
                )
            else:
                results[step_id] = result
        return results

    async def _execute_with_semaphore(self, step, ctx):
        async with self._semaphore:
            return await self._execute_step(step, ctx)

    async def execute_map_reduce(
        self,
        map_steps: List[Dict],
        reduce_step: Dict,
        ctx: 'PipelineContext'
    ) -> StepResult:
        """Map-Reduce pattern: Map in parallel, then Reduce"""
        # Map phase
        map_results = await self.execute_all(map_steps, ctx)
        # Build the Reduce input from the successful Map outputs
        reduce_input = {
            step_id: result.output
            for step_id, result in map_results.items()
            if result.success
        }
        # Reduce phase
        return await self._execute_step(reduce_step, ctx, extra_input=reduce_input)
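The role of `max_concurrency` is easiest to see in isolation. The following self-contained sketch uses only the standard `asyncio.Semaphore` and `asyncio.gather`; the function `run_bounded` and its dummy job are hypothetical, with `asyncio.sleep` standing in for a real Skill call. It records the highest number of jobs that were ever in flight at once.

```python
import asyncio


async def run_bounded(n_tasks: int, limit: int) -> int:
    """Run n_tasks dummy jobs with at most `limit` in flight; return the observed peak."""
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:
            # Inside the semaphore: count this job as in flight.
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # stand-in for a Skill call
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak


peak_with_limit = asyncio.run(run_bounded(10, 3))
```

All ten jobs are scheduled immediately, but the semaphore admits only three at a time, so the observed peak never exceeds the limit. This is the same mechanism `_execute_with_semaphore` relies on to keep a wide fan-out from overwhelming an external service.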
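34.5 DAG Orchestration: Directed Acyclic Graphs
A DAG (Directed Acyclic Graph) is the most flexible orchestration pattern. It can express arbitrary acyclic dependencies, and the scheduler parallelizes automatically by starting each node as soon as all of its dependencies have finished.
Core concepts of a DAG
Node = one Skill execution step
Edge = a data dependency (A→B means B needs A's output)
The DAG scheduler:
1. Finds all nodes with in-degree 0 (no dependencies) and starts them in parallel
2. When a node finishes, checks whether each of its successors now has all of its dependencies completed
3. If so, starts that successor immediately
4. Repeats until all nodes have finished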
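The four steps above can be sketched synchronously as a variant of Kahn's topological sort that groups nodes into "waves". This is a simplification under stated assumptions: a real scheduler starts a node the moment its own dependencies finish rather than waiting for a whole wave, and the function name `execution_waves` is hypothetical. The sketch also shows why the graph must be acyclic: if no node is ready while some remain, there is a cycle.

```python
from typing import Dict, List, Set


def execution_waves(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Group nodes into waves; every node in a wave can run in parallel."""
    unknown = {d for deps in dependencies.values() for d in deps} - set(dependencies)
    if unknown:
        raise ValueError(f"dependencies on undefined nodes: {sorted(unknown)}")

    remaining = {node: set(deps) for node, deps in dependencies.items()}
    waves: List[List[str]] = []
    while remaining:
        # Step 1 / step 2: nodes whose dependencies are all satisfied.
        ready = sorted(node for node, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError(f"cycle detected among: {sorted(remaining)}")
        waves.append(ready)
        # Step 3 / step 4: mark them done and release their successors.
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


example = {"a": set(), "b": set(), "c": {"a", "b"}, "d": {"c"}}
waves = execution_waves(example)
```

For the example graph, `a` and `b` form the first wave, followed by `c` and then `d`. Running a check like this before execution catches a cyclic or dangling `depends_on` entry early, instead of leaving nodes that silently never start.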
DAG execution engine implementation
# hermes/orchestration/dag.py
import asyncio
import logging
from typing import Any, Dict, List, Set, Optional
from collections import defaultdict

from hermes.orchestration.pipeline import PipelineContext

logger = logging.getLogger(__name__)


class DAGOrchestrator:
    """
    DAG orchestration engine

    Supports:
    - Automatic parallelization of nodes with no dependencies
    - Starting a node as soon as its dependencies are ready
    - Cascading cancellation when a node fails
    - Full tracking of execution state
    """

    # Note: _execute_step(step_id, ctx) is not shown in this chapter; it is assumed
    # to run the node's Skill and return a StepResult, raising on failure.

    def __init__(self, dag_config: Dict):
        self.steps = {s["id"]: s for s in dag_config["steps"]}
        self.dependencies = self._parse_dependencies()
        self.reverse_deps = self._build_reverse_deps()

    def _parse_dependencies(self) -> Dict[str, Set[str]]:
        """Parse the dependency set of every node"""
        deps = {}
        for step_id, step in self.steps.items():
            deps[step_id] = set(step.get("depends_on", []))
        return deps

    def _build_reverse_deps(self) -> Dict[str, Set[str]]:
        """Build the reverse dependency graph (node -> set of its successors)"""
        reverse = defaultdict(set)
        for step_id, deps in self.dependencies.items():
            for dep in deps:
                reverse[dep].add(step_id)
        return dict(reverse)

    async def execute(self, pipeline_input: Any) -> 'PipelineContext':
        ctx = PipelineContext(pipeline_input=pipeline_input)
        # State tracking
        completed: Set[str] = set()
        failed: Set[str] = set()
        in_progress: Dict[str, asyncio.Task] = {}
        # Find the initially runnable nodes (no dependencies)
        ready = {
            sid for sid, deps in self.dependencies.items()
            if len(deps) == 0
        }
        while ready or in_progress:
            # Start every ready node
            for step_id in ready:
                task = asyncio.create_task(
                    self._execute_step(step_id, ctx)
                )
                in_progress[step_id] = task
            ready.clear()
            if not in_progress:
                break
            # Wait for any one node to finish
            done_tasks, _ = await asyncio.wait(
                in_progress.values(),
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in done_tasks:
                # Find the matching step_id
                step_id = next(
                    sid for sid, t in in_progress.items() if t == task
                )
                del in_progress[step_id]
                try:
                    result = task.result()
                    ctx.steps[step_id] = result
                    completed.add(step_id)
                    # Check whether any successor is now ready
                    for successor in self.reverse_deps.get(step_id, set()):
                        if self._is_ready(successor, completed, failed):
                            ready.add(successor)
                except Exception as e:
                    failed.add(step_id)
                    logger.error(f"DAG node {step_id} failed: {e}")
                    # Cascade: cancel every successor that depends on the failed node
                    self._cascade_cancel(step_id, in_progress, failed)
        if failed:
            raise DAGExecutionError(f"DAG execution failed; failed nodes: {failed}")
        return ctx

    def _is_ready(self, step_id: str, completed: Set, failed: Set) -> bool:
        """Check whether all dependencies of a node have completed"""
        deps = self.dependencies[step_id]
        if any(dep in failed for dep in deps):
            return False  # a dependency failed, so do not start
        return all(dep in completed for dep in deps)

    def _cascade_cancel(
        self,
        failed_id: str,
        in_progress: Dict,
        failed: Set
    ):
        """Cancel, transitively, every successor of the failed node"""
        to_cancel = set()
        queue = list(self.reverse_deps.get(failed_id, set()))
        while queue:
            node = queue.pop(0)
            if node not in to_cancel:
                to_cancel.add(node)
                queue.extend(self.reverse_deps.get(node, set()))
        for node in to_cancel:
            if node in in_progress:
                in_progress[node].cancel()
            failed.add(node)
            logger.warning(f"Cascade-cancelled {node} because {failed_id} failed")


class DAGExecutionError(Exception):
    pass
34.6 Hands-On: Building a Three-Stage "Research + Writing + Publishing" Skill Pipeline
Workflow architecture
Research + writing + publishing workflow

Stage 1: Research (parallel)
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  web-search  │  │ arxiv-search │  │ news-fetcher │
│ (web search) │  │   (papers)   │  │    (news)    │
└──────┬───────┘  └──────┬───────┘  └──────┬───────┘
       └─────────────────┼─────────────────┘
                         ↓
          ┌─────────────────────────────┐
          │    research-synthesizer     │
          │(merge the three result sets)│
          └──────────────┬──────────────┘
                         ↓
Stage 2: Writing (serial)
          ┌─────────────────────────────┐
          │      outline-generator      │
          │   (generate the outline)    │
          └──────────────┬──────────────┘
                         ↓
          ┌─────────────────────────────┐
          │       content-writer        │
          │      (write the body)       │
          └──────────────┬──────────────┘
                         ↓
          ┌─────────────────────────────┐
          │       quality-checker       │
          │      (quality review)       │
          └──────────────┬──────────────┘
                         ↓
Stage 3: Publishing (mixed parallel and serial)
          ┌─────────────────────────────┐
          │        seo-optimizer        │
          │     (SEO optimization)      │
          └────┬──────────────────┬─────┘
               ↓                  ↓
        ┌──────────────┐   ┌──────────────┐
        │  cms-poster  │   │ social-share │
        │   (to CMS)   │   │(social posts)│
        └──────────────┘   └──────────────┘
Complete DAG configuration
# research_write_publish.yaml
name: "research-write-publish"
version: "1.0.0"
description: "End-to-end pipeline for AI-assisted content creation"
inputs:
  topic: string
  target_audience: string
  publish_platforms: list
dag:
  steps:
    # ===== Stage 1: parallel research =====
    - id: web_search
      skill: web-search-skill
      version: "^2.0.0"
      config:
        engines: ["google", "bing"]
        max_results: 20
        language: "zh-CN"
      inputs:
        query: "$.inputs.topic"
    - id: arxiv_search
      skill: arxiv-search-skill
      version: "^1.0.0"
      config:
        max_papers: 10
        sort_by: "relevance"
      inputs:
        query: "$.inputs.topic"
    - id: news_fetch
      skill: news-fetcher-skill
      version: "^1.2.0"
      config:
        days_back: 30
        min_relevance: 0.7
      inputs:
        topic: "$.inputs.topic"
    # Synthesize the research (depends on the three parallel searches)
    - id: synthesize
      skill: research-synthesizer-skill
      version: "^2.0.0"
      depends_on: ["web_search", "arxiv_search", "news_fetch"]
      inputs:
        web_results: "$.steps.web_search.output"
        arxiv_results: "$.steps.arxiv_search.output"
        news_results: "$.steps.news_fetch.output"
        topic: "$.inputs.topic"
    # ===== Stage 2: serial writing =====
    - id: outline
      skill: outline-generator-skill
      version: "^1.0.0"
      depends_on: ["synthesize"]
      config:
        max_sections: 8
        style: "detailed"
      inputs:
        research_summary: "$.steps.synthesize.output.summary"
        audience: "$.inputs.target_audience"
    - id: write
      skill: content-writer-skill
      version: "^3.0.0"
      depends_on: ["outline"]
      config:
        model: "hermes-pro"
        tone: "professional"
        min_words: 2000
      inputs:
        outline: "$.steps.outline.output.sections"
        research: "$.steps.synthesize.output.full_report"
    - id: quality_check
      skill: quality-checker-skill
      version: "^1.5.0"
      depends_on: ["write"]
      config:
        checks: ["grammar", "factual", "plagiarism", "seo-basics"]
        min_score: 0.85
      inputs:
        content: "$.steps.write.output.article"
    # ===== Stage 3: publishing =====
    - id: seo_optimize
      skill: seo-optimizer-skill
      version: "^1.0.0"
      depends_on: ["quality_check"]
      config:
        target_keywords_count: 5
        meta_description_length: 160
      inputs:
        article: "$.steps.quality_check.output.approved_content"
        topic: "$.inputs.topic"
    - id: publish_cms
      skill: cms-publisher-skill
      version: "^2.0.0"
      depends_on: ["seo_optimize"]
      config:
        cms: "wordpress"
        status: "draft"
      inputs:
        content: "$.steps.seo_optimize.output.article"
        seo_metadata: "$.steps.seo_optimize.output.metadata"
    - id: social_share
      skill: social-share-skill
      version: "^1.0.0"
      # publish_cms is listed because the `url` input below reads its output;
      # without it the scheduler could start this step before the URL exists.
      depends_on: ["seo_optimize", "publish_cms"]
      config:
        platforms: "$.inputs.publish_platforms"
      inputs:
        title: "$.steps.seo_optimize.output.title"
        summary: "$.steps.seo_optimize.output.description"
        url: "$.steps.publish_cms.output.url"
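A data reference such as `$.steps.publish_cms.output.url` is only safe when the referenced step is guaranteed to have finished, that is, when it is a direct or transitive dependency of the step reading it. The check below is a sketch of a pre-flight validation, assuming the `depends_on` and `inputs` keys used in this chapter's YAML; the function `undeclared_references` and the sample `example_steps` are hypothetical.

```python
import re
from typing import Dict, List, Set

STEP_REF = re.compile(r"^\$\.steps\.([A-Za-z0-9_]+)\.")


def undeclared_references(steps: List[Dict]) -> Dict[str, List[str]]:
    """Map step id -> referenced step ids that are not upstream of that step."""
    deps = {s["id"]: set(s.get("depends_on", [])) for s in steps}

    def upstream(step_id: str) -> Set[str]:
        # Collect direct and transitive dependencies.
        seen: Set[str] = set()
        stack = list(deps.get(step_id, ()))
        while stack:
            node = stack.pop()
            if node not in seen:
                seen.add(node)
                stack.extend(deps.get(node, ()))
        return seen

    problems: Dict[str, List[str]] = {}
    for step in steps:
        referenced = set()
        for value in step.get("inputs", {}).values():
            match = STEP_REF.match(value) if isinstance(value, str) else None
            if match:
                referenced.add(match.group(1))
        missing = sorted(referenced - upstream(step["id"]))
        if missing:
            problems[step["id"]] = missing
    return problems


example_steps = [
    {"id": "seo_optimize", "inputs": {"topic": "$.inputs.topic"}},
    {"id": "publish_cms", "depends_on": ["seo_optimize"],
     "inputs": {"content": "$.steps.seo_optimize.output.article"}},
    {"id": "social_share", "depends_on": ["seo_optimize"],
     "inputs": {"title": "$.steps.seo_optimize.output.title",
                "url": "$.steps.publish_cms.output.url"}},
]
problems = undeclared_references(example_steps)
```

In the sample, `social_share` reads the CMS URL but only declares `seo_optimize`, so the check reports `publish_cms` as missing. A reference to a transitive dependency (for example, a writing step that reads the synthesizer's report through the outline step) is accepted.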
Running the workflow
# run_pipeline.py
import asyncio

from hermes.orchestration.dag import DAGOrchestrator
from hermes.orchestration.loader import load_dag_config


async def main():
    # Load the DAG configuration
    config = load_dag_config("research_write_publish.yaml")
    # Create the orchestrator
    orchestrator = DAGOrchestrator(config)
    # Execute
    result = await orchestrator.execute({
        "topic": "Applications of large language models in medical diagnosis",
        "target_audience": "Healthcare professionals and AI researchers",
        "publish_platforms": ["twitter", "linkedin"]
    })
    # Read the final results
    print("Article published!")
    print(f"CMS URL: {result.steps['publish_cms'].output['url']}")
    print(f"Total duration: {result.metadata.get('total_duration_ms', 0):.1f}ms")


asyncio.run(main())
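34.7 Error Propagation and Compensation
Comparison of error-handling strategies
| Strategy | Description | Suitable for |
|---|---|---|
| Fail-Fast | Stop the whole Pipeline as soon as an error occurs | Strict data-consistency requirements |
| Continue | Record the error, skip the failed step, and carry on | Non-critical steps whose failure is tolerable |
| Retry | Retry according to a backoff policy | Network timeouts and transient errors |
| Compensate | Run compensating actions to undo completed steps | Transactional workflows |
| Circuit-Break | Trip a circuit breaker to avoid cascading failures | High concurrency or unstable external dependencies |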
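Of the strategies in the table, Circuit-Break is the one least obvious from its one-line description, so here is a minimal sketch. It is not a Hermes component: the class names, the default thresholds, and the injectable `clock` parameter are all assumptions made for illustration, and the breaker is synchronous for brevity even though the engines in this chapter are async.

```python
import time
from typing import Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_after = reset_after
        self.clock = clock  # injectable so the cool-down can be tested without sleeping
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, func: Callable, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after:
                raise CircuitOpenError("circuit open; call rejected")
            # Cool-down elapsed: allow one trial call (half-open state).
            # One more failure will reopen the circuit immediately.
            self.opened_at = None
            self.failures = self.failure_threshold - 1
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0  # any success closes the circuit fully
        return result
```

The point of rejecting calls while the circuit is open is that a struggling dependency gets time to recover, and callers fail in microseconds instead of each waiting out a full timeout.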
Saga compensation pattern implementation
# hermes/orchestration/saga.py
from typing import Callable, Any, List
import logging

logger = logging.getLogger(__name__)


class SagaStep:
    """A Saga transaction step: a forward action plus its compensating action"""

    def __init__(
        self,
        name: str,
        execute: Callable,
        compensate: Callable
    ):
        self.name = name
        self.execute = execute
        self.compensate = compensate


class SagaOrchestrator:
    """
    Saga transaction orchestrator

    If step N fails, the compensating actions of steps 1 to N-1 run automatically
    in reverse order. Suited to workflows that need transactional consistency
    (for example: take payment + send email + update inventory).
    """

    def __init__(self, steps: List[SagaStep]):
        self.steps = steps
        self.completed = []

    async def execute(self, initial_data: Any) -> Any:
        data = initial_data
        self.completed = []
        for step in self.steps:
            try:
                logger.info(f"[Saga] Executing: {step.name}")
                data = await step.execute(data)
                self.completed.append(step)
            except Exception as e:
                logger.error(f"[Saga] Step {step.name} failed: {e}; starting compensation...")
                await self._compensate()
                raise SagaRollbackError(
                    f"Saga failed at {step.name}; compensation was run for all completed steps"
                ) from e
        return data

    async def _compensate(self):
        """Run the compensating actions in reverse order"""
        for step in reversed(self.completed):
            try:
                logger.info(f"[Saga] Compensating: {step.name}")
                await step.compensate()
            except Exception as e:
                # A failed compensation must raise an alert, but the remaining
                # steps are still compensated
                logger.critical(
                    f"[Saga] Compensation for {step.name} failed "
                    f"(manual intervention required): {e}"
                )


class SagaRollbackError(Exception):
    pass
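The order in which compensation runs is the essential property of a Saga, and it can be observed with a stripped-down, self-contained version. This sketch does not use `SagaOrchestrator`; the `run_saga` function, the `ok` and `fail` coroutines, and the three step names (borrowed from the payment, email, and inventory example in the docstring above) are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Action = Callable[[], Awaitable[None]]


async def run_saga(steps: List[Tuple[str, Action, Action]], log: List[str]) -> bool:
    """Run (name, execute, compensate) triples; undo completed steps in reverse on failure."""
    completed: List[Tuple[str, Action]] = []
    for name, execute, compensate in steps:
        try:
            await execute()
            log.append(f"done:{name}")
            completed.append((name, compensate))
        except Exception:
            log.append(f"failed:{name}")
            # Undo only what actually completed, most recent first.
            for done_name, undo in reversed(completed):
                await undo()
                log.append(f"undo:{done_name}")
            return False
    return True


async def ok() -> None:
    return None


async def fail() -> None:
    raise RuntimeError("inventory service unavailable")


def demo() -> List[str]:
    log: List[str] = []
    steps = [("charge", ok, ok), ("email", ok, ok), ("inventory", fail, ok)]
    asyncio.run(run_saga(steps, log))
    return log
```

When the third step fails, the log shows the email step undone before the charge, and the failed step itself is never compensated because it never completed.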
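Chapter summary
This chapter covered the four composition patterns for Hermes Skills:
- Sequential Pipeline (A→B→C): simple and direct; suited to processing chains with a clear data flow
- Conditional branch: routes dynamically on runtime data so the workflow can make decisions
- Parallel execution: uses asyncio to run independent steps concurrently, which can shorten total time substantially
- DAG orchestration: the most flexible pattern; it starts each node as soon as its dependencies finish and supports complex dependencies
The "research + writing + publishing" case study showed how to combine these patterns into a production-grade content-creation workflow.
Questions for reflection
- During DAG execution, how would you implement progress reporting so that users can see the state of every node in real time?
- If a step runs far longer than expected (say web_search takes 5 minutes), how would you design the timeout and circuit-breaking mechanisms?
- Should the condition function of a conditional branch be allowed to call an LLM? What problems would that introduce?
- Design a "Skill Pipeline template marketplace" where users share their workflow configurations. How would you design the template format and version management?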