Chapter 10

Advanced Workflows: Loops, Parallel Branches and Error Recovery

Chapter 10: Complex Workflows โ€” Loops, Parallel Branches, and Error Recovery

When business scenarios exceed simple linear flows, batch-processing loops, parallel acceleration of multi-path tasks, and robust error recovery mechanisms are what separate toy workflows from production-grade systems.

Chapter Overview

In practice, you'll quickly encounter scenarios that a simple linear workflow cannot handle:

These scenarios correspond to three advanced workflow capabilities: Loop nodes (Iteration), Parallel Branches, and Error Recovery.

This chapter covers in depth:


Level 1: Fundamentals (1โ€“3 Years Experience)

1.1 Iteration Nodes: Batch Processing a List

The Iteration Node is Dify's core node for processing list data. It takes an array, applies the same sub-workflow to each element, and aggregates all results at the end.

Analogy: An Iteration node is like a for loop โ€” applying the same processing logic to every element in a list.

Configuration steps:

  1. Add an Iteration node to the workflow
  2. In "Iterator Array Variable," select the variable to iterate over (must be array type)
  3. Add processing logic inside the Iteration node (sub-graph)
  4. The Iteration node's output is an array of all individual iteration results

Practical example: Batch translation

Start node (input: documents = ["Doc 1 content", "Doc 2 content", "Doc 3 content"])
  โ†“
