代码节点与自定义函数:Python/JS 扩展工作流能力
第11章:代码节点与自定义函数——Python/JS 扩展工作流能力
代码节点是工作流的逃生舱——当内置节点无法满足需求时,Python 或 JavaScript 代码让你可以做任何数据处理、转换和计算,不受平台限制。
本章导读
Dify 的内置节点覆盖了大多数常见场景:LLM 调用、知识库检索、HTTP 请求、条件分支。但在实际业务中,总会遇到内置节点无法直接解决的问题:
- 解析 LLM 输出的复杂 JSON,进行多步骤数据清洗
- 从文本中用正则表达式提取邮箱、手机号、日期
- 对检索结果进行自定义的相关性排序和去重
- 计算统计指标(平均分、分位数、标准差)
- 格式转换(Markdown 转 HTML,Excel 数据处理)
代码节点(Code Node)正是为这些场景设计的。它在沙箱环境中执行 Python 3 或 JavaScript 代码,让工作流具备图灵完备的数据处理能力。
本章将深入讲解:
- 代码节点的执行环境、能力边界与安全限制
- Python 代码节点的实用技巧与最佳实践
- JavaScript 代码节点的特性与适用场景
- 如何通过外部 HTTP 服务绕过代码节点的网络限制
- 代码节点的调试技术
Level 1:基础认知(1-3 年经验)
1.1 代码节点的能力边界
能做什么:
- 所有 Python 3 / JavaScript 标准库操作
- 字符串处理、数学计算、日期时间处理
- JSON/XML/CSV 解析和生成
- Base64 编码解码
- 正则表达式
- 列表/字典操作和排序
- 简单的机器学习计算(如果库可用)
不能做什么(安全沙箱限制):
- 发起网络请求(
requests.get()、fetch()等均被禁用) - 文件系统访问(不能读写本地文件)
- 执行系统命令(
os.system()、subprocess被禁用) - 无限循环(超时会被强制终止,默认 10 秒)
- 多线程/多进程
如果需要网络访问,应该使用 HTTP 请求节点;如果需要访问文件,先通过 HTTP 节点下载再在代码节点处理(以字符串或 bytes 形式传入)。
1.2 代码节点的基本结构
Python 代码节点必须定义一个 main 函数,函数参数对应上游节点的变量,返回值是一个字典:
def main(
text: str, # 接收上游节点的 text 变量
count: int = 10, # 带默认值的参数
items: list = None # 数组参数
) -> dict:
# 处理逻辑
result = text.upper()[:count]
# 必须返回字典
return {
"processed_text": result,
"word_count": len(text.split()),
"is_long": len(text) > 100
}
JavaScript 代码节点必须定义 main 函数,同样返回对象:
async function main({ text, count = 10, items = [] }) {
// 处理逻辑
const result = text.toUpperCase().substring(0, count);
return {
processedText: result,
wordCount: text.split(' ').length,
isLong: text.length > 100
};
}
注意:JavaScript 代码节点支持 async/await,但由于网络访问被禁用,实际上不能进行真正的异步网络操作。
1.3 常用场景一:JSON 解析和数据清洗
这是代码节点最高频的使用场景。LLM 输出的 JSON 经常包含多余的文字、不完整的引号或其他格式问题:
import json
import re
def main(llm_output: str) -> dict:
"""从 LLM 输出中可靠地提取 JSON"""
# 清理输入
text = llm_output.strip()
# 方案1:直接解析
try:
return {"data": json.loads(text), "success": True}
except json.JSONDecodeError:
pass
# 方案2:提取 ```json ... ``` 代码块
pattern = r'```(?:json)?\s*\n?([\s\S]*?)\n?\s*```'
match = re.search(pattern, text)
if match:
try:
return {"data": json.loads(match.group(1)), "success": True}
except json.JSONDecodeError:
pass
# 方案3:找第一个完整的 {...} 或 [...] 结构
for start_char, end_char in [('{', '}'), ('[', ']')]:
start = text.find(start_char)
if start == -1:
continue
# 用括号匹配找到对应的结束位置
depth = 0
for i, char in enumerate(text[start:], start):
if char == start_char:
depth += 1
elif char == end_char:
depth -= 1
if depth == 0:
json_str = text[start:i+1]
try:
return {"data": json.loads(json_str), "success": True}
except json.JSONDecodeError:
break
# 所有方法都失败
return {
"data": None,
"success": False,
"error": f"无法解析 JSON,原始输出(前200字): {text[:200]}"
}
1.4 常用场景二:文本提取和格式化
import re
from datetime import datetime
def main(raw_text: str) -> dict:
"""从非结构化文本中提取关键信息"""
# 提取邮箱地址
email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
emails = re.findall(email_pattern, raw_text)
# 提取中国手机号
phone_pattern = r'(?<!\d)1[3-9]\d{9}(?!\d)'
phones = re.findall(phone_pattern, raw_text)
# 提取日期(多种格式)
date_patterns = [
r'\d{4}[-/]\d{1,2}[-/]\d{1,2}', # 2024-01-15
r'\d{1,2}[-/]\d{1,2}[-/]\d{4}', # 15/01/2024
r'\d{4}年\d{1,2}月\d{1,2}日' # 2024年1月15日
]
dates = []
for pattern in date_patterns:
dates.extend(re.findall(pattern, raw_text))
# 提取 URL
url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
urls = re.findall(url_pattern, raw_text)
# 统计字数
chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', raw_text))
english_words = len(re.findall(r'\b[a-zA-Z]+\b', raw_text))
return {
"emails": list(set(emails)), # 去重
"phones": list(set(phones)),
"dates": list(set(dates)),
"urls": list(set(urls)),
"char_count": {
"chinese": chinese_chars,
"english_words": english_words,
"total_chars": len(raw_text)
}
}
1.5 常用场景三:列表处理和排序
def main(
items: list,
sort_by: str = "score",
descending: bool = True,
top_n: int = 5
) -> dict:
"""对列表进行排序、过滤和聚合"""
if not items:
return {"result": [], "stats": {}}
# 过滤掉无效数据
valid_items = [
item for item in items
if isinstance(item, dict) and sort_by in item
]
# 排序
sorted_items = sorted(
valid_items,
key=lambda x: x.get(sort_by, 0),
reverse=descending
)
# 取 Top-N
top_items = sorted_items[:top_n]
# 计算统计数据
scores = [item.get(sort_by, 0) for item in valid_items]
return {
"result": top_items,
"stats": {
"total": len(valid_items),
"max": max(scores) if scores else 0,
"min": min(scores) if scores else 0,
"avg": sum(scores) / len(scores) if scores else 0
}
}
Level 2:机制深解(3-5 年经验)
2.1 代码节点的执行环境详解
Python 版本:Dify 代码节点使用 Python 3.10+。
预装的可用库:
# 以下库在 Dify 代码节点中可直接使用(无需 import 声明)
# 标准库(完整支持)
import json, re, math, datetime, collections, itertools
import base64, hashlib, hmac, uuid, string
import functools, operator, copy, typing
# 第三方库(预装,可用但需 import)
import numpy as np # 数值计算
import pandas as pd # 数据处理
import jieba # 中文分词
import yaml # YAML 解析
import markdown # Markdown 渲染
import jinja2 # 模板引擎
不可用的库:
requests、httpx、aiohttp(网络)flask、fastapi(服务器)torch、tensorflow(太大)subprocess、os.system(系统命令)open()、os.path(文件系统)
超时限制:
- 默认执行超时:10 秒
- 可在节点设置中调整(最大 60 秒)
- 超时后工作流报错,不会返回部分结果
2.2 pandas 在代码节点中的使用
pandas 是处理结构化数据最强大的工具之一,在代码节点中可以发挥重要作用:
import pandas as pd
import json
def main(data_json: str) -> dict:
"""
使用 pandas 处理表格数据
data_json: JSON 字符串,格式为 [{col1: val1, col2: val2}, ...]
"""
# 解析 JSON 数据
records = json.loads(data_json)
df = pd.DataFrame(records)
# 数据清洗
df = df.dropna(subset=['score', 'name']) # 删除关键字段缺失的行
df['score'] = pd.to_numeric(df['score'], errors='coerce') # 转换为数字
df = df[df['score'] >= 0] # 过滤无效分数
# 统计分析
stats = {
"total_records": len(df),
"avg_score": round(df['score'].mean(), 2),
"median_score": round(df['score'].median(), 2),
"std_score": round(df['score'].std(), 2),
"score_distribution": {
"0-60": int((df['score'] < 60).sum()),
"60-80": int(((df['score'] >= 60) & (df['score'] < 80)).sum()),
"80-100": int((df['score'] >= 80).sum())
}
}
# 分组聚合
if 'department' in df.columns:
dept_stats = df.groupby('department')['score'].agg(
['mean', 'count', 'max']
).round(2).to_dict('index')
stats['by_department'] = dept_stats
# 返回 Top-10
top_performers = df.nlargest(10, 'score')[
['name', 'score', 'department']
].to_dict('records')
return {
"stats": stats,
"top_performers": top_performers,
"cleaned_records": len(df)
}
2.3 JavaScript 代码节点的特性
JavaScript 代码节点使用 Node.js 环境,特别适合:
字符串处理和模板:
async function main({
template,
variables,
items
}) {
// 模板字符串替换
let result = template;
// 替换 {{variable}} 占位符
for (const [key, value] of Object.entries(variables)) {
result = result.replace(
new RegExp(`\\{\\{${key}\\}\\}`, 'g'),
String(value)
);
}
// 处理列表渲染
if (items && items.length > 0) {
const listHtml = items
.map((item, idx) => `${idx + 1}. ${item}`)
.join('\n');
result = result.replace('{{items}}', listHtml);
}
return { rendered: result };
}
日期和时间处理(JavaScript 的 Date API 比 Python 更直观):
async function main({
start_date_str,
end_date_str,
timezone = 'Asia/Shanghai'
}) {
const start = new Date(start_date_str);
const end = new Date(end_date_str);
const diffMs = end - start;
const diffDays = Math.floor(diffMs / (1000 * 60 * 60 * 24));
const diffHours = Math.floor((diffMs % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60));
// 格式化日期
const formatDate = (date) => {
return date.toLocaleDateString('zh-CN', {
timeZone: timezone,
year: 'numeric',
month: 'long',
day: 'numeric',
weekday: 'long'
});
};
return {
start_formatted: formatDate(start),
end_formatted: formatDate(end),
duration_days: diffDays,
duration_hours: diffHours,
is_same_month: start.getMonth() === end.getMonth()
};
}
JSON 操作和深度处理:
async function main({ nested_json_str }) {
const data = JSON.parse(nested_json_str);
// 递归扁平化嵌套对象
function flatten(obj, prefix = '') {
return Object.keys(obj).reduce((acc, key) => {
const fullKey = prefix ? `${prefix}.${key}` : key;
if (typeof obj[key] === 'object' && obj[key] !== null && !Array.isArray(obj[key])) {
Object.assign(acc, flatten(obj[key], fullKey));
} else {
acc[fullKey] = obj[key];
}
return acc;
}, {});
}
const flattened = flatten(data);
return {
flattened_json: JSON.stringify(flattened),
key_count: Object.keys(flattened).length,
keys: Object.keys(flattened)
};
}
2.4 通过外部服务扩展代码节点能力
由于代码节点无法访问网络,对于需要网络的操作,推荐的模式是:
模式:代码节点 + HTTP 节点组合
代码节点(准备请求参数)
↓
HTTP 节点(发起实际网络请求)
↓
代码节点(处理响应数据)
自建 Sidecar 服务
创建一个轻量级 HTTP 服务,将复杂的外部集成逻辑封装进去,在 HTTP 节点中调用:
# sidecar_service.py(独立 Python 服务,不在 Dify 中运行)
from fastapi import FastAPI
import requests
import openai
app = FastAPI()
@app.post("/extract-from-pdf")
async def extract_pdf(url: str):
"""下载 PDF 并提取文本(代码节点无法做到)"""
response = requests.get(url)
# 使用 pdfplumber 或 pypdf2 提取文本
text = extract_text_from_pdf(response.content)
return {"text": text}
@app.post("/call-embedding")
async def call_embedding(texts: list):
"""调用 Embedding API(绕过代码节点的网络限制)"""
response = openai.embeddings.create(
model="text-embedding-3-small",
input=texts
)
return {"embeddings": [e.embedding for e in response.data]}
在 Dify HTTP 节点中调用 http://your-sidecar-service:8000/extract-from-pdf,实现代码节点无法完成的操作。
2.5 代码节点的错误处理
代码节点内部应该有完善的错误处理,避免因为边缘输入导致整个工作流失败:
def main(data: str, operation: str = "parse") -> dict:
"""带完善错误处理的代码节点"""
result = {
"success": False,
"data": None,
"error": None,
"error_type": None
}
try:
if operation == "parse":
import json
parsed = json.loads(data)
result["data"] = parsed
result["success"] = True
elif operation == "process":
# 主要处理逻辑
processed = do_complex_processing(data)
result["data"] = processed
result["success"] = True
else:
result["error"] = f"未知操作: {operation}"
result["error_type"] = "invalid_operation"
except json.JSONDecodeError as e:
result["error"] = f"JSON 解析失败: {str(e)}"
result["error_type"] = "json_error"
except ValueError as e:
result["error"] = f"数据验证失败: {str(e)}"
result["error_type"] = "validation_error"
except Exception as e:
result["error"] = f"未知错误: {str(e)}"
result["error_type"] = "unknown_error"
return result
然后在下游节点的条件分支中检查 code_node.success,决定走正常路径还是错误处理路径。
Level 3:源码与原理(5 年以上)
3.1 代码节点沙箱实现原理
Dify 使用 DifySandbox 来安全执行用户代码。沙箱的核心机制:
# api/core/workflow/nodes/code/code_node.py
class CodeNode(BaseNode):
def _run(self, variable_pool: VariablePool) -> NodeRunResult:
# 准备输入变量
inputs = self._prepare_inputs(variable_pool)
# 通过沙箱执行代码
runner = CodeExecutor(
code=self.node_data.code,
code_language=self.node_data.code_language,
timeout=self.node_data.timeout or 10
)
result = runner.execute(inputs)
if not result.success:
raise CodeExecutionError(result.error_message)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs=result.outputs
)
沙箱的安全机制(api/core/tools/utils/dify_sandbox/):
- 进程隔离:用户代码在独立的子进程中执行,崩溃不影响主进程
- 资源限制:
- CPU 时间:最大 10 秒(可配置)
- 内存:最大 256MB
- 文件描述符:受限
- 系统调用过滤(seccomp):禁止危险的系统调用
- 网络命名空间隔离:Python 的网络调用在沙箱中被拒绝
Python 代码的执行流程:
import subprocess
import json
import sys
def execute_python_code(code: str, inputs: dict, timeout: int) -> dict:
# 构建完整的执行脚本
wrapper = f"""
import sys
import json
# 注入输入变量
{chr(10).join(f"{k} = {repr(v)}" for k, v in inputs.items())}
# 用户代码
{code}
# 执行并输出结果
result = main({', '.join(f'{k}={k}' for k in inputs)})
print(json.dumps(result))
"""
# 在子进程中执行(带超时)
proc = subprocess.run(
[sys.executable, '-c', wrapper],
capture_output=True,
timeout=timeout,
# 通过 seccomp 限制系统调用
preexec_fn=apply_seccomp_filter
)
if proc.returncode != 0:
raise CodeExecutionError(proc.stderr.decode())
return json.loads(proc.stdout.decode())
3.2 代码节点的类型系统
Dify 在代码节点的输入输出之间有严格的类型转换系统:
class CodeNodeVariableTypeConverter:
"""负责代码节点输入输出的类型转换"""
TYPE_MAP = {
"string": str,
"number": float,
"boolean": bool,
"object": dict,
"array": list,
"file": dict # 文件被转换为 {name, url, mime_type} 字典
}
@classmethod
def convert_input(cls, value, expected_type: str):
"""将工作流变量转换为代码节点期望的类型"""
converter = cls.TYPE_MAP.get(expected_type)
if expected_type == "number":
# 字符串 "42" → 整数/浮点数 42
try:
return int(value) if str(value).isdigit() else float(value)
except (ValueError, TypeError):
return 0
elif expected_type == "array" and isinstance(value, str):
# 字符串 '["a","b"]' → Python 列表 ["a", "b"]
import json
try:
parsed = json.loads(value)
if isinstance(parsed, list):
return parsed
except json.JSONDecodeError:
pass
# 如果解析失败,按逗号分割
return [v.strip() for v in value.split(',')]
# 类型已匹配,直接返回
if isinstance(value, cls.TYPE_MAP.get(expected_type, type(None))):
return value
# 强制转换
try:
return converter(value)
except (ValueError, TypeError):
return None
3.3 JavaScript 代码节点的 V8 沙箱
JavaScript 代码节点使用基于 V8 引擎的隔离执行环境:
Node.js 主进程(Dify API Server)
↓ 启动子进程
Node.js 子进程(隔离的 JS 执行环境)
↓ 使用 vm 模块创建独立上下文
vm.createContext({
// 注入安全的全局对象
console: sandboxedConsole,
JSON: JSON,
Math: Math,
Date: Date,
// 不注入 fetch、require、process 等危险对象
})
↓ 执行用户代码
vm.runInContext(userCode, context, { timeout: 10000 })
关键:通过 vm.createContext() 创建的上下文不包含 require、process、fetch 等模块,用户代码无法访问文件系统或网络,即使尝试也会抛出 ReferenceError。
Level 4:生产陷阱与决策(专家视角)
4.1 陷阱一:代码节点中的大数据处理
问题:在代码节点中处理 10MB+ 的数据,可能超过内存限制或超时。
场景:迭代节点处理了 500 份文档,每份文档的分析结果作为列表传入代码节点进行汇总。500 × 2KB = 1MB,看起来没问题,但实际上数据在序列化/反序列化过程中会膨胀 3-5 倍。
解决方案:
- 在迭代内部就做聚合(流式归约):
# 不好的方式:收集所有结果再汇总(内存峰值高)
def main(all_results: list) -> dict:
# all_results 可能很大
total = sum(r["score"] for r in all_results)
return {"total": total}
# 好的方式:在迭代节点内部的每次迭代中维护累积状态
# (通过外部存储如 Redis 实现)
def main(score: float, job_id: str, redis_url: str) -> dict:
import redis
r = redis.from_url(redis_url)
# 原子性地累加分数
r.incrbyfloat(f"job:{job_id}:total_score", score)
r.incr(f"job:{job_id}:count")
return {"status": "accumulated"}
- 流式处理而非批量处理:不要把所有数据塞进一次代码节点调用,而是分批次处理,每批处理完就输出结果。
4.2 陷阱二:不纯函数导致的幂等性问题
问题:代码节点中使用了依赖时间、随机数或外部状态的逻辑:
# 有问题的代码:每次运行结果不同
def main(text: str) -> dict:
import random
import time
# 随机选择处理方式(非确定性)
strategy = random.choice(["strategy_a", "strategy_b"])
# 使用当前时间(可能因为重试导致不一致)
timestamp = time.time()
return {"strategy": strategy, "ts": timestamp}
问题影响:工作流重试时,两次执行结果不一致,难以调试。
正确做法:把随机种子或时间戳作为输入参数传入,而不是在代码节点内部生成:
# 正确:确定性函数,相同输入 = 相同输出
def main(text: str, seed: int = 42, timestamp: float = 0) -> dict:
import random
rng = random.Random(seed) # 使用固定种子
strategy = rng.choice(["strategy_a", "strategy_b"])
return {"strategy": strategy, "ts": timestamp}
4.3 陷阱三:调试困难——打印语句看不到
代码节点不提供 print() 输出的查看界面,调试只能看到最终的返回值或错误。
调试技巧:
- 把调试信息塞进返回值:
def main(data: str) -> dict:
debug_log = []
debug_log.append(f"输入长度: {len(data)}")
try:
result = process(data)
debug_log.append(f"处理成功: {type(result).__name__}")
except Exception as e:
debug_log.append(f"处理失败: {str(e)}")
result = None
return {
"result": result,
"_debug": debug_log # 下划线开头的字段不会影响正常流程
}
- 在 Dify 外部先测试代码:
# 本地测试脚本(不在 Dify 中运行)
def main(data: str) -> dict:
# 代码节点的逻辑
...
# 本地测试
if __name__ == "__main__":
test_input = "测试数据..."
result = main(test_input)
print(result)
- 使用异常信息传递调试信息:
def main(data: str) -> dict:
try:
result = process(data)
return {"result": result}
except Exception as e:
# 故意抛出包含调试信息的异常
raise ValueError(f"处理失败,输入: {repr(data[:100])}, 错误: {str(e)}")
4.4 代码节点 vs HTTP 节点:正确选型
| 需求 | 推荐节点 | 原因 |
|---|---|---|
| 字符串处理、正则 | 代码节点 | 无需网络,速度快 |
| JSON 解析/格式化 | 代码节点 | 标准库充足 |
| 数学计算、统计 | 代码节点 | numpy/pandas 可用 |
| 调用外部 REST API | HTTP 节点 | 代码节点无网络权限 |
| 数据库查询 | HTTP 节点(查数据库代理) | 需要网络连接 |
| LLM API 调用 | LLM 节点 | 专用节点更简洁 |
| 文件下载处理 | HTTP 节点下载 + 代码节点处理 | 分工合作 |
4.5 代码节点的版本管理
代码节点的代码直接内嵌在工作流定义中,随工作流一起进行版本控制。但这带来一个问题:同样的处理逻辑(如 JSON 提取函数)可能在十几个工作流中重复出现。
最佳实践:工具函数服务化
把通用工具函数部署为轻量级 HTTP 服务(自建 Sidecar),通过 HTTP 节点调用:
# 通用工具服务(独立部署)
from fastapi import FastAPI
app = FastAPI()
@app.post("/utils/extract-json")
def extract_json(text: str) -> dict:
"""标准化的 JSON 提取逻辑,所有工作流共用一个版本"""
# ... 完善的 JSON 提取实现
pass
@app.post("/utils/normalize-text")
def normalize_text(text: str, options: dict) -> dict:
"""文本标准化"""
pass
这样,通用逻辑集中维护、统一版本,各工作流通过 HTTP 节点调用,避免重复代码带来的维护噩梦。
本章小结
代码节点是 Dify 工作流中最灵活的"逃生舱",用好它需要:
理解能力边界:网络访问是不可逾越的红线,所有需要网络的操作必须通过 HTTP 节点;文件系统同样不可访问。
Python vs JavaScript 选择:数据处理和计算优选 Python(numpy/pandas 支持好);字符串模板和前端数据转换可选 JavaScript(Date API 更方便)。
可靠性设计:每个代码节点都应该有 try/except,输出 success 字段,让下游节点能判断是否成功。
可调试性:在开发阶段,把调试信息也包含在返回值中;在生产阶段用自定义异常消息传递错误上下文。
关键清单:
- 代码节点有 try/except 包裹所有主要逻辑
- 函数参数有类型注解(帮助 Dify 做类型转换)
- 返回值包含 success 字段,方便条件分支判断
- 没有使用 requests/fetch 等网络调用
- 处理大数据时考虑了内存限制(256MB)
- 在本地测试过代码逻辑,再放入工作流
- 通用逻辑已提取为外部服务,避免重复