第 9 章

工作流基础:节点类型、变量系统与条件分支

第9章:工作流基础——节点类型、变量系统与条件分支

工作流是 Dify 最强大的功能之一;掌握节点类型、变量传递和条件分支,是构建复杂 AI 自动化系统的基础。

本章导读

如果说对话型应用(Chatbot)是让 AI 回答问题,那工作流(Workflow)就是让 AI 执行任务。两者的本质区别在于:对话是单轮或多轮的交互,而工作流是有序的、可分支的、可组合的自动化流程。

想象这样一个场景:用户提交一份简历,系统要自动完成:提取候选人关键信息 → 对照岗位要求评分 → 如果评分合格则生成面试邀请邮件,否则生成婉拒邮件 → 发送邮件。这个流程涉及多步骤、有条件判断、需要集成外部服务,正是工作流的用武之地。

本章将系统讲解:


Level 1:基础认知(1-3 年经验)

1.1 工作流 vs 对话型应用:什么时候用哪个?

特性 对话型应用(Chatbot) 工作流(Workflow)
交互模式 多轮对话 单次执行(有明确开始和结束)
用户感知 实时对话界面 通常后台运行,返回结果
适合场景 问答、咨询、陪伴 批处理、自动化、数据处理
错误处理 模型自行解释错误 可以设计 try/catch 逻辑
输入输出 文本对话 结构化输入/输出,多格式支持

选工作流的典型场景

1.2 Dify 工作流的核心节点类型

开始节点(Start Node) 每个工作流必须有且只有一个开始节点。定义工作流的输入参数:

LLM 节点(LLM Node) 调用语言模型处理文本的核心节点:

知识库节点(Knowledge Retrieval Node) 从 Dify 知识库中检索相关内容:

条件分支节点(IF/ELSE Node) 根据条件决定工作流走向:

代码节点(Code Node) 执行 Python 或 JavaScript 代码:

HTTP 请求节点(HTTP Request Node) 调用外部 API:

结束节点(End Node) 定义工作流的输出:

1.3 变量系统:数据在节点间流动

变量引用语法{{节点名.输出变量名}}

例如:

在 LLM 节点提示词中使用变量

你是一个简历分析助手。

候选人信息:
{{start.resume_text}}

岗位要求:
{{start.job_requirements}}

请分析候选人是否符合岗位要求,输出以下JSON格式:
{
  "score": 0-100的整数评分,
  "strengths": ["优势1", "优势2"],
  "gaps": ["不足1", "不足2"],
  "recommendation": "推荐/不推荐"
}

变量类型

1.4 构建第一个工作流:简历分析器

目标:用户上传简历文本 → AI 分析 → 输出评分和建议

步骤

  1. 新建工作流:Dify → 工作室 → 创建应用 → 工作流

  2. 配置开始节点

    • 输入变量 1:resume_text(段落类型,必填)
    • 输入变量 2:job_title(文本类型,必填)
  3. 添加 LLM 节点

    • 选择模型(如 GPT-4o mini)
    • 系统提示词:你是一个专业的HR助手,擅长简历分析
    • 用户提示词:
      请分析以下简历,评估候选人是否适合"{{start.job_title}}"职位。
      
      简历内容:
      {{start.resume_text}}
      
      输出 JSON 格式(只输出 JSON,不要其他文字):
      {"score": 数字, "highlights": ["亮点"], "concerns": ["不足"], "verdict": "recommend/reject"}
      
  4. 添加代码节点(解析 LLM 输出的 JSON):

    import json
    
    def main(llm_output: str) -> dict:
        # 清理可能的 markdown 代码块
        clean = llm_output.strip()
        if clean.startswith("```"):
            clean = clean.split("```")[1]
            if clean.startswith("json"):
                clean = clean[4:]
    
        result = json.loads(clean.strip())
        return {
            "score": result["score"],
            "verdict": result["verdict"],
            "highlights": "\n".join(result["highlights"]),
            "concerns": "\n".join(result["concerns"])
        }
    
  5. 添加结束节点:输出 score、verdict、highlights、concerns

  6. 测试:点击右上角「运行」,填入测试数据


Level 2:机制深解(3-5 年经验)

2.1 条件分支的高级用法

基础条件(单条件判断):

code_node.score > 60 → 走"合格"分支
                      否则 → 走"不合格"分支

复合条件(AND 逻辑)

code_node.score > 60 AND start.years_of_experience > 3
→ 走"高质量候选人"分支

多分支(IF / ELSE IF / ELSE)

IF   code_node.score >= 80  → 立即邀请面试
ELIF code_node.score >= 60  → 放入候选池
ELIF code_node.score >= 40  → 发送感谢信
ELSE                        → 直接拒绝

在 Dify 工作流中实现:每个 IF/ELIF 分支可以连接到不同的后续节点链,形成真正的多路径执行。

条件涉及数组或对象时的技巧

# 在代码节点中预处理复杂条件
def main(analysis_result: dict) -> dict:
    score = analysis_result["score"]
    has_required_skills = all(
        skill in analysis_result["skills"]
        for skill in ["Python", "Machine Learning"]
    )
    
    return {
        "final_tier": (
            "tier_1" if score >= 80 and has_required_skills else
            "tier_2" if score >= 60 else
            "tier_3"
        )
    }

然后基于 code_node.final_tier 做条件分支,避免在 IF/ELSE 节点中写复杂逻辑。

2.2 变量的生命周期与作用域

Dify 工作流中的变量有明确的作用域规则:

规则1:变量只能引用已执行的上游节点

工作流按 DAG(有向无环图)顺序执行。如果节点 B 还没有执行,节点 C 就不能引用节点 B 的输出。

规则2:分支内的节点在分支外不可见

IF 分支 A → LLM_A(此节点的输出变量只在分支A内可用)
ELSE 分支 B → LLM_B(此节点的输出变量只在分支B内可用)
         ↓
合并节点(无法直接引用 LLM_A 或 LLM_B 的输出)

解决方案:在分支末尾将结果统一到一个标准变量名,然后在合并节点引用。

# 分支 A 的代码节点
def main(lm_a_output: str) -> dict:
    return {"result": lm_a_output, "branch": "A"}

# 分支 B 的代码节点  
def main(lm_b_output: str) -> dict:
    return {"result": lm_b_output, "branch": "B"}

# 合并节点可以引用 code_a.result 或 code_b.result
# 但需要在两个分支之后连接同一个下游节点

规则3:循环节点内的变量在每次迭代中独立

(详见第 10 章循环节点)

2.3 工作流的触发方式详解

方式1:Dify 界面手动触发

方式2:API 调用触发

# 同步调用(等待工作流完成,获取最终输出)
curl -X POST 'https://api.dify.ai/v1/workflows/run' \
  -H 'Authorization: Bearer YOUR_API_KEY' \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": {
      "resume_text": "张三,5年Python开发经验...",
      "job_title": "高级后端工程师"
    },
    "response_mode": "blocking",
    "user": "user-001"
  }'

响应:

{
  "workflow_run_id": "run-xxxxx",
  "task_id": "task-xxxxx",
  "data": {
    "status": "succeeded",
    "outputs": {
      "score": 85,
      "verdict": "recommend",
      "highlights": "Python 熟练\n系统设计经验丰富"
    },
    "elapsed_time": 3.24,
    "total_tokens": 1250
  }
}

流式调用(streaming,实时显示进度)

import requests
import json

def run_workflow_streaming(inputs: dict, api_key: str):
    response = requests.post(
        "https://api.dify.ai/v1/workflows/run",
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "inputs": inputs,
            "response_mode": "streaming",
            "user": "user-001"
        },
        stream=True
    )
    
    for line in response.iter_lines():
        if line.startswith(b"data: "):
            event = json.loads(line[6:])
            
            if event["event"] == "node_started":
                print(f"节点开始: {event['data']['title']}")
            elif event["event"] == "node_finished":
                print(f"节点完成: {event['data']['title']}, 耗时: {event['data']['elapsed_time']:.2f}s")
            elif event["event"] == "workflow_finished":
                print(f"工作流完成,输出: {event['data']['outputs']}")
                break

方式3:Webhook 触发

配置工作流的 Webhook URL,外部系统(GitHub、企业微信、Slack 等)可以在特定事件发生时自动触发工作流。

方式4:定时触发(计划任务)

Dify 本身不支持定时触发,但可以通过外部定时任务(cron、Celery、n8n)定期调用工作流 API。

# 每天早上 8 点运行报告生成工作流
# crontab: 0 8 * * * python /path/to/trigger_daily_report.py

import requests
from datetime import datetime

def trigger_daily_report():
    response = requests.post(
        "https://api.dify.ai/v1/workflows/run",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "inputs": {
                "report_date": datetime.now().strftime("%Y-%m-%d"),
                "report_type": "daily_summary"
            },
            "response_mode": "blocking",
            "user": "scheduler"
        }
    )
    return response.json()

2.4 工作流输入验证与错误处理

输入验证最佳实践

在开始节点之后立即添加一个验证节点(代码节点),检查输入的合法性:

def main(
    resume_text: str,
    job_title: str
) -> dict:
    errors = []
    
    if not resume_text or len(resume_text.strip()) < 100:
        errors.append("简历内容太短(至少100字)")
    
    if len(resume_text) > 10000:
        errors.append("简历内容过长(不超过10000字)")
    
    if not job_title or len(job_title.strip()) == 0:
        errors.append("职位名称不能为空")
    
    return {
        "is_valid": len(errors) == 0,
        "error_message": ";".join(errors) if errors else "",
        "resume_length": len(resume_text)
    }

然后用条件分支:

工作流级别的错误处理(Dify v0.10+):

在工作流设置中启用「异常处理」,可以为每个节点配置:


Level 3:源码与原理(5 年以上)

3.1 工作流执行引擎:DAG 调度原理

Dify 的工作流执行是基于 DAG(有向无环图)的拓扑排序调度。核心数据结构:

# 简化的工作流图结构
@dataclass
class WorkflowGraph:
    nodes: dict[str, WorkflowNode]  # node_id -> node
    edges: list[tuple[str, str]]    # (source_node_id, target_node_id)
    
    def get_execution_order(self) -> list[str]:
        """拓扑排序,确定节点执行顺序"""
        in_degree = {node_id: 0 for node_id in self.nodes}
        
        for source, target in self.edges:
            in_degree[target] += 1
        
        # BFS 拓扑排序
        queue = [nid for nid, deg in in_degree.items() if deg == 0]
        order = []
        
        while queue:
            node_id = queue.pop(0)
            order.append(node_id)
            
            for source, target in self.edges:
                if source == node_id:
                    in_degree[target] -= 1
                    if in_degree[target] == 0:
                        queue.append(target)
        
        return order

关键源码位置

执行器核心逻辑(简化):

class GraphEngine:
    def run(self, graph: WorkflowGraph, inputs: dict) -> dict:
        # 初始化变量池
        variable_pool = VariablePool(start_variables=inputs)
        
        # 确定执行路径(考虑条件分支)
        execution_queue = [graph.start_node_id]
        executed = set()
        
        while execution_queue:
            node_id = execution_queue.pop(0)
            node = graph.nodes[node_id]
            
            # 执行节点
            result = node.run(variable_pool)
            variable_pool.add(node_id, result.outputs)
            executed.add(node_id)
            
            # 根据执行结果确定后续节点
            next_nodes = self._get_next_nodes(
                graph, node_id, result, variable_pool
            )
            execution_queue.extend(next_nodes)
        
        return variable_pool.get_final_outputs()

3.2 变量池的内存模型

Dify 工作流中的变量池(VariablePool)是一个基于字典的内存数据结构:

class VariablePool:
    def __init__(self, start_variables: dict):
        # 结构:{node_id: {variable_name: value}}
        self._pool = {
            "sys": {  # 系统变量
                "workflow_id": "xxx",
                "run_id": "yyy",
                "user_id": "zzz"
            },
            "start": start_variables  # 开始节点输入
        }
    
    def get(self, node_id: str, variable_name: str) -> any:
        """获取变量值,支持点分语法"""
        return self._pool.get(node_id, {}).get(variable_name)
    
    def get_any(self, selector: list[str]) -> any:
        """通过 selector 路径获取变量"""
        # selector = ["llm_1", "text"] 对应 {{llm_1.text}}
        node_id, *path = selector
        value = self._pool.get(node_id, {})
        for key in path:
            if isinstance(value, dict):
                value = value.get(key)
        return value

变量类型转换:当变量从一个节点传到另一个节点时,Dify 会做类型检查和转换。如果 LLM 输出的是字符串,而下游代码节点期望整数,会自动尝试转换;失败则抛出 VariableTypeError

3.3 条件分支的评估机制

IF/ELSE 节点的条件评估基于一个规则引擎:

class ConditionEvaluator:
    OPERATORS = {
        "eq": lambda a, b: a == b,
        "neq": lambda a, b: a != b,
        "gt": lambda a, b: float(a) > float(b),
        "gte": lambda a, b: float(a) >= float(b),
        "lt": lambda a, b: float(a) < float(b),
        "lte": lambda a, b: float(a) <= float(b),
        "contains": lambda a, b: str(b) in str(a),
        "not_contains": lambda a, b: str(b) not in str(a),
        "starts_with": lambda a, b: str(a).startswith(str(b)),
        "ends_with": lambda a, b: str(a).endswith(str(b)),
        "is_empty": lambda a, _: not a or str(a).strip() == "",
        "is_not_empty": lambda a, _: bool(a) and str(a).strip() != "",
    }
    
    def evaluate(self, condition: Condition, variable_pool: VariablePool) -> bool:
        left_value = variable_pool.get_any(condition.left_selector)
        
        evaluator = self.OPERATORS.get(condition.operator)
        if not evaluator:
            raise ValueError(f"Unknown operator: {condition.operator}")
        
        return evaluator(left_value, condition.right_value)
    
    def evaluate_group(self, group: ConditionGroup, variable_pool) -> bool:
        results = [self.evaluate(c, variable_pool) for c in group.conditions]
        
        if group.logic == "AND":
            return all(results)
        elif group.logic == "OR":
            return any(results)

重要限制:条件评估是在 Python 层面进行的,不支持正则表达式(需要在代码节点中处理)。复杂的字符串匹配逻辑应该先在代码节点中转换为布尔值,再用 eq: true 做条件判断。


Level 4:生产陷阱与决策(专家视角)

4.1 陷阱一:变量命名冲突导致的静默错误

问题:多个节点的输出包含同名变量时,后面节点的同名变量会覆盖前面的,导致意外行为。

错误示例

正确做法

4.2 陷阱二:LLM 输出 JSON 解析失败

高频问题:LLM 被要求输出 JSON,但实际输出了带 markdown 代码块或额外文字的内容:

# LLM 实际输出(不是纯 JSON):
当然,以下是分析结果:
```json
{"score": 85, "verdict": "recommend"}

希望对您有帮助!


直接 `json.loads()` 会报错。

**健壮的 JSON 提取函数**:

```python
import json
import re

def extract_json(llm_output: str) -> dict:
    """从 LLM 输出中提取 JSON,处理各种格式"""
    # 方法1:直接解析(最理想情况)
    try:
        return json.loads(llm_output.strip())
    except json.JSONDecodeError:
        pass
    
    # 方法2:提取 markdown 代码块中的 JSON
    code_block_pattern = r'```(?:json)?\s*([\s\S]*?)\s*```'
    matches = re.findall(code_block_pattern, llm_output)
    if matches:
        try:
            return json.loads(matches[0])
        except json.JSONDecodeError:
            pass
    
    # 方法3:用正则找第一个完整的 JSON 对象
    json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
    matches = re.findall(json_pattern, llm_output, re.DOTALL)
    for match in matches:
        try:
            return json.loads(match)
        except json.JSONDecodeError:
            continue
    
    raise ValueError(f"无法从 LLM 输出中提取 JSON: {llm_output[:200]}")

根本解决方案:使用支持 Structured Output 的模型(GPT-4o、Claude 3.5 Sonnet),在 Dify 节点配置中开启 JSON Schema 输出模式,从根本上避免格式错误。

4.3 陷阱三:工作流超时未处理

问题:工作流中某个节点(通常是外部 HTTP 请求或大文件处理)超时,整个工作流挂起,API 调用方长时间等待无响应。

Dify 的超时配置

客户端超时处理

import requests
from requests.exceptions import Timeout

def safe_run_workflow(inputs: dict, timeout: int = 60) -> dict:
    try:
        response = requests.post(
            "https://api.dify.ai/v1/workflows/run",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"inputs": inputs, "response_mode": "blocking"},
            timeout=timeout  # 客户端超时设置
        )
        return response.json()
    
    except Timeout:
        return {
            "status": "timeout",
            "error": f"工作流超时(>{timeout}s),请稍后重试"
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

异步处理方案:对于耗时超过 60s 的工作流,使用流式模式(streaming),让调用方实时接收进度,而不是傻等最终结果。

4.4 工作流复杂度控制:什么时候该拆分工作流?

一个工作流不要试图做所有事情。拆分判断标准:

超过以下情况时应考虑拆分

拆分方式:Dify 支持在工作流中调用另一个工作流(子工作流调用),实现模块化复用。

主工作流:
  开始 → 数据验证 → 调用子工作流A → 调用子工作流B → 汇总输出 → 结束

子工作流A(简历分析):
  独立的完整分析流程

子工作流B(邮件生成):
  独立的邮件生成流程

这样主工作流清晰,子工作流可独立测试和迭代。


本章小结

工作流是 Dify 中最灵活的能力,掌握它需要理解几个核心概念:

节点选择原则:数据处理用代码节点(精确、可测试);语义理解用 LLM 节点;外部集成用 HTTP 节点;检索用知识库节点。

变量系统:用有意义的名称,避免通用名(resultoutput);记住分支内的变量在分支外不可见。

条件分支:复杂条件先在代码节点中转换为布尔值,再在 IF/ELSE 节点中使用;避免在 IF/ELSE 节点中写过于复杂的条件表达式。

触发方式:开发测试用界面手动触发;生产集成用 API 调用;实时场景用流式模式。

关键清单

本章评分
4.7  / 5  (37 评分)

💬 留言讨论