工作流基础:节点类型、变量系统与条件分支
第9章:工作流基础——节点类型、变量系统与条件分支
工作流是 Dify 最强大的功能之一;掌握节点类型、变量传递和条件分支,是构建复杂 AI 自动化系统的基础。
本章导读
如果说对话型应用(Chatbot)是让 AI 回答问题,那工作流(Workflow)就是让 AI 执行任务。两者的本质区别在于:对话是单轮或多轮的交互,而工作流是有序的、可分支的、可组合的自动化流程。
想象这样一个场景:用户提交一份简历,系统要自动完成:提取候选人关键信息 → 对照岗位要求评分 → 如果评分合格则生成面试邀请邮件,否则生成婉拒邮件 → 发送邮件。这个流程涉及多步骤、有条件判断、需要集成外部服务,正是工作流的用武之地。
本章将系统讲解:
- Dify 工作流的核心节点类型及各自用途
- 变量系统:如何在节点间传递和转换数据
- 条件分支:IF/ELSE 逻辑的正确使用方式
- 工作流的触发方式:手动、API 调用、事件驱动
Level 1:基础认知(1-3 年经验)
1.1 工作流 vs 对话型应用:什么时候用哪个?
| 特性 | 对话型应用(Chatbot) | 工作流(Workflow) |
|---|---|---|
| 交互模式 | 多轮对话 | 单次执行(有明确开始和结束) |
| 用户感知 | 实时对话界面 | 通常后台运行,返回结果 |
| 适合场景 | 问答、咨询、陪伴 | 批处理、自动化、数据处理 |
| 错误处理 | 模型自行解释错误 | 可以设计 try/catch 逻辑 |
| 输入输出 | 文本对话 | 结构化输入/输出,多格式支持 |
选工作流的典型场景:
- 文档自动分析和摘要生成
- 简历/合同/发票的信息提取
- 多步骤内容生产(调研 → 写作 → 审核)
- 与外部系统的数据同步(CRM、数据库、邮件)
1.2 Dify 工作流的核心节点类型
开始节点(Start Node) 每个工作流必须有且只有一个开始节点。定义工作流的输入参数:
- 文本(Text):字符串输入
- 数字(Number):整数或小数
- 段落(Paragraph):长文本,支持换行
- 下拉选择(Select):从预定义选项中选择
- 文件(File):上传文件(PDF、图片等)
LLM 节点(LLM Node) 调用语言模型处理文本的核心节点:
- 配置模型和参数(Temperature、Max Tokens 等)
- 编写系统提示词和用户提示词
- 可以在提示词中引用其他节点的输出变量
知识库节点(Knowledge Retrieval Node) 从 Dify 知识库中检索相关内容:
- 选择要查询的知识库(可多选)
- 配置检索策略(向量/全文/混合)
- 输出:检索到的文档片段列表
条件分支节点(IF/ELSE Node) 根据条件决定工作流走向:
- 支持多种比较运算(等于、包含、大于等)
- 支持 AND/OR 逻辑组合
- 可以有多个 IF 分支 + 一个 ELSE 分支
代码节点(Code Node) 执行 Python 或 JavaScript 代码:
- 适合数据转换、格式化、计算
- 可以调用标准库
- 不能访问外部网络(安全限制)
HTTP 请求节点(HTTP Request Node) 调用外部 API:
- 支持 GET/POST/PUT/DELETE 等方法
- 可配置请求头、请求体
- 响应自动解析为 JSON 或文本
结束节点(End Node) 定义工作流的输出:
- 指定哪些变量作为最终输出
- 工作流可以有多个结束节点(不同分支各自的结束)
1.3 变量系统:数据在节点间流动
变量引用语法:{{节点名.输出变量名}}
例如:
{{start.user_query}}— 开始节点的 user_query 输入{{llm_1.text}}— 名为 "llm_1" 的 LLM 节点的输出文本{{knowledge_1.result}}— 知识库节点的检索结果
在 LLM 节点提示词中使用变量:
你是一个简历分析助手。
候选人信息:
{{start.resume_text}}
岗位要求:
{{start.job_requirements}}
请分析候选人是否符合岗位要求,输出以下JSON格式:
{
"score": 0-100的整数评分,
"strengths": ["优势1", "优势2"],
"gaps": ["不足1", "不足2"],
"recommendation": "推荐/不推荐"
}
变量类型:
- String:文本
- Number:数字
- Boolean:true/false
- Object:JSON 对象(从代码节点或 HTTP 节点获得)
- Array:数组
- File:文件对象
1.4 构建第一个工作流:简历分析器
目标:用户上传简历文本 → AI 分析 → 输出评分和建议
步骤:
-
新建工作流:Dify → 工作室 → 创建应用 → 工作流
-
配置开始节点:
- 输入变量 1:
resume_text(段落类型,必填) - 输入变量 2:
job_title(文本类型,必填)
- 输入变量 1:
-
添加 LLM 节点:
- 选择模型(如 GPT-4o mini)
- 系统提示词:
你是一个专业的HR助手,擅长简历分析 - 用户提示词:
请分析以下简历,评估候选人是否适合"{{start.job_title}}"职位。 简历内容: {{start.resume_text}} 输出 JSON 格式(只输出 JSON,不要其他文字): {"score": 数字, "highlights": ["亮点"], "concerns": ["不足"], "verdict": "recommend/reject"}
-
添加代码节点(解析 LLM 输出的 JSON):
import json def main(llm_output: str) -> dict: # 清理可能的 markdown 代码块 clean = llm_output.strip() if clean.startswith("```"): clean = clean.split("```")[1] if clean.startswith("json"): clean = clean[4:] result = json.loads(clean.strip()) return { "score": result["score"], "verdict": result["verdict"], "highlights": "\n".join(result["highlights"]), "concerns": "\n".join(result["concerns"]) } -
添加结束节点:输出 score、verdict、highlights、concerns
-
测试:点击右上角「运行」,填入测试数据
Level 2:机制深解(3-5 年经验)
2.1 条件分支的高级用法
基础条件(单条件判断):
code_node.score > 60 → 走"合格"分支
否则 → 走"不合格"分支
复合条件(AND 逻辑):
code_node.score > 60 AND start.years_of_experience > 3
→ 走"高质量候选人"分支
多分支(IF / ELSE IF / ELSE):
IF code_node.score >= 80 → 立即邀请面试
ELIF code_node.score >= 60 → 放入候选池
ELIF code_node.score >= 40 → 发送感谢信
ELSE → 直接拒绝
在 Dify 工作流中实现:每个 IF/ELIF 分支可以连接到不同的后续节点链,形成真正的多路径执行。
条件涉及数组或对象时的技巧:
# 在代码节点中预处理复杂条件
def main(analysis_result: dict) -> dict:
score = analysis_result["score"]
has_required_skills = all(
skill in analysis_result["skills"]
for skill in ["Python", "Machine Learning"]
)
return {
"final_tier": (
"tier_1" if score >= 80 and has_required_skills else
"tier_2" if score >= 60 else
"tier_3"
)
}
然后基于 code_node.final_tier 做条件分支,避免在 IF/ELSE 节点中写复杂逻辑。
2.2 变量的生命周期与作用域
Dify 工作流中的变量有明确的作用域规则:
规则1:变量只能引用已执行的上游节点
工作流按 DAG(有向无环图)顺序执行。如果节点 B 还没有执行,节点 C 就不能引用节点 B 的输出。
规则2:分支内的节点在分支外不可见
IF 分支 A → LLM_A(此节点的输出变量只在分支A内可用)
ELSE 分支 B → LLM_B(此节点的输出变量只在分支B内可用)
↓
合并节点(无法直接引用 LLM_A 或 LLM_B 的输出)
解决方案:在分支末尾将结果统一到一个标准变量名,然后在合并节点引用。
# 分支 A 的代码节点
def main(lm_a_output: str) -> dict:
return {"result": lm_a_output, "branch": "A"}
# 分支 B 的代码节点
def main(lm_b_output: str) -> dict:
return {"result": lm_b_output, "branch": "B"}
# 合并节点可以引用 code_a.result 或 code_b.result
# 但需要在两个分支之后连接同一个下游节点
规则3:循环节点内的变量在每次迭代中独立
(详见第 10 章循环节点)
2.3 工作流的触发方式详解
方式1:Dify 界面手动触发
- 适合:测试、内部工具
- 在工作流编辑器中点击"运行",填写输入参数
方式2:API 调用触发
# 同步调用(等待工作流完成,获取最终输出)
curl -X POST 'https://api.dify.ai/v1/workflows/run' \
-H 'Authorization: Bearer YOUR_API_KEY' \
-H 'Content-Type: application/json' \
-d '{
"inputs": {
"resume_text": "张三,5年Python开发经验...",
"job_title": "高级后端工程师"
},
"response_mode": "blocking",
"user": "user-001"
}'
响应:
{
"workflow_run_id": "run-xxxxx",
"task_id": "task-xxxxx",
"data": {
"status": "succeeded",
"outputs": {
"score": 85,
"verdict": "recommend",
"highlights": "Python 熟练\n系统设计经验丰富"
},
"elapsed_time": 3.24,
"total_tokens": 1250
}
}
流式调用(streaming,实时显示进度):
import requests
import json
def run_workflow_streaming(inputs: dict, api_key: str):
response = requests.post(
"https://api.dify.ai/v1/workflows/run",
headers={"Authorization": f"Bearer {api_key}"},
json={
"inputs": inputs,
"response_mode": "streaming",
"user": "user-001"
},
stream=True
)
for line in response.iter_lines():
if line.startswith(b"data: "):
event = json.loads(line[6:])
if event["event"] == "node_started":
print(f"节点开始: {event['data']['title']}")
elif event["event"] == "node_finished":
print(f"节点完成: {event['data']['title']}, 耗时: {event['data']['elapsed_time']:.2f}s")
elif event["event"] == "workflow_finished":
print(f"工作流完成,输出: {event['data']['outputs']}")
break
方式3:Webhook 触发
配置工作流的 Webhook URL,外部系统(GitHub、企业微信、Slack 等)可以在特定事件发生时自动触发工作流。
方式4:定时触发(计划任务)
Dify 本身不支持定时触发,但可以通过外部定时任务(cron、Celery、n8n)定期调用工作流 API。
# 每天早上 8 点运行报告生成工作流
# crontab: 0 8 * * * python /path/to/trigger_daily_report.py
import requests
from datetime import datetime
def trigger_daily_report():
response = requests.post(
"https://api.dify.ai/v1/workflows/run",
headers={"Authorization": f"Bearer {API_KEY}"},
json={
"inputs": {
"report_date": datetime.now().strftime("%Y-%m-%d"),
"report_type": "daily_summary"
},
"response_mode": "blocking",
"user": "scheduler"
}
)
return response.json()
2.4 工作流输入验证与错误处理
输入验证最佳实践:
在开始节点之后立即添加一个验证节点(代码节点),检查输入的合法性:
def main(
resume_text: str,
job_title: str
) -> dict:
errors = []
if not resume_text or len(resume_text.strip()) < 100:
errors.append("简历内容太短(至少100字)")
if len(resume_text) > 10000:
errors.append("简历内容过长(不超过10000字)")
if not job_title or len(job_title.strip()) == 0:
errors.append("职位名称不能为空")
return {
"is_valid": len(errors) == 0,
"error_message": ";".join(errors) if errors else "",
"resume_length": len(resume_text)
}
然后用条件分支:
validation.is_valid == true→ 继续正常流程validation.is_valid == false→ 直接结束,输出错误信息
工作流级别的错误处理(Dify v0.10+):
在工作流设置中启用「异常处理」,可以为每个节点配置:
- 出错时继续执行(跳过该节点)
- 出错时停止整个工作流
- 出错时走备用分支
Level 3:源码与原理(5 年以上)
3.1 工作流执行引擎:DAG 调度原理
Dify 的工作流执行是基于 DAG(有向无环图)的拓扑排序调度。核心数据结构:
# 简化的工作流图结构
@dataclass
class WorkflowGraph:
nodes: dict[str, WorkflowNode] # node_id -> node
edges: list[tuple[str, str]] # (source_node_id, target_node_id)
def get_execution_order(self) -> list[str]:
"""拓扑排序,确定节点执行顺序"""
in_degree = {node_id: 0 for node_id in self.nodes}
for source, target in self.edges:
in_degree[target] += 1
# BFS 拓扑排序
queue = [nid for nid, deg in in_degree.items() if deg == 0]
order = []
while queue:
node_id = queue.pop(0)
order.append(node_id)
for source, target in self.edges:
if source == node_id:
in_degree[target] -= 1
if in_degree[target] == 0:
queue.append(target)
return order
关键源码位置:
api/core/workflow/workflow_engine_manager.py:工作流引擎入口api/core/workflow/graph_engine/:图执行引擎api/core/workflow/nodes/:各节点类型的实现
执行器核心逻辑(简化):
class GraphEngine:
def run(self, graph: WorkflowGraph, inputs: dict) -> dict:
# 初始化变量池
variable_pool = VariablePool(start_variables=inputs)
# 确定执行路径(考虑条件分支)
execution_queue = [graph.start_node_id]
executed = set()
while execution_queue:
node_id = execution_queue.pop(0)
node = graph.nodes[node_id]
# 执行节点
result = node.run(variable_pool)
variable_pool.add(node_id, result.outputs)
executed.add(node_id)
# 根据执行结果确定后续节点
next_nodes = self._get_next_nodes(
graph, node_id, result, variable_pool
)
execution_queue.extend(next_nodes)
return variable_pool.get_final_outputs()
3.2 变量池的内存模型
Dify 工作流中的变量池(VariablePool)是一个基于字典的内存数据结构:
class VariablePool:
def __init__(self, start_variables: dict):
# 结构:{node_id: {variable_name: value}}
self._pool = {
"sys": { # 系统变量
"workflow_id": "xxx",
"run_id": "yyy",
"user_id": "zzz"
},
"start": start_variables # 开始节点输入
}
def get(self, node_id: str, variable_name: str) -> any:
"""获取变量值,支持点分语法"""
return self._pool.get(node_id, {}).get(variable_name)
def get_any(self, selector: list[str]) -> any:
"""通过 selector 路径获取变量"""
# selector = ["llm_1", "text"] 对应 {{llm_1.text}}
node_id, *path = selector
value = self._pool.get(node_id, {})
for key in path:
if isinstance(value, dict):
value = value.get(key)
return value
变量类型转换:当变量从一个节点传到另一个节点时,Dify 会做类型检查和转换。如果 LLM 输出的是字符串,而下游代码节点期望整数,会自动尝试转换;失败则抛出 VariableTypeError。
3.3 条件分支的评估机制
IF/ELSE 节点的条件评估基于一个规则引擎:
class ConditionEvaluator:
OPERATORS = {
"eq": lambda a, b: a == b,
"neq": lambda a, b: a != b,
"gt": lambda a, b: float(a) > float(b),
"gte": lambda a, b: float(a) >= float(b),
"lt": lambda a, b: float(a) < float(b),
"lte": lambda a, b: float(a) <= float(b),
"contains": lambda a, b: str(b) in str(a),
"not_contains": lambda a, b: str(b) not in str(a),
"starts_with": lambda a, b: str(a).startswith(str(b)),
"ends_with": lambda a, b: str(a).endswith(str(b)),
"is_empty": lambda a, _: not a or str(a).strip() == "",
"is_not_empty": lambda a, _: bool(a) and str(a).strip() != "",
}
def evaluate(self, condition: Condition, variable_pool: VariablePool) -> bool:
left_value = variable_pool.get_any(condition.left_selector)
evaluator = self.OPERATORS.get(condition.operator)
if not evaluator:
raise ValueError(f"Unknown operator: {condition.operator}")
return evaluator(left_value, condition.right_value)
def evaluate_group(self, group: ConditionGroup, variable_pool) -> bool:
results = [self.evaluate(c, variable_pool) for c in group.conditions]
if group.logic == "AND":
return all(results)
elif group.logic == "OR":
return any(results)
重要限制:条件评估是在 Python 层面进行的,不支持正则表达式(需要在代码节点中处理)。复杂的字符串匹配逻辑应该先在代码节点中转换为布尔值,再用 eq: true 做条件判断。
Level 4:生产陷阱与决策(专家视角)
4.1 陷阱一:变量命名冲突导致的静默错误
问题:多个节点的输出包含同名变量时,后面节点的同名变量会覆盖前面的,导致意外行为。
错误示例:
- 节点 "llm_analysis" 输出变量
result(字符串) - 节点 "code_parser" 也输出变量
result(字典) - 下游节点引用
result时,到底是哪个?
正确做法:
- 每个节点使用有意义且唯一的变量名
- 如:
analysis_text、parsed_json,而非都叫result - 在代码节点中,返回的 key 应该描述内容,不要用通用名称
4.2 陷阱二:LLM 输出 JSON 解析失败
高频问题:LLM 被要求输出 JSON,但实际输出了带 markdown 代码块或额外文字的内容:
# LLM 实际输出(不是纯 JSON):
当然,以下是分析结果:
```json
{"score": 85, "verdict": "recommend"}
希望对您有帮助!
直接 `json.loads()` 会报错。
**健壮的 JSON 提取函数**:
```python
import json
import re
def extract_json(llm_output: str) -> dict:
"""从 LLM 输出中提取 JSON,处理各种格式"""
# 方法1:直接解析(最理想情况)
try:
return json.loads(llm_output.strip())
except json.JSONDecodeError:
pass
# 方法2:提取 markdown 代码块中的 JSON
code_block_pattern = r'```(?:json)?\s*([\s\S]*?)\s*```'
matches = re.findall(code_block_pattern, llm_output)
if matches:
try:
return json.loads(matches[0])
except json.JSONDecodeError:
pass
# 方法3:用正则找第一个完整的 JSON 对象
json_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
matches = re.findall(json_pattern, llm_output, re.DOTALL)
for match in matches:
try:
return json.loads(match)
except json.JSONDecodeError:
continue
raise ValueError(f"无法从 LLM 输出中提取 JSON: {llm_output[:200]}")
根本解决方案:使用支持 Structured Output 的模型(GPT-4o、Claude 3.5 Sonnet),在 Dify 节点配置中开启 JSON Schema 输出模式,从根本上避免格式错误。
4.3 陷阱三:工作流超时未处理
问题:工作流中某个节点(通常是外部 HTTP 请求或大文件处理)超时,整个工作流挂起,API 调用方长时间等待无响应。
Dify 的超时配置:
- 每个 HTTP 请求节点可设置单独的超时时间(建议 30s)
- LLM 节点的超时通过
max_tokens间接控制(token 越少越快) - 工作流整体超时默认 200s,可在工作流设置中修改
客户端超时处理:
import requests
from requests.exceptions import Timeout
def safe_run_workflow(inputs: dict, timeout: int = 60) -> dict:
try:
response = requests.post(
"https://api.dify.ai/v1/workflows/run",
headers={"Authorization": f"Bearer {API_KEY}"},
json={"inputs": inputs, "response_mode": "blocking"},
timeout=timeout # 客户端超时设置
)
return response.json()
except Timeout:
return {
"status": "timeout",
"error": f"工作流超时(>{timeout}s),请稍后重试"
}
except Exception as e:
return {
"status": "error",
"error": str(e)
}
异步处理方案:对于耗时超过 60s 的工作流,使用流式模式(streaming),让调用方实时接收进度,而不是傻等最终结果。
4.4 工作流复杂度控制:什么时候该拆分工作流?
一个工作流不要试图做所有事情。拆分判断标准:
超过以下情况时应考虑拆分:
- 节点数量 > 20 个
- 分支深度 > 4 层
- 单次执行耗时 > 30 秒
- 多个业务场景共用同一段逻辑(应抽取为子工作流)
拆分方式:Dify 支持在工作流中调用另一个工作流(子工作流调用),实现模块化复用。
主工作流:
开始 → 数据验证 → 调用子工作流A → 调用子工作流B → 汇总输出 → 结束
子工作流A(简历分析):
独立的完整分析流程
子工作流B(邮件生成):
独立的邮件生成流程
这样主工作流清晰,子工作流可独立测试和迭代。
本章小结
工作流是 Dify 中最灵活的能力,掌握它需要理解几个核心概念:
节点选择原则:数据处理用代码节点(精确、可测试);语义理解用 LLM 节点;外部集成用 HTTP 节点;检索用知识库节点。
变量系统:用有意义的名称,避免通用名(result、output);记住分支内的变量在分支外不可见。
条件分支:复杂条件先在代码节点中转换为布尔值,再在 IF/ELSE 节点中使用;避免在 IF/ELSE 节点中写过于复杂的条件表达式。
触发方式:开发测试用界面手动触发;生产集成用 API 调用;实时场景用流式模式。
关键清单:
- 工作流输入有验证节点(长度、格式、必填)
- LLM 输出 JSON 时有健壮的解析代码
- HTTP 节点设置了超时时间
- 条件分支考虑了所有可能路径(包括异常情况)
- 变量命名语义化,无重名冲突
- 工作流 API 调用端做了超时和错误处理