Advanced Workflows: Loops, Parallel Branches and Error Recovery
Chapter 10: Complex Workflows — Loops, Parallel Branches, and Error Recovery
When business scenarios exceed simple linear flows, batch-processing loops, parallel acceleration of multi-path tasks, and robust error recovery mechanisms are what separate toy workflows from production-grade systems.
Chapter Overview
In practice, you'll quickly encounter scenarios that a simple linear workflow cannot handle:
- Processing a list of documents (10 contracts, 100 reviews, 500 data rows), each requiring AI analysis
- Simultaneously calling multiple LLMs or data sources — either taking the fastest response (race) or aggregating all results
- A step that might fail (external API timeout, LLM output format error) requiring automatic retry or a fallback plan
These scenarios correspond to three advanced workflow capabilities: Loop nodes (Iteration), Parallel Branches, and Error Recovery.
This chapter covers in depth:
- The difference between Iteration nodes and Loop nodes, and when to use each
- Parallel branch design patterns: parallel-aggregate, race, fan-out/fan-in
- Error handling strategies: retry, degradation, exception branches
- Performance optimization and resource control in production
Level 1: Fundamentals (1–3 Years Experience)
1.1 Iteration Nodes: Batch Processing a List
The Iteration Node is Dify's core node for processing list data. It takes an array, applies the same sub-workflow to each element, and aggregates all results at the end.
Analogy: An Iteration node is like a for loop — applying the same processing logic to every element in a list.
Configuration steps:
- Add an Iteration node to the workflow
- In "Iterator Array Variable," select the variable to iterate over (must be array type)
- Add processing logic inside the Iteration node (sub-graph)
- The Iteration node's output is an array of all individual iteration results
Practical example: Batch translation
Start node (input: documents = ["Doc 1 content", "Doc 2 content", "Doc 3 content"])
↓
Iteration node (iterate over documents array)
├─ [Internal sub-graph]
│ LLM node: translate current document (use {{#item#}} to reference current element)
│ → output: translated_text
↓
End node (output: iteration_node.output = ["Translation 1", "Translation 2", "Translation 3"])
Key variables: Inside an Iteration node, use {{#item#}} to reference the current element and {{#index#}} for the current index (starting at 0).
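As a rough mental model (plain Python, not Dify's implementation), the node's behavior can be sketched as below. The names `run_iteration` and `translate_stub` are hypothetical and exist only for this illustration; the stub stands in for the LLM node inside the sub-graph.

```python
from typing import Any, Callable, List


def run_iteration(items: List[Any], sub_workflow: Callable[[Any, int], Any]) -> List[Any]:
    """Apply sub_workflow to each element and collect results in input order."""
    outputs = []
    for index, item in enumerate(items):  # index starts at 0, like {{#index#}}
        outputs.append(sub_workflow(item, index))
    return outputs


def translate_stub(item: str, index: int) -> str:
    """Stand-in for the LLM node inside the sub-graph."""
    return f"Translation {index + 1} of: {item}"


documents = ["Doc 1 content", "Doc 2 content", "Doc 3 content"]
translated = run_iteration(documents, translate_stub)
```

The output list has one entry per input element, in the same order as the input array.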
1.2 Parallel Branches: Executing Multiple Tasks Simultaneously
Parallel Branches allow a workflow to execute multiple branches simultaneously, then wait for all to complete before aggregating results.
Analogy: If conditional branching (IF/ELSE) is "go left OR go right," parallel branching is "go left AND right simultaneously."
Use cases:
- Process the same question with GPT-4o and Claude 3.5 simultaneously, then compare results
- Call multiple APIs simultaneously (weather, stock price, news) and aggregate data
- Simultaneously summarize a document, extract keywords, and generate tags, then merge
How to configure:
- Choose the node where the flow should fan out (the start of parallel execution)
- Draw multiple connections from the same source node to different processing nodes
- Add a convergence node (end of parallel execution) connecting the endpoints of all parallel branches
Note: In Dify, parallel execution is a structural design in the workflow — not a separate node type. Pulling multiple connections from the same parent node automatically creates parallel execution.
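The "fan out, then wait for every branch" behavior can be illustrated with a small thread-pool sketch. This is an analogy in plain Python, not how Dify schedules branches; `run_parallel` and the three lambda branches are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def run_parallel(branches: Dict[str, Callable[[str], Any]], source_output: str) -> Dict[str, Any]:
    """Run every branch on the same input and wait for all of them."""
    with ThreadPoolExecutor(max_workers=len(branches)) as pool:
        futures = {name: pool.submit(fn, source_output) for name, fn in branches.items()}
        # result() blocks, so the dict is complete only when every branch is done
        return {name: future.result() for name, future in futures.items()}


text = "Dify workflows can run branches in parallel"
branches = {
    "summary": lambda t: t[:20],
    "keywords": lambda t: sorted(set(t.lower().split()))[:3],
    "length": lambda t: len(t),
}
merged = run_parallel(branches, text)
```

The returned dictionary plays the role of the convergence node's inputs: one named result per branch.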
1.3 Understanding the Importance of Error Handling
In production, workflows face these common failure scenarios:
| Scenario | Failure Cause | Frequency |
|---|---|---|
| HTTP Request node | External API timeout, rate limiting, service unavailable | Several times daily |
| LLM node | Model overloaded, output format mismatch, context too long | Several times weekly |
| Code node | Type errors, division by zero, parse failures | Several times monthly |
| Knowledge Retrieval node | Vector database connection failure | Rare but severe |
A workflow without error handling will fail the entire flow when any single node fails, showing users a cold error message.
Dify's three error handling modes (configured in each node's settings):
- Terminate workflow (default): immediately stop the entire workflow when a node fails
- Continue with default value: when a node fails, substitute a pre-defined default value and continue downstream nodes
- Use fallback: when a node fails, route to a pre-defined fallback node chain
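The three modes can be pictured as a wrapper around a node's execution. The sketch below is only a conceptual model; `run_with_error_mode`, its mode strings, and `flaky_node` are hypothetical names, not Dify settings or APIs.

```python
from typing import Any, Callable, Dict, Optional


def run_with_error_mode(
    node: Callable[[], Dict[str, Any]],
    mode: str = "terminate",
    default_value: Optional[Dict[str, Any]] = None,
    fallback: Optional[Callable[[Exception], Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Run a node and apply one of the three error handling modes on failure."""
    try:
        return node()
    except Exception as exc:
        if mode == "default":
            # Continue with default value: downstream nodes see the preset output
            return dict(default_value or {})
        if mode == "fallback" and fallback is not None:
            # Use fallback: hand the error to the fallback chain
            return fallback(exc)
        raise  # Terminate: the failure propagates and stops the workflow


def flaky_node() -> Dict[str, Any]:
    raise TimeoutError("external API timed out")


with_default = run_with_error_mode(flaky_node, "default", default_value={"text": "N/A"})
with_fallback = run_with_error_mode(
    flaky_node, "fallback", fallback=lambda e: {"text": "cached", "reason": str(e)}
)
```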
1.4 Building a Practical Workflow with Iteration
Scenario: Batch analyze product reviews, extracting sentiment and keywords
Workflow structure:
Start node
Input: review_list (array, each element is one review text)
Input: product_name (text)
↓
Iteration node (iterate over review_list)
Internal sub-graph:
├─ LLM node: analyze a single review
│ Prompt:
│ """
│ Analyze this user review about {{#start.product_name#}}:
│ {{#item#}}
│
│ Output JSON:
│ {"sentiment": "positive/negative/neutral",
│ "score": 1-5,
│ "keywords": ["word1", "word2"]}
│ """
│ ↓
└─ Code node: parse JSON output
   import json
   def main(llm_output: str) -> dict:
       # extract_json: helper (not shown) that isolates the JSON object in the LLM reply
       return json.loads(extract_json(llm_output))
↓
Code node: aggregate analysis results
def main(iteration_results: list) -> dict:
    # Guard against an empty review list to avoid division by zero
    if not iteration_results:
        return {"positive_rate": 0.0, "avg_score": 0.0, "negative_reviews": []}
    sentiments = [r["sentiment"] for r in iteration_results]
    avg_score = sum(r["score"] for r in iteration_results) / len(iteration_results)
    return {
        "positive_rate": sentiments.count("positive") / len(sentiments),
        "avg_score": round(avg_score, 2),
        "negative_reviews": [
            r for r in iteration_results if r["sentiment"] == "negative"
        ]
    }
↓
End node (output aggregated analysis)
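The workflow's parsing step calls `extract_json`, which is not defined in the chapter. One possible implementation is sketched here, assuming the reply contains a single JSON object that may be wrapped in prose or a Markdown code fence; it is an illustration, not a Dify built-in.

```python
import json


def extract_json(llm_output: str) -> str:
    """Return the first balanced {...} span, ignoring braces inside JSON strings."""
    start = llm_output.find("{")
    if start == -1:
        raise ValueError("no JSON object found in LLM output")
    depth = 0
    in_string = False
    escaped = False
    for pos in range(start, len(llm_output)):
        ch = llm_output[pos]
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "{":
            depth += 1
        elif ch == "}":
            depth -= 1
            if depth == 0:
                return llm_output[start:pos + 1]
    raise ValueError("unbalanced JSON object in LLM output")


fence = "`" * 3  # built at runtime so this example does not contain a literal fence
raw = (
    "Sure! Here is the analysis:\n" + fence + "json\n"
    + '{"sentiment": "positive", "score": 5, "keywords": ["fast", "{great}"]}'
    + "\n" + fence
)
parsed = json.loads(extract_json(raw))
```

Scanning for balanced braces, rather than using a greedy regular expression, keeps the helper from being confused by braces that appear inside string values.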
Level 2: Mechanisms in Depth (3–5 Years Experience)
2.1 Performance Characteristics and Concurrency Control of Iteration Nodes
By default, Iteration nodes execute sequentially: process element 0, then element 1, and so on.
For 100 elements with each LLM call taking 2 seconds, sequential execution requires 200 seconds.
Concurrent iteration (Dify v0.10+):
Iteration nodes support configuring a concurrency number, processing multiple elements simultaneously:
Concurrency = 1 (default): Sequential, 100 elements × 2s = 200s
Concurrency = 5: 5 concurrent, 100 elements / 5 × 2s = 40s (5x faster)
Concurrency = 10: 10 concurrent, 100 elements / 10 × 2s = 20s (10x faster)
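These figures follow from an idealized model in which elements run in batches of size c and each batch takes about one call's latency, so total time is roughly ceil(N / c) × t. A minimal sketch of that arithmetic is shown here; `estimate_seconds` is a hypothetical helper, and the model ignores rate limits and latency variance.

```python
import math


def estimate_seconds(n_items: int, concurrency: int, seconds_per_item: float) -> float:
    """Batches run one after another; each batch takes about one item's latency."""
    if concurrency < 1:
        raise ValueError("concurrency must be at least 1")
    return math.ceil(n_items / concurrency) * seconds_per_item


for c in (1, 5, 10):
    print(c, estimate_seconds(100, c, 2.0))
```

When the element count is not a multiple of the concurrency, the last partial batch still costs a full round, which is why the ceiling matters.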
Cost of concurrency:
- Higher concurrency raises the spend rate: the total token count is unchanged, but it is consumed in a shorter window
- May trigger LLM API rate limits
- If database writes are involved, concurrent conflicts must be handled
Recommended settings:
- Small batch (fewer than 20): concurrency = element count (full parallel), up to the maximum the node allows
- Medium batch (20–100): concurrency = 5–10
- Large batch (more than 100): concurrency = 3–5 (prevent rate limit triggering)
Configure in Dify: Click the Iteration node → Settings → Concurrency count.
2.2 Advanced Parallel Branch Patterns
Pattern 1: Fan-out / Fan-in (Aggregation)
Single input
↓
/ | \ (parallel branches)
A B C
\ | / (convergence node)
↓
Merged results
Implementation — in the post-convergence Code node, reference each branch's output:
def main(
    analysis_a: dict,  # from branch A
    analysis_b: dict,  # from branch B
    analysis_c: dict,  # from branch C
) -> dict:
    # Fuse results from three paths
    return {
        "combined_score": (
            analysis_a["score"] * 0.4 +
            analysis_b["score"] * 0.3 +
            analysis_c["score"] * 0.3
        ),
        "all_insights": (
            analysis_a["insights"] +
            analysis_b["insights"] +
            analysis_c["insights"]
        )
    }
Pattern 2: Race (First-Wins)
Have multiple LLMs process the same task simultaneously and take the fastest result:
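# Needs to be implemented as an external HTTP service
# (the Code node sandbox may not allow network requests)
import asyncio
import aiohttp  # used by call_llm

async def race_llm_calls(prompt: str, models: list) -> dict:
    """Call multiple models, return the fastest result"""
    # call_llm is a placeholder for your own async client function.
    # asyncio.wait needs Task objects; bare coroutines are rejected on Python 3.11+
    tasks = [asyncio.create_task(call_llm(prompt, model)) for model in models]
    # FIRST_COMPLETED: return as soon as one finishes
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    # Cancel unfinished tasks
    for task in pending:
        task.cancel()
    # Note: result() re-raises if the fastest task ended with an exception
    return done.pop().result()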
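To see the first-wins behavior without real API keys, the same pattern can be run against simulated calls. In this sketch `fake_llm_call` and the model names are hypothetical, and `asyncio.sleep` stands in for network latency.

```python
import asyncio


async def fake_llm_call(model: str, delay: float) -> dict:
    """Stand-in for a real model call; sleeps to simulate latency."""
    await asyncio.sleep(delay)
    return {"model": model, "text": f"answer from {model}"}


async def race(latencies: dict) -> dict:
    tasks = [asyncio.create_task(fake_llm_call(m, d)) for m, d in latencies.items()]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # Let cancellations settle so no task is left pending at loop shutdown
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


winner = asyncio.run(race({"model-a": 0.2, "model-b": 0.01, "model-c": 0.3}))
```

Cancelling the losers matters in practice: a slow call that keeps running still consumes tokens and counts against rate limits.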
Pattern 3: Conditional Parallel
Dynamically decide which parallel branches to activate based on input (implemented in Dify using IF/ELSE combined with parallel design):
IF query involves financial data?
→ Activate financial database query
IF query involves product info?
→ Activate product knowledge base query
IF query involves user data?
→ Activate CRM system query
↓
Converge all activated query results
2.3 Error Recovery Implementation Patterns
Retry mechanism (implemented through looping):
Attempt node (HTTP request)
↓
Decision node (IF: request succeeded?)
├── Success → Continue main flow
└── Failure → Retry counter + 1
IF retry count < 3
→ Wait 1 second (via Code node)
→ Return to attempt node
ELSE
→ Take degradation branch
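Exponential backoff retry in a Code node (this assumes a sandbox that permits outbound network access and provides the requests package):
import time
import requests

def main(
    url: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> dict:
    """HTTP request with exponential backoff"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return {
                "success": True,
                "data": response.json(),
                "attempts": attempt + 1
            }
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a response body that is not valid JSON
            if attempt < max_retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = base_delay * (2 ** attempt)
                time.sleep(wait_time)
            else:
                return {
                    "success": False,
                    "error": str(e),
                    "attempts": attempt + 1
                }
    # Reached only when max_retries is 0 or negative
    return {"success": False, "error": "max_retries must be at least 1", "attempts": 0}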
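To check the backoff schedule without real waits or network calls, the retry loop can be separated from the operation it wraps. The following is a sketch under that assumption; `retry_with_backoff` and its injectable `sleep` parameter are hypothetical and not part of Dify.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    operation: Callable[[], T],
    max_retries: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call operation until it succeeds or max_retries attempts are used up."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return operation()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error
            # Delay doubles each attempt: base, 2*base, 4*base, ...
            sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")


waits = []
calls = {"n": 0}


def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("simulated timeout")
    return "ok"


result = retry_with_backoff(flaky, sleep=waits.append)
```

Passing `waits.append` as the sleep function records the delays instead of waiting, so the schedule can be asserted directly.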
Degradation strategy:
Primary path: query real-time data (HTTP → external API)
↓ on failure
Degradation path: query cached data (knowledge base retrieval)
↓ still failing
Final fallback: return preset default answer
Implemented in Dify via node error handling settings combined with conditional branches.
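The tiered degradation above amounts to "try each path in order and stop at the first success". A minimal sketch of that logic in plain Python follows; `first_successful`, `realtime_api`, and `cached_lookup` are hypothetical stand-ins for the workflow's branches.

```python
from typing import Any, Callable, List, Tuple


def first_successful(paths: List[Tuple[str, Callable[[], Any]]], default: Any) -> Tuple[str, Any]:
    """Try each path in order; return (path_name, value) for the first that succeeds."""
    for name, fetch in paths:
        try:
            return name, fetch()
        except Exception:
            continue  # this tier failed, fall through to the next one
    return "default", default


def realtime_api() -> dict:
    raise ConnectionError("external API unavailable")


def cached_lookup() -> dict:
    return {"price": 101.5, "stale": True}


source, answer = first_successful(
    [("realtime", realtime_api), ("cache", cached_lookup)],
    default={"message": "Data is temporarily unavailable."},
)
```

Returning the name of the tier that answered makes it easy to log how often the workflow runs in degraded mode.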
2.4 Workflow State Persistence
For long-running workflows (e.g., batch processing hundreds of documents), implement state persistence to enable resumption from checkpoints on failure:
# Load recorded progress from Redis at the start of the workflow
def main(
    documents: list,
    job_id: str,
    redis_url: str
) -> dict:
    import redis
    import json
    r = redis.from_url(redis_url)
    # Check for unfinished tasks (resume from checkpoint)
    progress_key = f"workflow_progress:{job_id}"
    existing_progress = r.get(progress_key)
    if existing_progress:
        # Assumed format: a JSON list of document indices stored as strings
        processed_ids = set(json.loads(existing_progress))
    else:
        processed_ids = set()
    # Filter out already-processed documents
    remaining = [
        doc for i, doc in enumerate(documents)
        if str(i) not in processed_ids
    ]
    return {
        "remaining_documents": remaining,
        "already_processed": len(processed_ids),
        "total": len(documents)
    }
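The node above only reads the checkpoint; something must also record each document as it finishes. A minimal sketch of that write side follows, with a plain dict standing in for Redis so it runs without a server. The helper names are hypothetical, and the storage format (a JSON list of string indices) is an assumption chosen to match the reader.

```python
import json
from typing import Dict, List, Set


def load_processed(store: Dict[str, str], job_id: str) -> Set[str]:
    raw = store.get(f"workflow_progress:{job_id}")
    return set(json.loads(raw)) if raw else set()


def mark_processed(store: Dict[str, str], job_id: str, doc_index: int) -> None:
    """Add one document index to the checkpoint after it finishes."""
    processed = load_processed(store, job_id)
    processed.add(str(doc_index))
    store[f"workflow_progress:{job_id}"] = json.dumps(sorted(processed))


def remaining_indices(store: Dict[str, str], job_id: str, total: int) -> List[int]:
    processed = load_processed(store, job_id)
    return [i for i in range(total) if str(i) not in processed]


store: Dict[str, str] = {}
mark_processed(store, "job-42", 0)
mark_processed(store, "job-42", 2)
todo = remaining_indices(store, "job-42", total=4)
```

This read-modify-write update is not atomic. With real Redis and concurrent iterations, a Redis set updated with SADD would avoid lost updates.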
Level 3: Source Code and Principles (5+ Years Experience)
3.1 Iteration Node Internal Implementation
A simplified view of the core logic of Dify's Iteration Node:
# api/core/workflow/nodes/iteration/iteration_node.py
class IterationNode(BaseNode):
    def _run(self, variable_pool: VariablePool) -> NodeRunResult:
        # Get the array to iterate over
        iterator_list = variable_pool.get_any(
            self.node_data.iterator_selector
        )
        if not isinstance(iterator_list, list):
            raise ValueError("Iterator variable must be array type")
        outputs = []
        # Core of concurrent execution
        if self.node_data.is_parallel and self.node_data.parallel_nums > 1:
            # Process in batches, each batch runs concurrently
            batch_size = self.node_data.parallel_nums
            for i in range(0, len(iterator_list), batch_size):
                batch = iterator_list[i:i + batch_size]
                batch_results = self._run_batch_concurrent(
                    batch, variable_pool, start_index=i
                )
                outputs.extend(batch_results)
        else:
            # Sequential execution
            for index, item in enumerate(iterator_list):
                result = self._run_single_iteration(
                    item, index, variable_pool
                )
                outputs.append(result)
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs={"output": outputs}
        )

    def _run_batch_concurrent(
        self, batch: list, variable_pool: VariablePool, start_index: int
    ) -> list:
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(batch)
        ) as executor:
            futures = {
                executor.submit(
                    self._run_single_iteration,
                    item, start_index + i, variable_pool.copy()
                ): i
                for i, item in enumerate(batch)
            }
            # Results are stored by position so output order matches input order
            results = [None] * len(batch)
            for future in concurrent.futures.as_completed(futures):
                idx = futures[future]
                results[idx] = future.result()
        return results
Key design: Each concurrent iteration receives a copy of the variable pool (variable_pool.copy()), ensuring data isolation between iterations and preventing concurrent write conflicts.
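The effect of handing each iteration its own copy can be demonstrated with ordinary dictionaries. This sketch uses `copy.deepcopy` as the copy mechanism, which is an assumption: the chapter does not say whether the pool copy is deep or shallow. `shared_pool` and `iteration` are hypothetical names.

```python
import copy
from concurrent.futures import ThreadPoolExecutor

shared_pool = {"results": [], "product": "widget"}


def iteration(item: str, pool: dict) -> dict:
    pool["results"].append(item.upper())  # mutates only the pool it was given
    return pool


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [
        executor.submit(iteration, item, copy.deepcopy(shared_pool))
        for item in ["a", "b", "c"]
    ]
    pools = [f.result() for f in futures]
```

After the run, `shared_pool["results"]` is still empty and each copy holds exactly one entry. A shallow copy would share the inner list between iterations and reintroduce the conflict.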
3.2 Parallel Branch Execution Model
Parallel branches in Dify are implemented at the graph engine level:
class GraphEngine:
    def _get_next_nodes(
        self,
        graph: WorkflowGraph,
        completed_node_id: str,
        result: NodeRunResult,
        variable_pool: VariablePool
    ) -> list[str]:
        """Determine which nodes to execute next"""
        outgoing_edges = graph.get_outgoing_edges(completed_node_id)
        if len(outgoing_edges) == 1:
            # Single successor node
            return [outgoing_edges[0].target]
        elif len(outgoing_edges) > 1:
            # Multiple successor nodes: parallel execution
            # All successors are added to the execution queue simultaneously
            return [edge.target for edge in outgoing_edges]
        return []

    def _execute_parallel(
        self,
        nodes: list[str],
        graph: WorkflowGraph,
        variable_pool: VariablePool
    ) -> dict[str, NodeRunResult]:
        """Concurrently execute multiple node branches"""
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_node = {
                executor.submit(
                    self._execute_branch,
                    node_id, graph, variable_pool.copy()
                ): node_id
                for node_id in nodes
            }
            results = {}
            for future in concurrent.futures.as_completed(future_to_node):
                node_id = future_to_node[future]
                results[node_id] = future.result()
        return results
Convergence point: When all parallel branches reach a shared convergence node, the graph engine waits for all branches to complete, merges all results into the variable pool, then continues subsequent execution.
3.3 Error Propagation Chain and Exception Context
Dify workflow error handling involves a complete exception context chain:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class WorkflowError:
    node_id: str
    error_message: str
    # Remaining fields have defaults so the handler below can build an error
    # from just node_id and error_message
    node_title: str = ""
    error_type: str = "unknown"  # "timeout" | "type_error" | "api_error" | ...
    error_traceback: str = ""
    timestamp: datetime = field(default_factory=datetime.now)
    retry_count: int = 0
    # Error propagation path
    caused_by: Optional['WorkflowError'] = None

class ErrorHandler:
    def handle_node_error(
        self,
        node: BaseNode,
        error: Exception,
        variable_pool: VariablePool
    ) -> NodeRunResult:
        error_mode = node.node_data.error_handling_mode
        if error_mode == ErrorHandlingMode.TERMINATE:
            # Terminate workflow, return error status
            raise WorkflowTerminateException(
                f"Node '{node.title}' failed: {str(error)}"
            )
        elif error_mode == ErrorHandlingMode.CONTINUE_WITH_DEFAULT:
            # Continue with default value
            default_outputs = node.node_data.error_default_outputs
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED,
                outputs=default_outputs,
                error=WorkflowError(node_id=node.id, error_message=str(error))
            )
        elif error_mode == ErrorHandlingMode.FALLBACK:
            # Route to fallback branch (handled at graph engine level)
            return NodeRunResult(
                status=WorkflowNodeExecutionStatus.FAILED_WITH_FALLBACK,
                error=WorkflowError(node_id=node.id, error_message=str(error))
            )
Level 4: Production Pitfalls and Decision-Making (Expert Perspective)
4.1 Pitfall 1: Memory Explosion in Iteration Nodes
Problem: Iterating over 1,000 documents of 2,000 characters each, with about 500 characters of LLM output per document. The raw text is only about 2.5 MB, but the Iteration node keeps all 1,000 result objects in its output array, and each iteration also holds its own variable pool copy and intermediate node outputs. Actual memory use can therefore grow far beyond the raw text size, potentially to hundreds of MB, and cause an OOM (out-of-memory) error.
Symptom: Workflow fails suddenly after roughly 500 elements, or becomes extremely slow.
Solution 1: Stream processing — write directly to storage
Instead of accumulating results in the Iteration node's output array, write directly to a database or object storage during each iteration:
# Code node inside the iteration
def main(item: str, index: int, job_id: str, db_url: str) -> dict:
    import psycopg2
    import json
    # Process current item (process_item is a placeholder for your own logic)
    result = process_item(item)
    # Write directly to database, not returning to workflow variable pool
    conn = psycopg2.connect(db_url)
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO results (job_id, index, result) VALUES (%s, %s, %s)",
            (job_id, index, json.dumps(result))
        )
        conn.commit()
    finally:
        # Always release the connection, even if the insert fails
        conn.close()
    # Only return status, not the actual data
    return {"status": "ok", "index": index}
Solution 2: Batch processing
Split 1,000 tasks into batches of 50, with each batch as an independent workflow call, coordinated by an external scheduler (Python script):
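import requests

def process_in_batches(all_documents: list, api_key: str, batch_size: int = 50):
    for i in range(0, len(all_documents), batch_size):
        batch = all_documents[i:i + batch_size]
        response = requests.post(
            "https://api.dify.ai/v1/workflows/run",
            # The workflow API authenticates with the app's API key as a Bearer token
            headers={"Authorization": f"Bearer {api_key}"},
            json={
                "inputs": {"documents": batch},
                "response_mode": "blocking",
                "user": "batch-scheduler"  # caller identifier; this value is an example
            },
            timeout=600
        )
        response.raise_for_status()
        batch_num = i // batch_size + 1
        # In blocking mode the run status is nested under "data"
        print(f"Batch {batch_num} complete: {response.json()['data']['status']}")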
4.2 Pitfall 2: Rate Limit Bomb in Parallel Branches
Scenario: A 10-way parallel design fires 10 simultaneous LLM calls. Rate limits vary by provider and account tier; as an illustration, assume GPT-4o limits of 500 RPM on an entry tier and 5,000 RPM on a higher tier (check your provider's current limits).
Problem: If the workflow is triggered frequently (e.g., every 5 seconds), 10 parallel paths × 0.2 triggers/second = 2 LLM calls/second = 120 calls/minute, which sounds manageable. But scale to 50 parallel paths and the same trigger rate gives 50 × 0.2 = 10 calls/second = 600 calls/minute, which exceeds a 500 RPM limit and can cause the entire batch to fail.
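The arithmetic is easy to check before deploying a design. A minimal sketch with hypothetical helper names is shown here; it assumes each trigger fires exactly one LLM call per parallel path.

```python
def llm_calls_per_minute(parallel_paths: int, trigger_interval_s: float) -> float:
    """Each trigger fires one LLM call per parallel path."""
    return parallel_paths * (60.0 / trigger_interval_s)


def exceeds_limit(parallel_paths: int, trigger_interval_s: float, rpm_limit: int) -> bool:
    return llm_calls_per_minute(parallel_paths, trigger_interval_s) > rpm_limit


ten_way = llm_calls_per_minute(10, 5)
fifty_way = llm_calls_per_minute(50, 5)
```

Running the same check against your own account's RPM limit, with some headroom for retries, gives a quick upper bound on safe fan-out width.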
Monitoring and defense:
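import time
from threading import Semaphore

# Use semaphore to control concurrency
rate_limiter = Semaphore(10)  # Maximum 10 concurrent LLM calls

def call_llm_with_rate_limit(prompt: str) -> str:
    # call_llm and RateLimitError are placeholders for your provider SDK's
    # request function and rate-limit exception
    with rate_limiter:
        try:
            return call_llm(prompt)
        except RateLimitError:
            # Rate limit hit, wait and retry once
            # (the semaphore slot stays held during the wait, which also throttles other callers)
            time.sleep(60)  # Wait for rate limit window to reset
            return call_llm(prompt)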
4.3 Pitfall 3: Write Race Conditions in Parallel Branches
Scenario: 3 parallel branches all need to append data to the same JSON object:
# Branch A
results["category_a"] = analyze_category_a()
# Branch B (concurrent with A)
results["category_b"] = analyze_category_b()
# Branch C (concurrent with A and B)
results["category_c"] = analyze_category_c()
If results is a shared mutable object, concurrent writes create a race condition.
Dify's solution: Via variable pool copies (the variable_pool.copy() mentioned earlier), write operations in each branch are isolated. Branches can only write to their own variable pool copy; the engine merges them at the convergence node.
Best practice: Explicitly merge results from each branch in the convergence Code node — don't rely on implicit merging:
def main(
    result_a: dict,
    result_b: dict,
    result_c: dict
) -> dict:
    # Explicit merge: clear and debuggable
    return {
        "category_a": result_a,
        "category_b": result_b,
        "category_c": result_c,
        # Mean of the three branch scores
        "avg_score": (
            result_a["score"] + result_b["score"] + result_c["score"]
        ) / 3
    }
4.4 Error Recovery Strategy Decision Matrix
| Scenario | Recommended Strategy | Reason |
|---|---|---|
| External API occasional timeout | Exponential backoff retry (3 attempts) | Temporary issue, likely succeeds after waiting |
| LLM output format error | Regenerate with different temperature | Resampling may produce output that matches the expected format |
| Knowledge base connection failure | Degrade to general LLM (no RAG) | No knowledge base is better than total failure |
| Data validation failure | Terminate immediately + return error details | Invalid data has no reason to retry |
| Downstream service circuit break | Wait 5 minutes before recovering | Prevents cascade failures |
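The matrix can also be encoded as a lookup table so that routing logic stays in one place. The sketch below is one way to do that; the failure-kind keys and the `Strategy` enum are hypothetical names, and defaulting unknown failures to termination is an assumption the table itself does not make.

```python
from enum import Enum


class Strategy(Enum):
    RETRY_BACKOFF = "exponential backoff retry (3 attempts)"
    REGENERATE = "regenerate with different temperature"
    DEGRADE = "degrade to general LLM (no RAG)"
    TERMINATE = "terminate immediately and return error details"
    COOL_DOWN = "wait 5 minutes before recovering"


STRATEGY_BY_FAILURE = {
    "api_timeout": Strategy.RETRY_BACKOFF,
    "llm_format_error": Strategy.REGENERATE,
    "knowledge_base_down": Strategy.DEGRADE,
    "validation_failure": Strategy.TERMINATE,
    "circuit_open": Strategy.COOL_DOWN,
}


def choose_strategy(failure_kind: str) -> Strategy:
    # Unknown failures terminate: an assumption of this sketch
    return STRATEGY_BY_FAILURE.get(failure_kind, Strategy.TERMINATE)


chosen = choose_strategy("api_timeout")
```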
4.5 Architecture Upgrade for Massive-Scale Workflows
When a single Dify workflow cannot meet requirements (millions of records, millisecond SLA requirements), consider:
Pattern 1: Dify + Message Queue
Dify workflow (trigger side)
↓ publishes message to Kafka/RabbitMQ
Message queue
↓ concurrent consumers
Worker cluster (multiple Dify workflow instances or direct LLM calls)
↓ write results
Data store
↓ notification
Dify workflow (aggregation side)
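The producer, queue, and worker shape of this pattern can be tried locally before introducing Kafka or RabbitMQ. The sketch below uses Python's in-process `queue.Queue` as a stand-in for the broker and `str.upper` as a stand-in for the per-document LLM call; `run_workers` is a hypothetical name.

```python
import queue
import threading
from typing import Dict


def run_workers(documents: list, worker_count: int = 3) -> Dict[int, str]:
    tasks: "queue.Queue" = queue.Queue()
    results: Dict[int, str] = {}
    lock = threading.Lock()

    def worker() -> None:
        while True:
            job = tasks.get()
            if job is None:  # sentinel: no more work for this worker
                tasks.task_done()
                return
            index, doc = job
            processed = doc.upper()  # stand-in for an LLM call
            with lock:
                results[index] = processed  # "data store" write
            tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(worker_count)]
    for t in threads:
        t.start()
    for job in enumerate(documents):  # trigger side publishes messages
        tasks.put(job)
    for _ in threads:
        tasks.put(None)
    tasks.join()
    for t in threads:
        t.join()
    return results


store = run_workers(["contract a", "contract b", "contract c", "contract d"])
```

Keying results by document index lets the aggregation side reassemble output in order even though workers finish in arbitrary order.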
Pattern 2: Dify as Orchestration Layer, Core Computation Offloaded
Wrap compute-intensive work (vector search, model inference) as independent microservices; Dify workflows only orchestrate calls via HTTP Request nodes. This keeps Dify lightweight while core services can scale independently.
Chapter Summary
The three advanced workflow capabilities each have their place:
Iteration nodes: First choice for batch-processing list data. Watch concurrency settings (prevent rate limits) and output array size (prevent memory overflow).
Parallel branches: Best approach for executing multiple independent tasks simultaneously. Explicitly merge results at the convergence node to prevent race conditions.
Error recovery: Match strategy to failure cause — use retry for transient errors, degradation for persistent failures, immediate termination for data errors.
Key checklist:
- Iteration node array sizes evaluated (more than 100 elements should write directly to storage)
- Concurrency configured appropriately for API rate limits
- Every potentially-failing node has an error handling mode configured
- Parallel branch convergence nodes have explicit result merging logic
- Long-running workflows have state persistence mechanism
- Workflow behavior on node failure has been tested