复杂工作流:循环、并行分支与错误恢复
第10章:复杂工作流——循环、并行分支与错误恢复
当业务场景超出简单的线性流程时,循环处理批量数据、并行加速多路任务、健壮的错误恢复机制是区分玩具工作流和生产级系统的关键。
本章导读
在实际业务中,你很快会遇到单一线性工作流无法满足的场景:
- 需要处理一个文档列表(10份合同、100条评论、500行数据),每份都需要 AI 分析
- 同时调用多个 LLM 或数据源,取最快响应(竞速)或汇总所有结果
- 某个步骤可能失败(外部 API 超时、LLM 输出格式错误),需要自动重试或走备用方案
这些场景分别对应工作流的三个进阶能力:循环节点(Loop)、并行分支(Parallel Branch)、错误恢复(Error Recovery)。
本章深入讲解:
- 迭代节点与循环节点的区别和适用场景
- 并行分支的设计模式:并行-汇总、竞速、扇出-扇入
- 错误处理策略:重试、降级、异常分支
- 生产环境中的性能优化和资源控制
Level 1:基础认知(1-3 年经验)
1.1 循环节点:对列表做批处理
**迭代节点(Iteration Node)**是 Dify 中处理列表数据的核心节点。它接收一个数组,对每个元素执行相同的子流程,最终汇总所有结果。
类比:迭代节点就像一个 for 循环,把同样的处理逻辑应用到列表中的每个元素。
配置步骤:
- 添加迭代节点到工作流
- 在"迭代数组变量"中选择要遍历的变量(必须是数组类型)
- 在迭代节点内部(子图)添加处理逻辑
- 迭代节点的输出是所有单次迭代结果组成的数组
实战示例:批量翻译
开始节点(输入:documents = ["文档1内容", "文档2内容", "文档3内容"])
↓
迭代节点(遍历 documents 数组)
├─ [内部子图]
│ LLM节点:翻译当前文档(使用 {{#item#}} 引用当前元素)
│ → 输出:translated_text
↓
结束节点(输出:迭代节点.output = ["翻译1", "翻译2", "翻译3"])
关键变量:迭代节点内部使用 {{#item#}} 引用当前迭代的元素,使用 {{#index#}} 引用当前索引(从 0 开始)。
1.2 并行分支:同时执行多个任务
**并行分支(Parallel Branch)**让工作流同时执行多个分支,然后等待所有分支完成后汇总结果。
类比:如果说条件分支(IF/ELSE)是"走左边或走右边",那并行分支就是"左边和右边同时走"。
使用场景:
- 同时用 GPT-4o 和 Claude 3.5 处理同一个问题,比较结果
- 同时调用多个 API(天气、股价、新闻),汇总数据
- 同时对一份文档做摘要、提取关键词、生成标签,然后合并
配置方法:
- 在工作流中添加"并行"节点(开始并行)
- 从并行节点拉出多条线,分别连接不同的处理节点
- 添加"汇聚"节点(结束并行),连接所有并行分支的终点
注意:Dify 中的并行节点(Parallel)是工作流的结构设计,而非独立的节点类型。通过将多个节点从同一个父节点拖出,即可自动形成并行执行。
1.3 理解错误处理的重要性
生产环境中,工作流面临的常见失败场景:
| 场景 | 失败原因 | 频率 |
|---|---|---|
| HTTP 请求节点 | 外部 API 超时、限速、服务不可用 | 每日数次 |
| LLM 节点 | 模型过载、输出格式不符、上下文超长 | 每周数次 |
| 代码节点 | 类型错误、除零错误、解析失败 | 每月数次 |
| 知识库节点 | 向量数据库连接失败 | 极少但严重 |
没有错误处理的工作流,一个节点失败会导致整个工作流失败,用户看到的是冷冰冰的错误信息。
Dify 的三种错误处理模式(在每个节点的设置中配置):
- 终止工作流(默认):节点失败时立即停止整个工作流
- 继续执行(使用默认值):节点失败时,用预设的默认值代替输出,继续执行下游节点
- 使用备用方案:节点失败时,走预先定义的备用节点链
1.4 构建一个包含循环的实用工作流
场景:批量分析产品评论,提取情感和关键词
工作流结构:
开始节点
输入:review_list(数组,每个元素是一条评论文本)
输入:product_name(文本)
↓
迭代节点(遍历 review_list)
内部子图:
├─ LLM节点:分析单条评论
│ 提示词:
│ """
│ 分析以下关于{{start.product_name}}的用户评论:
│ {{#item#}}
│
│ 输出 JSON:
│ {"sentiment": "positive/negative/neutral",
│ "score": 1-5,
│ "keywords": ["词1", "词2"]}
│ """
│ ↓
└─ 代码节点:解析 JSON 输出
def main(llm_output: str) -> dict:
return json.loads(extract_json(llm_output))
↓
代码节点:汇总分析结果
def main(iteration_results: list) -> dict:
sentiments = [r["sentiment"] for r in iteration_results]
avg_score = sum(r["score"] for r in iteration_results) / len(iteration_results)
return {
"positive_rate": sentiments.count("positive") / len(sentiments),
"avg_score": round(avg_score, 2),
"negative_reviews": [
r for r in iteration_results if r["sentiment"] == "negative"
]
}
↓
结束节点(输出分析汇总)
Level 2:机制深解(3-5 年经验)
2.1 迭代节点的性能特征与并发控制
迭代节点默认是顺序执行:先处理元素 0,再处理元素 1,以此类推。
对于 100 个元素,每个 LLM 调用 2 秒,顺序执行需要 200 秒。
并发迭代(Dify v0.10+):
迭代节点支持配置并发数(Concurrency),同时处理多个元素:
并发数 = 1(默认):顺序执行,100个元素 × 2s = 200s
并发数 = 5:5个并发,100个元素 / 5 × 2s = 40s(提速5倍)
并发数 = 10:10个并发,100个元素 / 10 × 2s = 20s(提速10倍)
并发的代价:
- 更多的并发 = 更高的 API 费用速率(token 消耗加速)
- 触发 LLM API 的速率限制(Rate Limit)
- 如果有数据库写入操作,需要处理并发冲突
推荐配置:
- 小批量(< 20 个):并发数 = 元素数量(全并行)
- 中批量(20-100 个):并发数 = 5-10
- 大批量(> 100 个):并发数 = 3-5(防止触发速率限制)
在 Dify 中配置并发:点击迭代节点 → 设置 → 并发数量。
2.2 并行分支的高级模式
模式一:聚合(Fan-out / Fan-in)
单一输入
↓
/ | \ (并行分支)
A B C
\ | / (汇聚节点)
↓
合并结果
实现:在汇聚后的代码节点中,分别引用各分支的输出:
def main(
analysis_a: dict, # 来自分支 A
analysis_b: dict, # 来自分支 B
analysis_c: dict, # 来自分支 C
) -> dict:
# 融合三路分析结果
return {
"combined_score": (
analysis_a["score"] * 0.4 +
analysis_b["score"] * 0.3 +
analysis_c["score"] * 0.3
),
"all_insights": (
analysis_a["insights"] +
analysis_b["insights"] +
analysis_c["insights"]
)
}
模式二:竞速(Race / First-Wins)
让多个 LLM 同时处理同一个任务,取最快的结果:
# 需要在代码节点中实现
import asyncio
import aiohttp
async def race_llm_calls(prompt: str, models: list) -> dict:
"""调用多个模型,返回最快的结果"""
tasks = [call_llm(prompt, model) for model in models]
# asyncio.FIRST_COMPLETED:第一个完成就返回
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
# 取消未完成的任务
for task in pending:
task.cancel()
return done.pop().result()
注意:代码节点不能发起网络请求,此模式需要在 Dify 外部封装为 HTTP 接口,通过 HTTP 节点调用。
模式三:条件并行
根据输入条件动态决定启动哪些并行分支(在 Dify 中需要结合 IF/ELSE + 并行来实现):
IF query 涉及财务数据?
→ 启动 财务数据库查询
IF query 涉及产品信息?
→ 启动 产品知识库查询
IF query 涉及用户数据?
→ 启动 CRM 系统查询
↓
汇聚所有已启动的查询结果
2.3 错误恢复的实现模式
重试机制(通过循环实现):
尝试节点(HTTP请求)
↓
判断节点(IF: 请求成功?)
├── 成功 → 继续主流程
└── 失败 → 重试计数器 +1
IF 重试次数 < 3
→ 等待一秒(通过代码节点实现)
→ 回到尝试节点
ELSE
→ 走降级分支
在代码节点中实现指数退避重试:
import time
import requests
def main(
url: str,
max_retries: int = 3,
base_delay: float = 1.0
) -> dict:
"""带指数退避的 HTTP 请求"""
for attempt in range(max_retries):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
return {
"success": True,
"data": response.json(),
"attempts": attempt + 1
}
except Exception as e:
if attempt < max_retries - 1:
# 指数退避:1s, 2s, 4s, ...
wait_time = base_delay * (2 ** attempt)
time.sleep(wait_time)
else:
return {
"success": False,
"error": str(e),
"attempts": attempt + 1
}
降级策略:
主路径:查询实时数据(HTTP → 外部 API)
↓ 失败时
降级路径:查询缓存数据(知识库检索)
↓ 仍然失败
最终降级:返回预设的兜底回答
在 Dify 中通过节点的"错误处理"设置 + 条件分支组合实现。
2.4 工作流状态持久化
对于长时间运行的工作流(如批处理数百个文档),需要实现状态持久化,以便失败时能从断点恢复:
外部状态存储方案:
# 在工作流开始时,记录进度到 Redis/数据库
def main(
documents: list,
job_id: str,
redis_url: str
) -> dict:
import redis
import json
r = redis.from_url(redis_url)
# 检查是否有未完成的任务(断点续传)
progress_key = f"workflow_progress:{job_id}"
existing_progress = r.get(progress_key)
if existing_progress:
processed_ids = json.loads(existing_progress)
else:
processed_ids = []
# 过滤出未处理的文档
remaining = [
doc for i, doc in enumerate(documents)
if str(i) not in processed_ids
]
return {
"remaining_documents": remaining,
"already_processed": len(processed_ids),
"total": len(documents)
}
Level 3:源码与原理(5 年以上)
3.1 迭代节点的内部实现
Dify 迭代节点(Iteration Node)的核心逻辑:
# api/core/workflow/nodes/iteration/iteration_node.py
class IterationNode(BaseNode):
def _run(self, variable_pool: VariablePool) -> NodeRunResult:
# 获取要迭代的数组
iterator_list = variable_pool.get_any(
self.node_data.iterator_selector
)
if not isinstance(iterator_list, list):
raise ValueError("迭代变量必须是数组类型")
outputs = []
# 并发执行的核心
if self.node_data.is_parallel and self.node_data.parallel_nums > 1:
# 将列表分批,每批并发执行
batch_size = self.node_data.parallel_nums
for i in range(0, len(iterator_list), batch_size):
batch = iterator_list[i:i + batch_size]
batch_results = self._run_batch_concurrent(
batch, variable_pool, start_index=i
)
outputs.extend(batch_results)
else:
# 顺序执行
for index, item in enumerate(iterator_list):
result = self._run_single_iteration(
item, index, variable_pool
)
outputs.append(result)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs={"output": outputs}
)
def _run_batch_concurrent(
self, batch: list, variable_pool: VariablePool, start_index: int
) -> list:
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(
max_workers=len(batch)
) as executor:
futures = {
executor.submit(
self._run_single_iteration,
item, start_index + i, variable_pool.copy()
): i
for i, item in enumerate(batch)
}
results = [None] * len(batch)
for future in concurrent.futures.as_completed(futures):
idx = futures[future]
results[idx] = future.result()
return results
关键设计:每个并发迭代都拿到变量池的一个副本(variable_pool.copy()),确保迭代间的数据隔离,避免并发写入冲突。
3.2 并行分支的执行模型
Dify 的并行分支在图引擎层面的实现:
class GraphEngine:
def _get_next_nodes(
self,
graph: WorkflowGraph,
completed_node_id: str,
result: NodeRunResult,
variable_pool: VariablePool
) -> list[str]:
"""确定下一步要执行的节点"""
outgoing_edges = graph.get_outgoing_edges(completed_node_id)
if len(outgoing_edges) == 1:
# 单一后继节点
return [outgoing_edges[0].target]
elif len(outgoing_edges) > 1:
# 多个后继节点:并行执行
# 所有后继节点同时加入执行队列
return [edge.target for edge in outgoing_edges]
return []
def _execute_parallel(
self,
nodes: list[str],
graph: WorkflowGraph,
variable_pool: VariablePool
) -> dict[str, NodeRunResult]:
"""并发执行多个节点分支"""
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_node = {
executor.submit(
self._execute_branch,
node_id, graph, variable_pool.copy()
): node_id
for node_id in nodes
}
results = {}
for future in concurrent.futures.as_completed(future_to_node):
node_id = future_to_node[future]
results[node_id] = future.result()
return results
合并点(Convergence Point):当所有并行分支到达一个共同的汇聚节点时,图引擎等待所有分支完成,再将所有结果合并到变量池中,继续后续执行。
3.3 错误传播链与异常上下文
Dify 工作流的错误处理涉及完整的异常上下文链:
@dataclass
class WorkflowError:
node_id: str
node_title: str
error_type: str # "timeout" | "type_error" | "api_error" | ...
error_message: str
error_traceback: str
timestamp: datetime
retry_count: int
# 错误传播路径
caused_by: Optional['WorkflowError'] = None
class ErrorHandler:
def handle_node_error(
self,
node: BaseNode,
error: Exception,
variable_pool: VariablePool
) -> NodeRunResult:
error_mode = node.node_data.error_handling_mode
if error_mode == ErrorHandlingMode.TERMINATE:
# 终止工作流,返回错误状态
raise WorkflowTerminateException(
f"节点 '{node.title}' 失败: {str(error)}"
)
elif error_mode == ErrorHandlingMode.CONTINUE_WITH_DEFAULT:
# 使用默认值继续
default_outputs = node.node_data.error_default_outputs
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
outputs=default_outputs,
error=WorkflowError(
node_id=node.id,
error_message=str(error),
...
)
)
elif error_mode == ErrorHandlingMode.FALLBACK:
# 切换到备用分支(在图引擎层面路由到备用节点)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED_WITH_FALLBACK,
error=WorkflowError(...)
)
Level 4:生产陷阱与决策(专家视角)
4.1 陷阱一:迭代节点的内存爆炸
问题:迭代处理 1000 个文档,每个文档内容 2000 字,LLM 输出 500 字。迭代节点的输出数组会包含 1000 个对象,在内存中可能达到数百 MB,导致工作流 OOM(内存溢出)。
症状:工作流在迭代到 500+ 个元素后突然失败,或变得极慢。
解决方案一:流式处理,实时写入存储
不把结果累积在迭代节点的输出数组中,而是在每次迭代内部直接写入数据库/对象存储:
# 迭代内部的代码节点
def main(
item: str,
index: int,
job_id: str,
db_url: str
) -> dict:
import psycopg2
import json
# 处理当前 item
result = process_item(item)
# 直接写入数据库,不返回到工作流变量池
conn = psycopg2.connect(db_url)
cur = conn.cursor()
cur.execute(
"INSERT INTO results (job_id, index, result) VALUES (%s, %s, %s)",
(job_id, index, json.dumps(result))
)
conn.commit()
# 只返回状态,不返回实际数据
return {"status": "ok", "index": index}
解决方案二:分批处理
把 1000 个任务拆分为每批 50 个,每批是一个独立的工作流调用,通过外部调度器(Python 脚本)协调:
def process_in_batches(all_documents: list, batch_size: int = 50):
for i in range(0, len(all_documents), batch_size):
batch = all_documents[i:i + batch_size]
response = requests.post(
"https://api.dify.ai/v1/workflows/run",
json={
"inputs": {"documents": batch},
"response_mode": "blocking"
}
)
print(f"批次 {i//batch_size + 1} 完成: {response.json()['status']}")
4.2 陷阱二:并行分支中的 Rate Limit 炸弹
场景:设计了 10 路并行,同时发起 10 个 LLM 调用。对于 GPT-4o,免费层的速率限制是 500 RPM(Requests Per Minute),付费用户是 5000 RPM。
问题:如果工作流被频繁触发(例如每 5 秒一次),10 路并行 × 每秒 0.2 次 = 2 次 LLM 调用/秒 = 120 次/分钟,看起来不多;但如果并行路数增加到 50 路,就可能触发 Rate Limit,导致整批失败。
监控和防御:
import time
from threading import Semaphore
# 使用信号量控制并发数
rate_limiter = Semaphore(10) # 最多 10 个并发 LLM 调用
def call_llm_with_rate_limit(prompt: str) -> str:
with rate_limiter:
try:
return call_llm(prompt)
except RateLimitError:
# 触发速率限制,等待后重试
time.sleep(60) # 等待速率限制窗口重置
return call_llm(prompt)
4.3 陷阱三:并行分支中的写入竞争
场景:3 个并行分支都需要向同一个 JSON 对象追加数据:
# 分支 A
results["category_a"] = analyze_category_a()
# 分支 B(与 A 并发)
results["category_b"] = analyze_category_b()
# 分支 C(与 A、B 并发)
results["category_c"] = analyze_category_c()
如果 results 是共享的可变对象,并发写入会产生竞争条件。
Dify 的解决方式:通过变量池副本(前面提到的 variable_pool.copy())隔离各分支的写操作。各分支只能写入自己的变量池副本,在汇聚节点时由引擎统一合并。
最佳实践:在汇聚节点的代码节点中显式合并各分支的结果,不要依赖隐式合并:
def main(
result_a: dict,
result_b: dict,
result_c: dict
) -> dict:
# 显式合并,清晰且可调试
return {
"category_a": result_a,
"category_b": result_b,
"category_c": result_c,
"total_score": (
result_a["score"] + result_b["score"] + result_c["score"]
) / 3
}
4.4 错误恢复策略决策矩阵
| 场景 | 推荐策略 | 原因 |
|---|---|---|
| 外部 API 偶发超时 | 指数退避重试(3次) | 临时问题,等待后多半能成功 |
| LLM 输出格式错误 | 重新生成(不同温度参数) | 重采样可以得到不同格式的输出 |
| 知识库连接失败 | 降级到通用 LLM(无 RAG) | 无知识库比直接失败更好 |
| 数据验证失败 | 直接终止 + 返回错误详情 | 无效数据没有重试意义 |
| 下游服务熔断 | 等待 5 分钟后恢复 | 防止雪崩效应 |
4.5 超大规模工作流的架构升级
当单个 Dify 工作流无法满足需求时(需处理百万量级数据、SLA 要求毫秒级响应),考虑:
模式一:Dify + 消息队列
Dify 工作流(触发端)
↓ 向 Kafka/RabbitMQ 发布消息
消息队列
↓ 消费者并发处理
Worker 集群(多个 Dify 工作流实例或直接 LLM 调用)
↓ 结果写入
数据存储
↓ 通知
Dify 工作流(汇总端)
模式二:Dify 作为编排层,核心计算外移
把计算密集型工作(向量搜索、模型推理)封装为独立微服务,Dify 工作流只做编排调用,通过 HTTP 节点调用这些服务。这样 Dify 保持轻量,核心服务可以独立扩缩容。
本章小结
复杂工作流的三个核心能力各有其用武之地:
迭代节点:批量处理列表数据的首选;注意并发数设置(防速率限制)和输出数组大小(防内存溢出)。
并行分支:同时执行多个独立任务的最佳方式;注意在汇聚节点显式合并结果,防止竞争条件。
错误恢复:根据失败原因选择策略——临时性错误用重试,持续性失败用降级,数据错误直接终止。
关键清单:
- 迭代节点的数组大小已评估(> 100 个元素建议流式写入存储)
- 并发数已根据 API 速率限制合理设置
- 每个可能失败的节点已配置错误处理模式
- 并行分支的汇聚节点有显式的结果合并逻辑
- 长时间运行的工作流有状态持久化机制
- 已测试节点失败时的工作流行为