Iteration node (iterate over documents array)
  โ”œโ”€ [Internal sub-graph]
  โ”‚   LLM node: translate current document (use {{#item#}} to reference current element)
  โ”‚   โ†’ output: translated_text
  โ†“
End node (output: iteration_node.output = ["Translation 1", "Translation 2", "Translation 3"])

Key variables: Inside an Iteration node, use {{#item#}} to reference the current element and {{#index#}} for the current index (starting at 0).

1.2 Parallel Branches: Executing Multiple Tasks Simultaneously

Parallel Branches allow a workflow to execute multiple branches simultaneously, then wait for all to complete before aggregating results.

Analogy: If conditional branching (IF/ELSE) is "go left OR go right," parallel branching is "go left AND right simultaneously."

Use cases:

How to configure:

  1. In the workflow, add a "Parallel" construct (start of parallel execution)
  2. Draw multiple connections from the same source node to different processing nodes
  3. Add a convergence node (end of parallel execution) connecting the endpoints of all parallel branches

Note: In Dify, parallel execution is a structural design in the workflow โ€” not a separate node type. Pulling multiple connections from the same parent node automatically creates parallel execution.

1.3 Understanding the Importance of Error Handling

In production, workflows face these common failure scenarios:

Scenario Failure Cause Frequency
HTTP Request node External API timeout, rate limiting, service unavailable Several times daily
LLM node Model overloaded, output format mismatch, context too long Several times weekly
Code node Type errors, division by zero, parse failures Several times monthly
Knowledge Retrieval node Vector database connection failure Rare but severe

A workflow without error handling will fail the entire flow when any single node fails, showing users a cold error message.

Dify's three error handling modes (configured in each node's settings):

  1. Terminate workflow (default): immediately stop the entire workflow when a node fails
  2. Continue with default value: when a node fails, substitute a pre-defined default value and continue downstream nodes
  3. Use fallback: when a node fails, route to a pre-defined fallback node chain

1.4 Building a Practical Workflow with Iteration

Scenario: Batch analyze product reviews, extracting sentiment and keywords

Workflow structure:

Start node
  Input: review_list (array, each element is one review text)
  Input: product_name (text)
    โ†“
Iteration node (iterate over review_list)
  Internal sub-graph:
    โ”œโ”€ LLM node: analyze a single review
    โ”‚   Prompt:
    โ”‚   """
    โ”‚   Analyze this user review about {{start.product_name}}:
    โ”‚   {{#item#}}
    โ”‚
    โ”‚   Output JSON:
    โ”‚   {"sentiment": "positive/negative/neutral",
    โ”‚    "score": 1-5,
    โ”‚    "keywords": ["word1", "word2"]}
    โ”‚   """
    โ”‚    โ†“
    โ””โ”€ Code node: parse JSON output
        def main(llm_output: str) -> dict:
            return json.loads(extract_json(llm_output))
    โ†“
Code node: aggregate analysis results
  def main(iteration_results: list) -> dict:
      sentiments = [r["sentiment"] for r in iteration_results]
      avg_score = sum(r["score"] for r in iteration_results) / len(iteration_results)
      return {
          "positive_rate": sentiments.count("positive") / len(sentiments),
          "avg_score": round(avg_score, 2),
          "negative_reviews": [
              r for r in iteration_results if r["sentiment"] == "negative"
          ]
      }
    โ†“
End node (output aggregated analysis)

Level 2: Mechanisms in Depth (3โ€“5 Years Experience)

2.1 Performance Characteristics and Concurrency Control of Iteration Nodes

By default, Iteration nodes execute sequentially: process element 0, then element 1, and so on.

For 100 elements with each LLM call taking 2 seconds, sequential execution requires 200 seconds.

Concurrent iteration (Dify v0.10+):

Iteration nodes support configuring a concurrency number, processing multiple elements simultaneously:

Concurrency = 1 (default): Sequential, 100 elements ร— 2s = 200s
Concurrency = 5: 5 concurrent, 100 elements / 5 ร— 2s = 40s (5x faster)
Concurrency = 10: 10 concurrent, 100 elements / 10 ร— 2s = 20s (10x faster)

Cost of concurrency:

Recommended settings:

Configure in Dify: Click the Iteration node โ†’ Settings โ†’ Concurrency count.

2.2 Advanced Parallel Branch Patterns

Pattern 1: Fan-out / Fan-in (Aggregation)

Single input
    โ†“
  /  |  \     (parallel branches)
A   B   C
  \  |  /     (convergence node)
    โ†“
Merged results

Implementation โ€” in the post-convergence Code node, reference each branch's output:

def main(
    analysis_a: dict,   # from branch A
    analysis_b: dict,   # from branch B
    analysis_c: dict,   # from branch C
) -> dict:
    # Fuse results from three paths
    return {
        "combined_score": (
            analysis_a["score"] * 0.4 +
            analysis_b["score"] * 0.3 +
            analysis_c["score"] * 0.3
        ),
        "all_insights": (
            analysis_a["insights"] +
            analysis_b["insights"] +
            analysis_c["insights"]
        )
    }

Pattern 2: Race (First-Wins)

Have multiple LLMs process the same task simultaneously and take the fastest result:

# Needs to be implemented as an external HTTP service
# (Code nodes cannot make network requests)
import asyncio
import aiohttp

async def race_llm_calls(prompt: str, models: list) -> dict:
    """Call multiple models, return the fastest result"""
    tasks = [call_llm(prompt, model) for model in models]

    # FIRST_COMPLETED: return as soon as one finishes
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )

    # Cancel unfinished tasks
    for task in pending:
        task.cancel()

    return done.pop().result()

Pattern 3: Conditional Parallel

Dynamically decide which parallel branches to activate based on input (implemented in Dify using IF/ELSE combined with parallel design):

IF query involves financial data?
    โ†’ Activate financial database query
IF query involves product info?
    โ†’ Activate product knowledge base query
IF query involves user data?
    โ†’ Activate CRM system query
    โ†“
Converge all activated query results

2.3 Error Recovery Implementation Patterns

Retry mechanism (implemented through looping):

Attempt node (HTTP request)
    โ†“
Decision node (IF: request succeeded?)
    โ”œโ”€โ”€ Success โ†’ Continue main flow
    โ””โ”€โ”€ Failure โ†’ Retry counter + 1
                   IF retry count < 3
                     โ†’ Wait 1 second (via Code node)
                     โ†’ Return to attempt node
                   ELSE
                     โ†’ Take degradation branch

Exponential backoff retry in a Code node:

import time
import requests

def main(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> dict:
    """HTTP request with exponential backoff"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return {
                "success": True,
                "data": response.json(),
                "attempts": attempt + 1
            }
        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = base_delay * (2 ** attempt)
                time.sleep(wait_time)
            else:
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": attempt + 1
                }

Degradation strategy:

Primary path: query real-time data (HTTP โ†’ external API)
               โ†“ on failure
Degradation path: query cached data (knowledge base retrieval)
               โ†“ still failing
Final fallback: return preset default answer

Implemented in Dify via node error handling settings combined with conditional branches.

2.4 Workflow State Persistence

For long-running workflows (e.g., batch processing hundreds of documents), implement state persistence to enable resumption from checkpoints on failure:

# Record progress to Redis/database at the start of the workflow
def main(
    documents: list,
    job_id: str,
    redis_url: str
) -> dict:
    import redis
    import json

    r = redis.from_url(redis_url)

    # Check for unfinished tasks (resume from checkpoint)
    progress_key = f"workflow_progress:{job_id}"
    existing_progress = r.get(progress_key)

    if existing_progress:
        processed_ids = json.loads(existing_progress)
    else:
        processed_ids = []

    # Filter out already-processed documents
    remaining = [
        doc for i, doc in enumerate(documents)
        if str(i) not in processed_ids
    ]

    return {
        "remaining_documents": remaining,
        "already_processed": len(processed_ids),
        "total": len(documents)
    }

Level 3: Source Code and Principles (5+ Years Experience)

3.1 Iteration Node Internal Implementation

Core logic of Dify's Iteration Node:

# api/core/workflow/nodes/iteration/iteration_node.py

class IterationNode(BaseNode):
    def _run(self, variable_pool: VariablePool) -> NodeRunResult:
        # Get the array to iterate over
        iterator_list = variable_pool.get_any(
            self.node_data.iterator_selector
        )

        if not isinstance(iterator_list, list):
            raise ValueError("Iterator variable must be array type")

        outputs = []

        # Core of concurrent execution
        if self.node_data.is_parallel and self.node_data.parallel_nums > 1:
            # Process in batches, each batch runs concurrently
            batch_size = self.node_data.parallel_nums
            for i in range(0, len(iterator_list), batch_size):
                batch = iterator_list[i:i + batch_size]
                batch_results = self._run_batch_concurrent(
                    batch, variable_pool, start_index=i
                )
                outputs.extend(batch_results)
        else:
            # Sequential execution
            for index, item in enumerate(iterator_list):
                result = self._run_single_iteration(
                    item, index, variable_pool
                )
                outputs.append(result)

        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs={"output": outputs}
        )

    def _run_batch_concurrent(
        self, batch: list, variable_pool: VariablePool, start_index: int
    ) -> list:
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(batch)
        ) as executor:
            futures = {
                executor.submit(
                    self._run_single_iteration,
                    item, start_index + i, variable_pool.copy()
                ): i
                for i, item in enumerate(batch)
            }

            results = [None] * len(batch)
            for future in concurrent.futures.as_completed(futures):
                idx = futures[future]
                results[idx] = future.result()

        return results

Key design: Each concurrent iteration receives a copy of the variable pool (variable_pool.copy()), ensuring data isolation between iterations and preventing concurrent write conflicts.

3.2 Parallel Branch Execution Model

Parallel branches in Dify are implemented at the graph engine level:

class GraphEngine:
    def _get_next_nodes(
        self,
        graph: WorkflowGraph,
        completed_node_id: str,
        result: NodeRunResult,
        variable_pool: VariablePool
    ) -> list[str]:
        """Determine which nodes to execute next"""
        outgoing_edges = graph.get_outgoing_edges(completed_node_id)

        if len(outgoing_edges) == 1:
            # Single successor node
            return [outgoing_edges[0].target]

        elif len(outgoing_edges) > 1:
            # Multiple successor nodes: parallel execution
            # All successors are added to the execution queue simultaneously
            return [edge.target for edge in outgoing_edges]

        return []

    def _execute_parallel(
        self,
        nodes: list[str],
        graph: WorkflowGraph,
        variable_pool: VariablePool
    ) -> dict[str, NodeRunResult]:
        """Concurrently execute multiple node branches"""
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_node = {
                executor.submit(
                    self._execute_branch,
                    node_id, graph, variable_pool.copy()
                ): node_id
                for node_id in nodes
            }

            results = {}
            for future in concurrent.futures.as_completed(future_to_node):
                node_id = future_to_node[future]
                results[node_id] = future.result()

        return results

Convergence point: When all parallel branches reach a shared convergence node, the graph engine waits for all branches to complete, merges all results into the variable pool, then continues subsequent execution.

3.3 Error Propagation Chain and Exception Context

Dify workflow error handling involves a complete exception context chain:

@dataclass
class WorkflowError:
    node_id: str
    node_title: str
    error_type: str           # "timeout" | "type_error" | "api_error" | ...
    error_message: str
    error_traceback: str
    timestamp: datetime
    retry_count: int

    # Error propagation path
    caused_by: Optional['WorkflowError'] = None

class ErrorHandler:
    def handle_node_error(
        self,
        node: BaseNode,
        error: Exception,
        variable_pool: VariablePool
    ) -> NodeRunResult:

        error_mode = node.node_data.error_handling_mode

        if error_mode == ErrorHandlingMode.TERMINATE:
            # Terminate workflow, return error status
            raise WorkflowTerminateException(
                f"Node '{node.title}' failed: {str(error)}"
            )

        elif error_mode == ErrorHandlingMode.CONTINUE_WITH_DEFAULT:
            # Continue with default value
            default_outputs = node.node_data.error_default_outputs
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                outputs=default_outputs,
                error=WorkflowError(node_id=node.id, error_message=str(error))
            )

        elif error_mode == ErrorHandlingMode.FALLBACK:
            # Route to fallback branch (handled at graph engine level)
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED_WITH_FALLBACK,
                error=WorkflowError(node_id=node.id, error_message=str(error))
            )

Level 4: Production Pitfalls and Decision-Making (Expert Perspective)

4.1 Pitfall 1: Memory Explosion in Iteration Nodes

Problem: Iterating over 1,000 documents with each document containing 2,000 characters and LLM output of 500 characters. The Iteration node's output array would contain 1,000 objects, potentially reaching hundreds of MB in memory, causing an OOM (out-of-memory) error.

Symptom: Workflow fails suddenly after iterating past 500+ elements, or becomes extremely slow.

Solution 1: Stream processing โ€” write directly to storage

Instead of accumulating results in the Iteration node's output array, write directly to a database or object storage during each iteration:

# Code node inside the iteration
def main(item: str, index: int, job_id: str, db_url: str) -> dict:
    import psycopg2
    import json

    # Process current item
    result = process_item(item)

    # Write directly to database, not returning to workflow variable pool
    conn = psycopg2.connect(db_url)
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO results (job_id, index, result) VALUES (%s, %s, %s)",
        (job_id, index, json.dumps(result))
    )
    conn.commit()

    # Only return status, not the actual data
    return {"status": "ok", "index": index}

Solution 2: Batch processing

Split 1,000 tasks into batches of 50, with each batch as an independent workflow call, coordinated by an external scheduler (Python script):

def process_in_batches(all_documents: list, batch_size: int = 50):
    for i in range(0, len(all_documents), batch_size):
        batch = all_documents[i:i + batch_size]

        response = requests.post(
            "https://api.dify.ai/v1/workflows/run",
            json={
                "inputs": {"documents": batch},
                "response_mode": "blocking"
            }
        )

        batch_num = i // batch_size + 1
        print(f"Batch {batch_num} complete: {response.json()['status']}")

4.2 Pitfall 2: Rate Limit Bomb in Parallel Branches

Scenario: A 10-way parallel design fires 10 simultaneous LLM calls. For GPT-4o, the free tier rate limit is 500 RPM, paid users get 5,000 RPM.

Problem: If the workflow is triggered frequently (e.g., every 5 seconds), 10 parallel paths ร— 0.2 triggers/second = 2 LLM calls/second = 120 calls/minute โ€” sounds manageable. But scale to 50 parallel paths and you'll hit the rate limit, causing the entire batch to fail.

Monitoring and defense:

import time
from threading import Semaphore

# Use semaphore to control concurrency
rate_limiter = Semaphore(10)  # Maximum 10 concurrent LLM calls

def call_llm_with_rate_limit(prompt: str) -> str:
    with rate_limiter:
        try:
            return call_llm(prompt)
        except RateLimitError:
            # Rate limit hit, wait and retry
            time.sleep(60)  # Wait for rate limit window to reset
            return call_llm(prompt)

4.3 Pitfall 3: Write Race Conditions in Parallel Branches

Scenario: 3 parallel branches all need to append data to the same JSON object:

# Branch A
results["category_a"] = analyze_category_a()

# Branch B (concurrent with A)
results["category_b"] = analyze_category_b()

# Branch C (concurrent with A and B)
results["category_c"] = analyze_category_c()

If results is a shared mutable object, concurrent writes create a race condition.

Dify's solution: Via variable pool copies (the variable_pool.copy() mentioned earlier), write operations in each branch are isolated. Branches can only write to their own variable pool copy; the engine merges them at the convergence node.

Best practice: Explicitly merge results from each branch in the convergence Code node โ€” don't rely on implicit merging:

def main(
    result_a: dict,
    result_b: dict,
    result_c: dict
) -> dict:
    # Explicit merge โ€” clear and debuggable
    return {
        "category_a": result_a,
        "category_b": result_b,
        "category_c": result_c,
        "total_score": (
            result_a["score"] + result_b["score"] + result_c["score"]
        ) / 3
    }

4.4 Error Recovery Strategy Decision Matrix

Scenario Recommended Strategy Reason
External API occasional timeout Exponential backoff retry (3 attempts) Temporary issue, likely succeeds after waiting
LLM output format error Regenerate with different temperature Resampling produces different output format
Knowledge base connection failure Degrade to general LLM (no RAG) No knowledge base is better than total failure
Data validation failure Terminate immediately + return error details Invalid data has no reason to retry
Downstream service circuit break Wait 5 minutes before recovering Prevents cascade failures

4.5 Architecture Upgrade for Massive-Scale Workflows

When a single Dify workflow cannot meet requirements (millions of records, millisecond SLA requirements), consider:

Pattern 1: Dify + Message Queue

Dify workflow (trigger side)
    โ†“ publishes message to Kafka/RabbitMQ
Message queue
    โ†“ concurrent consumers
Worker cluster (multiple Dify workflow instances or direct LLM calls)
    โ†“ write results
Data store
    โ†“ notification
Dify workflow (aggregation side)

Pattern 2: Dify as Orchestration Layer, Core Computation Offloaded

Wrap compute-intensive work (vector search, model inference) as independent microservices; Dify workflows only orchestrate calls via HTTP Request nodes. This keeps Dify lightweight while core services can scale independently.


Chapter Summary

The three advanced workflow capabilities each have their place:

Iteration nodes: First choice for batch-processing list data. Watch concurrency settings (prevent rate limits) and output array size (prevent memory overflow).

Parallel branches: Best approach for executing multiple independent tasks simultaneously. Explicitly merge results at the convergence node to prevent race conditions.

Error recovery: Match strategy to failure cause โ€” use retry for transient errors, degradation for persistent failures, immediate termination for data errors.

Key checklist:

Rate this chapter
4.5  / 5  (32 ratings)

๐Ÿ’ฌ Comments