Chapter 9

Workflow Basics: Node Types, Variable System and Conditional Branching

Workflows are one of Dify's most powerful capabilities; mastering node types, variable passing, and conditional branching is the foundation for building complex AI automation systems.

Chapter Overview

If a conversational application (chatbot) is about having AI answer questions, then a workflow is about having AI execute tasks. The fundamental difference: a conversation is a single-turn or multi-turn interaction, while a workflow is an ordered, branchable, composable automated process with a clear start and end.

Imagine this scenario: a user submits a resume, and the system must automatically: extract the candidate's key information → score against job requirements → if qualified, generate an interview invitation email; otherwise, generate a polite rejection email → send the email. This process involves multiple steps, conditional logic, and external service integration — exactly what workflows are designed for.

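This chapter systematically covers four levels:

  • Level 1: fundamentals (workflow vs. chatbot, core node types, the variable system, a first workflow)
  • Level 2: mechanisms in depth (conditional branching, variable scope, trigger methods, validation and error handling)
  • Level 3: source code and principles (DAG scheduling, the variable pool, condition evaluation)
  • Level 4: production pitfalls and decision-making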


Level 1: Fundamentals (1–3 Years Experience)

1.1 Workflow vs. Conversational Application: When to Use Which?

| Feature | Conversational App (Chatbot) | Workflow |
| --- | --- | --- |
| Interaction mode | Multi-turn conversation | Single execution (clear start and end) |
| User perception | Real-time chat interface | Usually runs in background, returns results |
| Best for | Q&A, consulting, companionship | Batch processing, automation, data processing |
| Error handling | Model explains errors on its own | Can design try/catch logic |
| I/O format | Text dialogue | Structured input/output, multiple format support |

Typical scenarios for choosing workflows:

1.2 Core Node Types in Dify Workflows

Start Node: Every workflow must have exactly one Start node. It defines the workflow's input parameters.

LLM Node: The core node that calls a language model to process text.

Knowledge Retrieval Node: Retrieves relevant content from Dify knowledge bases.

IF/ELSE Node (Conditional Branch): Determines workflow direction based on conditions.

Code Node: Executes Python or JavaScript code.

HTTP Request Node: Calls external APIs.

End Node: Defines the workflow's output.

1.3 Variable System: Data Flowing Between Nodes

Variable reference syntax: {{node_name.output_variable_name}}

Examples:

Using variables in LLM node prompts:

You are a resume analysis assistant.

Candidate information:
{{start.resume_text}}

Job requirements:
{{start.job_requirements}}

Analyze whether the candidate meets the job requirements.
Output the following JSON format:
{
  "score": integer score from 0 to 100,
  "strengths": ["strength 1", "strength 2"],
  "gaps": ["gap 1", "gap 2"],
  "recommendation": "recommend/reject"
}
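
To make the substitution step concrete, the sketch below shows one way {{node_name.output_variable_name}} placeholders could be resolved against a pool of node outputs. It is an illustration only, not Dify's implementation: the names PLACEHOLDER, render_prompt, and the pool layout are assumptions made for this example.

```python
import re

# Hypothetical pattern for {{node_name.variable_name}}; spaces inside the braces are tolerated.
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_]\w*)\.([A-Za-z_]\w*)\s*\}\}")


def render_prompt(template: str, pool: dict[str, dict[str, object]]) -> str:
    """Replace each {{node.var}} with the value stored under pool[node][var]."""

    def substitute(match: re.Match) -> str:
        node, var = match.group(1), match.group(2)
        if node not in pool or var not in pool[node]:
            # Fail loudly instead of sending a half-filled prompt to the model
            raise KeyError(f"Unresolved variable: {node}.{var}")
        return str(pool[node][var])

    return PLACEHOLDER.sub(substitute, template)


# Assumed pool layout: {node_name: {variable_name: value}}
pool = {"start": {"resume_text": "5 years of Python", "job_requirements": "Python, SQL"}}
prompt = render_prompt(
    "Candidate information:\n{{start.resume_text}}\n\nJob requirements:\n{{start.job_requirements}}",
    pool,
)
print(prompt)
```

Single braces, such as those in the JSON template above, are left untouched because the pattern only matches double braces around a node.variable pair.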

Variable types:

1.4 Building Your First Workflow: Resume Analyzer

Goal: User submits resume text → AI analyzes → outputs score and recommendations

Steps:

  1. Create new workflow: Dify → Studio → Create Application → Workflow

  2. Configure Start node:

    • Input variable 1: resume_text (paragraph type, required)
    • Input variable 2: job_title (text type, required)
  3. Add LLM node:

    • Select model (e.g., GPT-4o mini)
    • System prompt: You are a professional HR assistant skilled at resume analysis
    • User prompt:
      Analyze the following resume and assess the candidate's fit for the "{{start.job_title}}" role.
      
      Resume:
      {{start.resume_text}}
      
      Output JSON format only (no other text):
      {"score": number, "highlights": ["highlight"], "concerns": ["concern"], "verdict": "recommend/reject"}
      
  4. Add Code node (parse LLM's JSON output):

    import json
    
    def main(llm_output: str) -> dict:
        # Clean possible markdown code blocks
        clean = llm_output.strip()
        if clean.startswith("```"):
            clean = clean.split("```")[1]
            if clean.startswith("json"):
                clean = clean[4:]
    
        result = json.loads(clean.strip())
        return {
            "score": result["score"],
            "verdict": result["verdict"],
            "highlights": "\n".join(result["highlights"]),
            "concerns": "\n".join(result["concerns"])
        }
    
  5. Add End node: output score, verdict, highlights, concerns

  6. Test: click "Run" in the top right, fill in test data


Level 2: Mechanisms in Depth (3–5 Years Experience)

2.1 Advanced Uses of Conditional Branching

Basic condition (single condition):

code_node.score > 60 → take "qualified" branch
                      otherwise → take "unqualified" branch

Compound condition (AND logic):

code_node.score > 60 AND start.years_of_experience > 3
→ take "high-quality candidate" branch

Multiple branches (IF / ELSE IF / ELSE):

IF   code_node.score >= 80  → Immediately invite for interview
ELIF code_node.score >= 60  → Add to candidate pool
ELIF code_node.score >= 40  → Send thank-you note
ELSE                        → Reject directly

In Dify workflows: each IF/ELIF branch connects to different subsequent node chains, forming true multi-path execution.
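
The IF / ELIF / ELSE chain above is evaluated top to bottom, and the first matching condition wins. A minimal Python sketch of that first-match behavior follows; the function name and branch labels are made up for illustration and do not correspond to Dify identifiers.

```python
def route_candidate(score: int) -> str:
    """Return the label of the first branch whose threshold the score meets."""
    # Ordered from highest to lowest threshold, mirroring the IF/ELIF chain
    branches = [
        (80, "invite_for_interview"),
        (60, "add_to_candidate_pool"),
        (40, "send_thank_you_note"),
    ]
    for threshold, branch in branches:
        if score >= threshold:
            return branch
    return "reject"  # ELSE branch


for sample in (85, 60, 59, 10):
    print(sample, "->", route_candidate(sample))
```

Because order matters, placing the `>= 40` check first would send every score of 40 or above down the thank-you branch, so the strictest condition should come first.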

Technique for conditions involving arrays or objects:

# Pre-process complex conditions in a Code node
def main(analysis_result: dict) -> dict:
    score = analysis_result["score"]
    has_required_skills = all(
        skill in analysis_result.get("skills", [])
        for skill in ["Python", "Machine Learning"]
    )

    return {
        "final_tier": (
            "tier_1" if score >= 80 and has_required_skills else
            "tier_2" if score >= 60 else
            "tier_3"
        )
    }

Then branch on code_node.final_tier — avoid writing complex logic directly in IF/ELSE nodes.

2.2 Variable Lifecycle and Scope

Dify workflow variables have explicit scope rules:

Rule 1: Variables can only reference already-executed upstream nodes

Workflows execute according to DAG (directed acyclic graph) topological order. If node B hasn't executed yet, node C cannot reference node B's output.
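
One way to reason about Rule 1 is to compute the set of nodes that lie upstream of a given node: only their outputs can be referenced. The sketch below is a hypothetical helper written for this chapter (the function name and edge format are assumptions), not part of Dify.

```python
def upstream_nodes(edges: list[tuple[str, str]], node_id: str) -> set[str]:
    """Collect every node that has a path leading into node_id."""
    # Build a reverse adjacency map: target -> list of direct parents
    parents: dict[str, list[str]] = {}
    for source, target in edges:
        parents.setdefault(target, []).append(source)

    seen: set[str] = set()
    stack = list(parents.get(node_id, []))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(parents.get(current, []))
    return seen


edges = [("start", "llm"), ("llm", "code"), ("code", "end"), ("start", "http")]
print(upstream_nodes(edges, "code"))  # the nodes whose outputs "code" may reference
```

In this example graph, the "http" node is not upstream of "end", so a reference to its output from "end" would be invalid even though both nodes exist in the same workflow.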

Rule 2: Variables inside a branch are invisible outside that branch

IF branch A → LLM_A  (this node's output is only available inside branch A)
ELSE branch B → LLM_B (this node's output is only available inside branch B)
         ↓
Merge node (cannot directly reference LLM_A or LLM_B output)

Solution: At the end of each branch, normalize results to a standard variable name, then reference it at the merge node.

# Code node in branch A
def main(llm_a_output: str) -> dict:
    return {"result": llm_a_output, "branch": "A"}

# Code node in branch B (a separate node, so it defines its own main)
def main(llm_b_output: str) -> dict:
    return {"result": llm_b_output, "branch": "B"}

# Downstream node connects after both branches converge
# References code_a.result or code_b.result appropriately
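
The merge step itself can be sketched as a function that picks whichever branch actually produced a value. This is an illustrative sketch under the assumption that the branch that did not run contributes None; the function name is hypothetical. Dify's Variable Aggregator node, where available in your version, plays a similar role without custom code.

```python
from typing import Optional


def merge_branch_results(result_a: Optional[str], result_b: Optional[str]) -> dict:
    """Return the output of whichever branch actually ran."""
    for branch, value in (("A", result_a), ("B", result_b)):
        if value is not None:
            return {"result": value, "branch": branch}
    # Neither branch ran: surface the problem instead of passing on an empty value
    raise ValueError("Neither branch produced a result")


print(merge_branch_results("Interview invitation draft", None))
print(merge_branch_results(None, "Polite rejection draft"))
```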

Rule 3: Variables inside loop nodes are independent for each iteration

(Covered in detail in Chapter 10 on Loop nodes)

2.3 Workflow Trigger Methods in Depth

Method 1: Manual trigger in Dify interface

Method 2: API call trigger

# Synchronous call (wait for workflow to complete, get final output)
curl -X POST 'https://api.dify.ai/v1/workflows/run' \
  -H 'Authorization: Bearer YOUR_API_KEY' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {
      "resume_text": "John Smith, 5 years Python development experience...",
      "job_title": "Senior Backend Engineer"
    },
    "response_mode": "blocking",
    "user": "user-001"
  }'

Response:

{
  "workflow_run_id": "run-xxxxx",
  "task_id": "task-xxxxx",
  "data": {
    "status": "succeeded",
    "outputs": {
      "score": 85,
      "verdict": "recommend",
      "highlights": "Proficient in Python\nRich system design experience"
    },
    "elapsed_time": 3.24,
    "total_tokens": 1250
  }
}

Streaming call (real-time progress):

import requests
import json

def run_workflow_streaming(inputs: dict, api_key: str):
    response = requests.post(
        "https://api.dify.ai/v1/workflows/run",
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "inputs": inputs,
            "response_mode": "streaming",
            "user": "user-001"
        },
        stream=True
    )

    for line in response.iter_lines():
        if line.startswith(b"data: "):
            event = json.loads(line[6:])

            if event["event"] == "node_started":
                print(f"Node started: {event['data']['title']}")
            elif event["event"] == "node_finished":
                elapsed = event['data']['elapsed_time']
                print(f"Node finished: {event['data']['title']}, took {elapsed:.2f}s")
            elif event["event"] == "workflow_finished":
                print(f"Workflow complete. Outputs: {event['data']['outputs']}")
                break
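
The parsing half of the streaming loop can be separated from the network call, which makes it testable without an API key. The sketch below assumes the stream consists of lines prefixed with "data:" that carry JSON payloads, as in the loop above; the function name and the sample events are illustrative.

```python
import json
from typing import Iterable, Iterator


def parse_sse_events(lines: Iterable[bytes]) -> Iterator[dict]:
    """Yield decoded JSON events from raw stream lines, skipping everything else."""
    for raw in lines:
        line = raw.strip()
        if not line.startswith(b"data:"):
            continue  # blank keep-alive lines, comments, other SSE fields
        payload = line[len(b"data:"):].strip()
        if not payload:
            continue
        try:
            yield json.loads(payload)
        except json.JSONDecodeError:
            continue  # ignore payloads that are not valid JSON


# Hand-written sample lines standing in for response.iter_lines()
sample = [
    b'data: {"event": "node_started", "data": {"title": "LLM"}}',
    b"",
    b"event: ping",
    b'data: {"event": "workflow_finished", "data": {"outputs": {"score": 85}}}',
]
events = list(parse_sse_events(sample))
print([event["event"] for event in events])
```

In real use, `parse_sse_events(response.iter_lines())` would replace the hand-written sample.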

Method 3: Webhook trigger

Configure the workflow's Webhook URL so external systems (GitHub, Slack, enterprise messaging) can automatically trigger the workflow when specific events occur.

Method 4: Scheduled trigger

Dify does not natively support scheduled triggers, but external schedulers (cron, Celery, n8n) can periodically call the workflow API:

# Trigger daily report workflow at 8 AM
# crontab: 0 8 * * * python /path/to/trigger_daily_report.py

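import os
import requests
from datetime import datetime

# Assumption: the API key is supplied via an environment variable named DIFY_API_KEY
API_KEY = os.environ["DIFY_API_KEY"]

def trigger_daily_report():
    response = requests.post(
        "https://api.dify.ai/v1/workflows/run",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "inputs": {
                "report_date": datetime.now().strftime("%Y-%m-%d"),
                "report_type": "daily_summary"
            },
            "response_mode": "blocking",
            "user": "scheduler"
        },
        timeout=120  # Avoid a hung cron job if the workflow never returns
    )
    response.raise_for_status()  # Surface HTTP errors in the cron log
    return response.json()

# cron runs the script directly, so call the function when executed as a script
if __name__ == "__main__":
    print(trigger_daily_report())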

2.4 Workflow Input Validation and Error Handling

Input validation best practice:

Add a validation node (Code node) immediately after the Start node to check input validity:

def main(resume_text: str, job_title: str) -> dict:
    errors = []

    if not resume_text or len(resume_text.strip()) < 100:
        errors.append("Resume content too short (minimum 100 characters)")

    if len(resume_text) > 10000:
        errors.append("Resume content too long (maximum 10,000 characters)")

    if not job_title or len(job_title.strip()) == 0:
        errors.append("Job title cannot be empty")

    return {
        "is_valid": len(errors) == 0,
        "error_message": "; ".join(errors) if errors else "",
        "resume_length": len(resume_text)
    }

Then use conditional branching:

Workflow-level error handling (Dify v0.10+):

Enable "Exception Handling" in workflow settings to configure per-node behavior:


Level 3: Source Code and Principles (5+ Years Experience)

3.1 Workflow Execution Engine: DAG Scheduling Principles

Dify's workflow execution is based on topological sorting of a DAG (Directed Acyclic Graph). Core data structure:

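from __future__ import annotations  # WorkflowNode is defined elsewhere; postpone annotation evaluation

from collections import deque
from dataclasses import dataclass

@dataclass
class WorkflowGraph:
    nodes: dict[str, WorkflowNode]  # node_id -> node
    edges: list[tuple[str, str]]    # (source_node_id, target_node_id)

    def get_execution_order(self) -> list[str]:
        """Topological sort to determine node execution order"""
        # Count incoming edges for every node
        in_degree = {node_id: 0 for node_id in self.nodes}

        for source, target in self.edges:
            in_degree[target] += 1

        # BFS topological sort (Kahn's algorithm), starting from nodes with no dependencies
        queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
        order = []

        while queue:
            node_id = queue.popleft()
            order.append(node_id)

            # Releasing a node reduces the in-degree of each of its targets
            for source, target in self.edges:
                if source == node_id:
                    in_degree[target] -= 1
                    if in_degree[target] == 0:
                        queue.append(target)

        return order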
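
As a standalone companion to the listing above, the sketch below runs the same BFS topological sort on the resume workflow and adds a cycle check, which the simplified listing omits. The function name and the adjacency-list layout are choices made for this example, not Dify source code.

```python
from collections import defaultdict, deque


def topological_order(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Kahn's algorithm with an adjacency list and cycle detection."""
    successors = defaultdict(list)
    in_degree = {node: 0 for node in nodes}
    for source, target in edges:
        successors[source].append(target)
        in_degree[target] += 1

    queue = deque(node for node in nodes if in_degree[node] == 0)
    order = []
    while queue:
        node = queue.popleft()
        order.append(node)
        for nxt in successors[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                queue.append(nxt)

    # Nodes on a cycle never reach in-degree 0, so they are missing from the order
    if len(order) != len(nodes):
        raise ValueError("Graph contains a cycle")
    return order


nodes = ["start", "llm", "code", "end"]
edges = [("start", "llm"), ("llm", "code"), ("code", "end")]
order = topological_order(nodes, edges)
print(order)
```

Precomputing the adjacency list avoids rescanning every edge for each node, which matters once a workflow grows beyond a handful of nodes.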

Key source locations:

Execution engine core logic (simplified):

class GraphEngine:
    def run(self, graph: WorkflowGraph, inputs: dict) -> dict:
        # Initialize variable pool
        variable_pool = VariablePool(start_variables=inputs)

        # Determine execution path (accounting for conditional branches)
        execution_queue = [graph.start_node_id]
        executed = set()

        while execution_queue:
            node_id = execution_queue.pop(0)
            if node_id in executed:
                continue  # A node reachable via several paths runs only once
            node = graph.nodes[node_id]

            # Execute node
            result = node.run(variable_pool)
            variable_pool.add(node_id, result.outputs)
            executed.add(node_id)

            # Determine next nodes based on execution result
            next_nodes = self._get_next_nodes(
                graph, node_id, result, variable_pool
            )
            execution_queue.extend(next_nodes)

        return variable_pool.get_final_outputs()
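
To see how the queue-based loop interacts with conditional branches, here is a self-contained toy version. Everything in it is an assumption made for illustration: nodes are plain functions that return their outputs plus the label of the outgoing edge to follow, and edges are keyed by (node_id, label). It is not Dify's engine.

```python
from collections import deque
from typing import Callable

# A node receives the variable pool and returns (outputs, outgoing edge label)
NodeFn = Callable[[dict], tuple[dict, str]]


def run_graph(
    nodes: dict[str, NodeFn],
    edges: dict[tuple[str, str], list[str]],
    start: str,
    inputs: dict,
) -> tuple[dict, list[str]]:
    """Execute nodes breadth-first, following only the edge label each node selects."""
    pool = {"start": inputs}
    trace: list[str] = []
    queue = deque([start])
    while queue:
        node_id = queue.popleft()
        if node_id in trace:
            continue  # already executed
        outputs, label = nodes[node_id](pool)
        pool.setdefault(node_id, {}).update(outputs)
        trace.append(node_id)
        # Only successors attached to the selected label are scheduled
        queue.extend(edges.get((node_id, label), []))
    return pool, trace


nodes = {
    "start": lambda pool: ({}, "default"),
    "check": lambda pool: ({}, "true" if pool["start"]["score"] > 60 else "false"),
    "invite": lambda pool: ({"email": "interview invitation"}, "default"),
    "reject": lambda pool: ({"email": "polite rejection"}, "default"),
}
edges = {
    ("start", "default"): ["check"],
    ("check", "true"): ["invite"],
    ("check", "false"): ["reject"],
}
pool, trace = run_graph(nodes, edges, "start", {"score": 85})
print(trace)
```

The branch that is not taken never runs, so it never writes to the pool. That is the mechanical reason a downstream node cannot rely on outputs from both branches.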

3.2 Variable Pool Memory Model

The VariablePool in Dify workflows is a dictionary-based in-memory data structure:

from typing import Any

class VariablePool:
    def __init__(self, start_variables: dict):
        # Structure: {node_id: {variable_name: value}}
        self._pool = {
            "sys": {  # System variables
                "workflow_id": "xxx",
                "run_id": "yyy",
                "user_id": "zzz"
            },
            "start": start_variables  # Start node inputs
        }

    def get(self, node_id: str, variable_name: str) -> Any:
        """Get variable value"""
        return self._pool.get(node_id, {}).get(variable_name)

    def get_any(self, selector: list[str]) -> Any:
        """Get variable via selector path"""
        # selector = ["llm_1", "text"] corresponds to {{llm_1.text}}
        node_id, *path = selector
        value = self._pool.get(node_id, {})
        for key in path:
            if not isinstance(value, dict):
                return None  # Path goes deeper than the stored value
            value = value.get(key)
        return value

Variable type conversion: When variables pass between nodes, Dify performs type checking and conversion. If an LLM outputs a string and a downstream Code node expects an integer, automatic conversion is attempted; failure raises a VariableTypeError.
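
Rather than relying on automatic conversion, a Code node can convert the value explicitly and report whether the conversion succeeded, so that a downstream IF/ELSE node can branch on the flag. The sketch below is one possible approach; the input name score_text and the output names are assumptions for this example.

```python
def main(score_text: str) -> dict:
    """Convert an LLM-produced score string to an integer clamped to 0..100."""
    try:
        # float() first so that inputs like "85.0" are accepted; int() then truncates
        score = int(float(score_text.strip()))
    except (ValueError, OverflowError, AttributeError):
        # Non-numeric text, "inf"/"nan", or a non-string input
        return {"score": 0, "score_valid": False}
    return {"score": max(0, min(100, score)), "score_valid": True}


for sample in ("85", " 85.0 ", "eighty", "150"):
    print(repr(sample), "->", main(sample))
```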

3.3 Condition Evaluation Mechanism

The IF/ELSE node's condition evaluation is based on a rule engine:

class ConditionEvaluator:
    OPERATORS = {
        "eq": lambda a, b: a == b,
        "neq": lambda a, b: a != b,
        "gt": lambda a, b: float(a) > float(b),
        "gte": lambda a, b: float(a) >= float(b),
        "lt": lambda a, b: float(a) < float(b),
        "lte": lambda a, b: float(a) <= float(b),
        "contains": lambda a, b: str(b) in str(a),
        "not_contains": lambda a, b: str(b) not in str(a),
        "starts_with": lambda a, b: str(a).startswith(str(b)),
        "ends_with": lambda a, b: str(a).endswith(str(b)),
        "is_empty": lambda a, _: not a or str(a).strip() == "",
        "is_not_empty": lambda a, _: bool(a) and str(a).strip() != "",
    }

    def evaluate(self, condition: Condition, variable_pool: VariablePool) -> bool:
        left_value = variable_pool.get_any(condition.left_selector)
        evaluator = self.OPERATORS.get(condition.operator)
        if not evaluator:
            raise ValueError(f"Unknown operator: {condition.operator}")
        return evaluator(left_value, condition.right_value)

    def evaluate_group(self, group: ConditionGroup, variable_pool) -> bool:
        results = [self.evaluate(c, variable_pool) for c in group.conditions]
        if group.logic == "AND":
            return all(results)
        elif group.logic == "OR":
            return any(results)
        # Fail explicitly instead of silently returning None
        raise ValueError(f"Unknown logic operator: {group.logic}")

Important limitation: Condition evaluation runs at the Python layer and does not support regular expressions. Complex string matching should be converted to boolean values in a Code node first, then use eq: true for the conditional check.
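
As an example of this workaround, the Code node below turns a regular-expression match into a boolean that an IF/ELSE node can test. The email pattern and the variable names are illustrative assumptions; the pattern is deliberately simple and is not a full address validator.

```python
import re

# Simplified email pattern, good enough to detect the presence of an address
EMAIL_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def main(resume_text: str) -> dict:
    """Expose a regex match as a boolean plus the matched text."""
    match = EMAIL_PATTERN.search(resume_text or "")
    return {
        "has_email": match is not None,
        "email": match.group(0) if match else "",
    }


print(main("Contact: jane.doe@example.com, 5 years of Python"))
print(main("No contact details provided"))
```

The IF/ELSE node then only needs a simple check on has_email, which keeps the pattern in one testable place.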


Level 4: Production Pitfalls and Decision-Making (Expert Perspective)

4.1 Pitfall 1: Silent Errors from Variable Name Conflicts

Problem: When multiple nodes output variables with the same name, the later node's variable silently overwrites the earlier one.

Wrong example:

Correct approach:

4.2 Pitfall 2: LLM JSON Parsing Failures

Frequent problem: LLM is asked to output JSON but actually outputs content with markdown code blocks or extra text:

# Actual LLM output (not pure JSON):
Sure, here is the analysis:
```json
{"score": 85, "verdict": "recommend"}
```

I hope this helps!

Direct json.loads() will fail.

Robust JSON extraction function:

import json
import re

def extract_json(llm_output: str) -> dict:
    """Extract JSON from LLM output, handling various formats"""
    # Method 1: Direct parse (ideal case)
    try:
        return json.loads(llm_output.strip())
    except json.JSONDecodeError:
        pass

    # Method 2: Extract JSON from markdown code block
    code_block_pattern = r'```(?:json)?\s*([\s\S]*?)\s*```'
    matches = re.findall(code_block_pattern, llm_output)
    if matches:
        try:
            return json.loads(matches[0])
        except json.JSONDecodeError:
            pass

    # Method 3: Find first complete JSON object with regex
    json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
    matches = re.findall(json_pattern, llm_output, re.DOTALL)
    for match in matches:
        try:
            return json.loads(match)
        except json.JSONDecodeError:
            continue

    raise ValueError(f"Cannot extract JSON from LLM output: {llm_output[:200]}")

Root solution: Use models that support Structured Output (GPT-4o, Claude 3.5 Sonnet), and enable JSON Schema output mode in the Dify node configuration — eliminating format errors at the source.
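
Even with structured output enabled, a cheap structural check in a Code node can catch drift before the data reaches later nodes. The sketch below validates the fields used in the resume analyzer from section 1.4; the function name and the error messages are assumptions for this example, and it uses only the standard library rather than a JSON Schema validator.

```python
def validate_analysis(data: dict) -> list[str]:
    """Return a list of problems; an empty list means the structure looks right."""
    problems = []

    score = data.get("score")
    # bool is a subclass of int in Python, so exclude it explicitly
    if isinstance(score, bool) or not isinstance(score, int) or not 0 <= score <= 100:
        problems.append("score must be an integer from 0 to 100")

    if data.get("verdict") not in ("recommend", "reject"):
        problems.append("verdict must be 'recommend' or 'reject'")

    for key in ("highlights", "concerns"):
        value = data.get(key)
        if not isinstance(value, list) or not all(isinstance(item, str) for item in value):
            problems.append(f"{key} must be a list of strings")

    return problems


good = {"score": 85, "verdict": "recommend", "highlights": ["Python"], "concerns": []}
bad = {"score": "85", "verdict": "maybe", "highlights": "Python"}
print(validate_analysis(good))
print(validate_analysis(bad))
```

Returning a list of problems rather than raising lets the workflow route invalid output to a retry or fallback branch.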

4.3 Pitfall 3: Unhandled Workflow Timeouts

Problem: A node in the workflow (usually an external HTTP request or large file processing) times out, the entire workflow hangs, and the API caller waits indefinitely.

Dify timeout configuration:

Client-side timeout handling:

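import os
import requests
from requests.exceptions import Timeout

# Assumption: the API key is supplied via an environment variable named DIFY_API_KEY
API_KEY = os.environ["DIFY_API_KEY"]

def safe_run_workflow(inputs: dict, timeout: int = 60) -> dict:
    try:
        response = requests.post(
            "https://api.dify.ai/v1/workflows/run",
            headers={"Authorization": f"Bearer {API_KEY}"},
            # "user" identifies the caller, as in the earlier API examples
            json={"inputs": inputs, "response_mode": "blocking", "user": "user-001"},
            timeout=timeout  # Client-side timeout
        )
        return response.json()

    except Timeout:
        return {
            "status": "timeout",
            "error": f"Workflow timed out (>{timeout}s), please try again later"
        }
    except Exception as e:
        # Any other failure (connection error, invalid JSON body) is reported, not raised
        return {"status": "error", "error": str(e)}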

Async processing approach: For workflows that take more than 60 seconds, use streaming mode so the caller receives real-time progress rather than waiting blindly for the final result.

4.4 Complexity Control: When Should You Split a Workflow?

Don't try to do everything in one workflow. Split when:

Consider splitting when exceeding these thresholds:

How to split: Dify supports calling another workflow from within a workflow (sub-workflow call), enabling modular reuse.

Main workflow:
  Start → Validate → Call Sub-workflow A → Call Sub-workflow B → Merge output → End

Sub-workflow A (Resume Analysis):
  Complete independent analysis process

Sub-workflow B (Email Generation):
  Complete independent email generation process

This keeps the main workflow clean while sub-workflows can be independently tested and iterated.


Chapter Summary

Workflows are Dify's most flexible capability; mastering them requires understanding several core concepts:

Node selection principle: Use Code nodes for data processing (precise, testable); LLM nodes for semantic understanding; HTTP nodes for external integration; Knowledge Retrieval nodes for search.

Variable system: Use meaningful names, avoid generic ones (result, output); remember that variables inside a branch are invisible outside it.

Conditional branching: Convert complex conditions to boolean values in Code nodes first, then use IF/ELSE nodes; avoid overly complex condition expressions directly in IF/ELSE nodes.

Trigger methods: Use UI for development/testing; API calls for production integration; streaming mode for real-time scenarios.

Key checklist:
