第 10 章

复杂工作流:循环、并行分支与错误恢复

第10章:复杂工作流——循环、并行分支与错误恢复

当业务场景超出简单的线性流程时,循环处理批量数据、并行加速多路任务、健壮的错误恢复机制是区分玩具工作流和生产级系统的关键。

本章导读

在实际业务中,你很快会遇到单一线性工作流无法满足的场景:

这些场景分别对应工作流的三个进阶能力:循环节点(Loop)并行分支(Parallel Branch)错误恢复(Error Recovery)

本章深入讲解:


Level 1:基础认知(1-3 年经验)

1.1 循环节点:对列表做批处理

**迭代节点(Iteration Node)**是 Dify 中处理列表数据的核心节点。它接收一个数组,对每个元素执行相同的子流程,最终汇总所有结果。

类比:迭代节点就像一个 for 循环,把同样的处理逻辑应用到列表中的每个元素。

配置步骤

  1. 添加迭代节点到工作流
  2. 在"迭代数组变量"中选择要遍历的变量(必须是数组类型)
  3. 在迭代节点内部(子图)添加处理逻辑
  4. 迭代节点的输出是所有单次迭代结果组成的数组

实战示例:批量翻译

开始节点(输入:documents = ["文档1内容", "文档2内容", "文档3内容"])
  ↓
迭代节点(遍历 documents 数组)
  ├─ [内部子图]
  │   LLM节点:翻译当前文档(使用 {{#item#}} 引用当前元素)
  │   → 输出:translated_text
  ↓
结束节点(输出:迭代节点.output = ["翻译1", "翻译2", "翻译3"])

关键变量:迭代节点内部使用 {{#item#}} 引用当前迭代的元素,使用 {{#index#}} 引用当前索引(从 0 开始)。

1.2 并行分支:同时执行多个任务

**并行分支(Parallel Branch)**让工作流同时执行多个分支,然后等待所有分支完成后汇总结果。

类比:如果说条件分支(IF/ELSE)是"走左边或走右边",那并行分支就是"左边和右边同时走"。

使用场景

配置方法

  1. 在工作流中添加"并行"节点(开始并行)
  2. 从并行节点拉出多条线,分别连接不同的处理节点
  3. 添加"汇聚"节点(结束并行),连接所有并行分支的终点

注意:Dify 中的并行节点(Parallel)是工作流的结构设计,而非独立的节点类型。通过将多个节点从同一个父节点拖出,即可自动形成并行执行。

1.3 理解错误处理的重要性

生产环境中,工作流面临的常见失败场景:

场景 失败原因 频率
HTTP 请求节点 外部 API 超时、限速、服务不可用 每日数次
LLM 节点 模型过载、输出格式不符、上下文超长 每周数次
代码节点 类型错误、除零错误、解析失败 每月数次
知识库节点 向量数据库连接失败 极少但严重

没有错误处理的工作流,一个节点失败会导致整个工作流失败,用户看到的是冷冰冰的错误信息。

Dify 的三种错误处理模式(在每个节点的设置中配置):

  1. 终止工作流(默认):节点失败时立即停止整个工作流
  2. 继续执行(使用默认值):节点失败时,用预设的默认值代替输出,继续执行下游节点
  3. 使用备用方案:节点失败时,走预先定义的备用节点链

1.4 构建一个包含循环的实用工作流

场景:批量分析产品评论,提取情感和关键词

工作流结构

开始节点
  输入:review_list(数组,每个元素是一条评论文本)
  输入:product_name(文本)
    ↓
迭代节点(遍历 review_list)
  内部子图:
    ├─ LLM节点:分析单条评论
    │   提示词:
    │   """
    │   分析以下关于{{start.product_name}}的用户评论:
    │   {{#item#}}
    │   
    │   输出 JSON:
    │   {"sentiment": "positive/negative/neutral", 
    │    "score": 1-5, 
    │    "keywords": ["词1", "词2"]}
    │   """
    │    ↓
    └─ 代码节点:解析 JSON 输出
        def main(llm_output: str) -> dict:
            return json.loads(extract_json(llm_output))
    ↓
代码节点:汇总分析结果
  def main(iteration_results: list) -> dict:
      sentiments = [r["sentiment"] for r in iteration_results]
      avg_score = sum(r["score"] for r in iteration_results) / len(iteration_results)
      return {
          "positive_rate": sentiments.count("positive") / len(sentiments),
          "avg_score": round(avg_score, 2),
          "negative_reviews": [
              r for r in iteration_results if r["sentiment"] == "negative"
          ]
      }
    ↓
结束节点(输出分析汇总)

Level 2:机制深解(3-5 年经验)

2.1 迭代节点的性能特征与并发控制

迭代节点默认是顺序执行:先处理元素 0,再处理元素 1,以此类推。

对于 100 个元素,每个 LLM 调用 2 秒,顺序执行需要 200 秒。

并发迭代(Dify v0.10+):

迭代节点支持配置并发数(Concurrency),同时处理多个元素:

并发数 = 1(默认):顺序执行,100个元素 × 2s = 200s
并发数 = 5:5个并发,100个元素 / 5 × 2s = 40s(提速5倍)
并发数 = 10:10个并发,100个元素 / 10 × 2s = 20s(提速10倍)

并发的代价

推荐配置

在 Dify 中配置并发:点击迭代节点 → 设置 → 并发数量。

2.2 并行分支的高级模式

模式一:聚合(Fan-out / Fan-in)

单一输入
    ↓
  /  |  \     (并行分支)
A   B   C
  \  |  /     (汇聚节点)
    ↓
合并结果

实现:在汇聚后的代码节点中,分别引用各分支的输出:

def main(
    analysis_a: dict,   # 来自分支 A
    analysis_b: dict,   # 来自分支 B
    analysis_c: dict,   # 来自分支 C
) -> dict:
    # 融合三路分析结果
    return {
        "combined_score": (
            analysis_a["score"] * 0.4 +
            analysis_b["score"] * 0.3 +
            analysis_c["score"] * 0.3
        ),
        "all_insights": (
            analysis_a["insights"] +
            analysis_b["insights"] +
            analysis_c["insights"]
        )
    }

模式二:竞速(Race / First-Wins)

让多个 LLM 同时处理同一个任务,取最快的结果:

# 需要在代码节点中实现
import asyncio
import aiohttp

async def race_llm_calls(prompt: str, models: list) -> dict:
    """调用多个模型,返回最快的结果"""
    tasks = [call_llm(prompt, model) for model in models]
    
    # asyncio.FIRST_COMPLETED:第一个完成就返回
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    
    # 取消未完成的任务
    for task in pending:
        task.cancel()
    
    return done.pop().result()

注意:代码节点不能发起网络请求,此模式需要在 Dify 外部封装为 HTTP 接口,通过 HTTP 节点调用。

模式三:条件并行

根据输入条件动态决定启动哪些并行分支(在 Dify 中需要结合 IF/ELSE + 并行来实现):

IF query 涉及财务数据?
    → 启动 财务数据库查询
IF query 涉及产品信息?
    → 启动 产品知识库查询
IF query 涉及用户数据?
    → 启动 CRM 系统查询
    ↓
汇聚所有已启动的查询结果

2.3 错误恢复的实现模式

重试机制(通过循环实现):

尝试节点(HTTP请求)
    ↓
判断节点(IF: 请求成功?)
    ├── 成功 → 继续主流程
    └── 失败 → 重试计数器 +1
                 IF 重试次数 < 3
                   → 等待一秒(通过代码节点实现)
                   → 回到尝试节点
                 ELSE
                   → 走降级分支

在代码节点中实现指数退避重试

import time
import requests

def main(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> dict:
    """带指数退避的 HTTP 请求"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return {
                "success": True,
                "data": response.json(),
                "attempts": attempt + 1
            }
        except Exception as e:
            if attempt < max_retries - 1:
                # 指数退避:1s, 2s, 4s, ...
                wait_time = base_delay * (2 ** attempt)
                time.sleep(wait_time)
            else:
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": attempt + 1
                }

降级策略

主路径:查询实时数据(HTTP → 外部 API)
         ↓ 失败时
降级路径:查询缓存数据(知识库检索)
         ↓ 仍然失败
最终降级:返回预设的兜底回答

在 Dify 中通过节点的"错误处理"设置 + 条件分支组合实现。

2.4 工作流状态持久化

对于长时间运行的工作流(如批处理数百个文档),需要实现状态持久化,以便失败时能从断点恢复:

外部状态存储方案

# 在工作流开始时,记录进度到 Redis/数据库
def main(
    documents: list,
    job_id: str,
    redis_url: str
) -> dict:
    import redis
    import json
    
    r = redis.from_url(redis_url)
    
    # 检查是否有未完成的任务(断点续传)
    progress_key = f"workflow_progress:{job_id}"
    existing_progress = r.get(progress_key)
    
    if existing_progress:
        processed_ids = json.loads(existing_progress)
    else:
        processed_ids = []
    
    # 过滤出未处理的文档
    remaining = [
        doc for i, doc in enumerate(documents)
        if str(i) not in processed_ids
    ]
    
    return {
        "remaining_documents": remaining,
        "already_processed": len(processed_ids),
        "total": len(documents)
    }

Level 3:源码与原理(5 年以上)

3.1 迭代节点的内部实现

Dify 迭代节点(Iteration Node)的核心逻辑:

# api/core/workflow/nodes/iteration/iteration_node.py

class IterationNode(BaseNode):
    def _run(self, variable_pool: VariablePool) -> NodeRunResult:
        # 获取要迭代的数组
        iterator_list = variable_pool.get_any(
            self.node_data.iterator_selector
        )
        
        if not isinstance(iterator_list, list):
            raise ValueError("迭代变量必须是数组类型")
        
        outputs = []
        
        # 并发执行的核心
        if self.node_data.is_parallel and self.node_data.parallel_nums > 1:
            # 将列表分批,每批并发执行
            batch_size = self.node_data.parallel_nums
            for i in range(0, len(iterator_list), batch_size):
                batch = iterator_list[i:i + batch_size]
                batch_results = self._run_batch_concurrent(
                    batch, variable_pool, start_index=i
                )
                outputs.extend(batch_results)
        else:
            # 顺序执行
            for index, item in enumerate(iterator_list):
                result = self._run_single_iteration(
                    item, index, variable_pool
                )
                outputs.append(result)
        
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs={"output": outputs}
        )
    
    def _run_batch_concurrent(
        self, batch: list, variable_pool: VariablePool, start_index: int
    ) -> list:
        import concurrent.futures
        
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(batch)
        ) as executor:
            futures = {
                executor.submit(
                    self._run_single_iteration,
                    item, start_index + i, variable_pool.copy()
                ): i
                for i, item in enumerate(batch)
            }
            
            results = [None] * len(batch)
            for future in concurrent.futures.as_completed(futures):
                idx = futures[future]
                results[idx] = future.result()
        
        return results

关键设计:每个并发迭代都拿到变量池的一个副本variable_pool.copy()),确保迭代间的数据隔离,避免并发写入冲突。

3.2 并行分支的执行模型

Dify 的并行分支在图引擎层面的实现:

class GraphEngine:
    def _get_next_nodes(
        self,
        graph: WorkflowGraph,
        completed_node_id: str,
        result: NodeRunResult,
        variable_pool: VariablePool
    ) -> list[str]:
        """确定下一步要执行的节点"""
        
        outgoing_edges = graph.get_outgoing_edges(completed_node_id)
        
        if len(outgoing_edges) == 1:
            # 单一后继节点
            return [outgoing_edges[0].target]
        
        elif len(outgoing_edges) > 1:
            # 多个后继节点:并行执行
            # 所有后继节点同时加入执行队列
            return [edge.target for edge in outgoing_edges]
        
        return []
    
    def _execute_parallel(
        self,
        nodes: list[str],
        graph: WorkflowGraph,
        variable_pool: VariablePool
    ) -> dict[str, NodeRunResult]:
        """并发执行多个节点分支"""
        import concurrent.futures
        
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_node = {
                executor.submit(
                    self._execute_branch,
                    node_id, graph, variable_pool.copy()
                ): node_id
                for node_id in nodes
            }
            
            results = {}
            for future in concurrent.futures.as_completed(future_to_node):
                node_id = future_to_node[future]
                results[node_id] = future.result()
        
        return results

合并点(Convergence Point):当所有并行分支到达一个共同的汇聚节点时,图引擎等待所有分支完成,再将所有结果合并到变量池中,继续后续执行。

3.3 错误传播链与异常上下文

Dify 工作流的错误处理涉及完整的异常上下文链:

@dataclass
class WorkflowError:
    node_id: str
    node_title: str
    error_type: str           # "timeout" | "type_error" | "api_error" | ...
    error_message: str
    error_traceback: str
    timestamp: datetime
    retry_count: int
    
    # 错误传播路径
    caused_by: Optional['WorkflowError'] = None

class ErrorHandler:
    def handle_node_error(
        self,
        node: BaseNode,
        error: Exception,
        variable_pool: VariablePool
    ) -> NodeRunResult:
        
        error_mode = node.node_data.error_handling_mode
        
        if error_mode == ErrorHandlingMode.TERMINATE:
            # 终止工作流,返回错误状态
            raise WorkflowTerminateException(
                f"节点 '{node.title}' 失败: {str(error)}"
            )
        
        elif error_mode == ErrorHandlingMode.CONTINUE_WITH_DEFAULT:
            # 使用默认值继续
            default_outputs = node.node_data.error_default_outputs
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                outputs=default_outputs,
                error=WorkflowError(
                    node_id=node.id,
                    error_message=str(error),
                    ...
                )
            )
        
        elif error_mode == ErrorHandlingMode.FALLBACK:
            # 切换到备用分支(在图引擎层面路由到备用节点)
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED_WITH_FALLBACK,
                error=WorkflowError(...)
            )

Level 4:生产陷阱与决策(专家视角)

4.1 陷阱一:迭代节点的内存爆炸

问题:迭代处理 1000 个文档,每个文档内容 2000 字,LLM 输出 500 字。迭代节点的输出数组会包含 1000 个对象,在内存中可能达到数百 MB,导致工作流 OOM(内存溢出)。

症状:工作流在迭代到 500+ 个元素后突然失败,或变得极慢。

解决方案一:流式处理,实时写入存储

不把结果累积在迭代节点的输出数组中,而是在每次迭代内部直接写入数据库/对象存储:

# 迭代内部的代码节点
def main(
    item: str,
    index: int,
    job_id: str,
    db_url: str
) -> dict:
    import psycopg2
    import json
    
    # 处理当前 item
    result = process_item(item)
    
    # 直接写入数据库,不返回到工作流变量池
    conn = psycopg2.connect(db_url)
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO results (job_id, index, result) VALUES (%s, %s, %s)",
        (job_id, index, json.dumps(result))
    )
    conn.commit()
    
    # 只返回状态,不返回实际数据
    return {"status": "ok", "index": index}

解决方案二:分批处理

把 1000 个任务拆分为每批 50 个,每批是一个独立的工作流调用,通过外部调度器(Python 脚本)协调:

def process_in_batches(all_documents: list, batch_size: int = 50):
    for i in range(0, len(all_documents), batch_size):
        batch = all_documents[i:i + batch_size]
        
        response = requests.post(
            "https://api.dify.ai/v1/workflows/run",
            json={
                "inputs": {"documents": batch},
                "response_mode": "blocking"
            }
        )
        
        print(f"批次 {i//batch_size + 1} 完成: {response.json()['status']}")

4.2 陷阱二:并行分支中的 Rate Limit 炸弹

场景:设计了 10 路并行,同时发起 10 个 LLM 调用。对于 GPT-4o,免费层的速率限制是 500 RPM(Requests Per Minute),付费用户是 5000 RPM。

问题:如果工作流被频繁触发(例如每 5 秒一次),10 路并行 × 每秒 0.2 次 = 2 次 LLM 调用/秒 = 120 次/分钟,看起来不多;但如果并行路数增加到 50 路,就可能触发 Rate Limit,导致整批失败。

监控和防御

import time
from threading import Semaphore

# 使用信号量控制并发数
rate_limiter = Semaphore(10)  # 最多 10 个并发 LLM 调用

def call_llm_with_rate_limit(prompt: str) -> str:
    with rate_limiter:
        try:
            return call_llm(prompt)
        except RateLimitError:
            # 触发速率限制,等待后重试
            time.sleep(60)  # 等待速率限制窗口重置
            return call_llm(prompt)

4.3 陷阱三:并行分支中的写入竞争

场景:3 个并行分支都需要向同一个 JSON 对象追加数据:

# 分支 A
results["category_a"] = analyze_category_a()

# 分支 B(与 A 并发)
results["category_b"] = analyze_category_b()

# 分支 C(与 A、B 并发)
results["category_c"] = analyze_category_c()

如果 results 是共享的可变对象,并发写入会产生竞争条件。

Dify 的解决方式:通过变量池副本(前面提到的 variable_pool.copy())隔离各分支的写操作。各分支只能写入自己的变量池副本,在汇聚节点时由引擎统一合并。

最佳实践:在汇聚节点的代码节点中显式合并各分支的结果,不要依赖隐式合并:

def main(
    result_a: dict,
    result_b: dict,
    result_c: dict
) -> dict:
    # 显式合并,清晰且可调试
    return {
        "category_a": result_a,
        "category_b": result_b,
        "category_c": result_c,
        "total_score": (
            result_a["score"] + result_b["score"] + result_c["score"]
        ) / 3
    }

4.4 错误恢复策略决策矩阵

场景 推荐策略 原因
外部 API 偶发超时 指数退避重试(3次) 临时问题,等待后多半能成功
LLM 输出格式错误 重新生成(不同温度参数) 重采样可以得到不同格式的输出
知识库连接失败 降级到通用 LLM(无 RAG) 无知识库比直接失败更好
数据验证失败 直接终止 + 返回错误详情 无效数据没有重试意义
下游服务熔断 等待 5 分钟后恢复 防止雪崩效应

4.5 超大规模工作流的架构升级

当单个 Dify 工作流无法满足需求时(需处理百万量级数据、SLA 要求毫秒级响应),考虑:

模式一:Dify + 消息队列

Dify 工作流(触发端)
    ↓ 向 Kafka/RabbitMQ 发布消息
消息队列
    ↓ 消费者并发处理
Worker 集群(多个 Dify 工作流实例或直接 LLM 调用)
    ↓ 结果写入
数据存储
    ↓ 通知
Dify 工作流(汇总端)

模式二:Dify 作为编排层,核心计算外移

把计算密集型工作(向量搜索、模型推理)封装为独立微服务,Dify 工作流只做编排调用,通过 HTTP 节点调用这些服务。这样 Dify 保持轻量,核心服务可以独立扩缩容。


本章小结

复杂工作流的三个核心能力各有其用武之地:

迭代节点:批量处理列表数据的首选;注意并发数设置(防速率限制)和输出数组大小(防内存溢出)。

并行分支:同时执行多个独立任务的最佳方式;注意在汇聚节点显式合并结果,防止竞争条件。

错误恢复:根据失败原因选择策略——临时性错误用重试,持续性失败用降级,数据错误直接终止。

关键清单

本章评分
4.5  / 5  (32 评分)

💬 留言讨论