第 11 章

代码节点与自定义函数:Python/JS 扩展工作流能力

第11章:代码节点与自定义函数——Python/JS 扩展工作流能力

代码节点是工作流的逃生舱——当内置节点无法满足需求时,Python 或 JavaScript 代码让你可以做任何数据处理、转换和计算,不受平台限制。

本章导读

Dify 的内置节点覆盖了大多数常见场景:LLM 调用、知识库检索、HTTP 请求、条件分支。但在实际业务中,总会遇到内置节点无法直接解决的问题:

代码节点(Code Node)正是为这些场景设计的。它在沙箱环境中执行 Python 3 或 JavaScript 代码,让工作流具备图灵完备的数据处理能力。

本章将深入讲解:


Level 1:基础认知(1-3 年经验)

1.1 代码节点的能力边界

能做什么

不能做什么(安全沙箱限制):

如果需要网络访问,应该使用 HTTP 请求节点;如果需要访问文件,先通过 HTTP 节点下载再在代码节点处理(以字符串或 bytes 形式传入)。

1.2 代码节点的基本结构

Python 代码节点必须定义一个 main 函数,函数参数对应上游节点的变量,返回值是一个字典:

def main(
    text: str,          # 接收上游节点的 text 变量
    count: int = 10,    # 带默认值的参数
    items: list = None  # 数组参数
) -> dict:
    # 处理逻辑
    result = text.upper()[:count]
    
    # 必须返回字典
    return {
        "processed_text": result,
        "word_count": len(text.split()),
        "is_long": len(text) > 100
    }

JavaScript 代码节点必须定义 main 函数,同样返回对象:

async function main({ text, count = 10, items = [] }) {
    // 处理逻辑
    const result = text.toUpperCase().substring(0, count);
    
    return {
        processedText: result,
        wordCount: text.split(' ').length,
        isLong: text.length > 100
    };
}

注意:JavaScript 代码节点支持 async/await,但由于网络访问被禁用,实际上不能进行真正的异步网络操作。

1.3 常用场景一:JSON 解析和数据清洗

这是代码节点最高频的使用场景。LLM 输出的 JSON 经常包含多余的文字、不完整的引号或其他格式问题:

import json
import re

def main(llm_output: str) -> dict:
    """从 LLM 输出中可靠地提取 JSON"""
    
    # 清理输入
    text = llm_output.strip()
    
    # 方案1:直接解析
    try:
        return {"data": json.loads(text), "success": True}
    except json.JSONDecodeError:
        pass
    
    # 方案2:提取 ```json ... ``` 代码块
    pattern = r'```(?:json)?\s*\n?([\s\S]*?)\n?\s*```'
    match = re.search(pattern, text)
    if match:
        try:
            return {"data": json.loads(match.group(1)), "success": True}
        except json.JSONDecodeError:
            pass
    
    # 方案3:找第一个完整的 {...} 或 [...] 结构
    for start_char, end_char in [('{', '}'), ('[', ']')]:
        start = text.find(start_char)
        if start == -1:
            continue
        
        # 用括号匹配找到对应的结束位置
        depth = 0
        for i, char in enumerate(text[start:], start):
            if char == start_char:
                depth += 1
            elif char == end_char:
                depth -= 1
                if depth == 0:
                    json_str = text[start:i+1]
                    try:
                        return {"data": json.loads(json_str), "success": True}
                    except json.JSONDecodeError:
                        break
    
    # 所有方法都失败
    return {
        "data": None,
        "success": False,
        "error": f"无法解析 JSON,原始输出(前200字): {text[:200]}"
    }

1.4 常用场景二:文本提取和格式化

import re
from datetime import datetime

def main(raw_text: str) -> dict:
    """从非结构化文本中提取关键信息"""
    
    # 提取邮箱地址
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
    emails = re.findall(email_pattern, raw_text)
    
    # 提取中国手机号
    phone_pattern = r'(?<!\d)1[3-9]\d{9}(?!\d)'
    phones = re.findall(phone_pattern, raw_text)
    
    # 提取日期(多种格式)
    date_patterns = [
        r'\d{4}[-/]\d{1,2}[-/]\d{1,2}',  # 2024-01-15
        r'\d{1,2}[-/]\d{1,2}[-/]\d{4}',   # 15/01/2024
        r'\d{4}年\d{1,2}月\d{1,2}日'       # 2024年1月15日
    ]
    dates = []
    for pattern in date_patterns:
        dates.extend(re.findall(pattern, raw_text))
    
    # 提取 URL
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
    urls = re.findall(url_pattern, raw_text)
    
    # 统计字数
    chinese_chars = len(re.findall(r'[\u4e00-\u9fff]', raw_text))
    english_words = len(re.findall(r'\b[a-zA-Z]+\b', raw_text))
    
    return {
        "emails": list(set(emails)),  # 去重
        "phones": list(set(phones)),
        "dates": list(set(dates)),
        "urls": list(set(urls)),
        "char_count": {
            "chinese": chinese_chars,
            "english_words": english_words,
            "total_chars": len(raw_text)
        }
    }

1.5 常用场景三:列表处理和排序

def main(
    items: list,
    sort_by: str = "score",
    descending: bool = True,
    top_n: int = 5
) -> dict:
    """对列表进行排序、过滤和聚合"""
    
    if not items:
        return {"result": [], "stats": {}}
    
    # 过滤掉无效数据
    valid_items = [
        item for item in items
        if isinstance(item, dict) and sort_by in item
    ]
    
    # 排序
    sorted_items = sorted(
        valid_items,
        key=lambda x: x.get(sort_by, 0),
        reverse=descending
    )
    
    # 取 Top-N
    top_items = sorted_items[:top_n]
    
    # 计算统计数据
    scores = [item.get(sort_by, 0) for item in valid_items]
    
    return {
        "result": top_items,
        "stats": {
            "total": len(valid_items),
            "max": max(scores) if scores else 0,
            "min": min(scores) if scores else 0,
            "avg": sum(scores) / len(scores) if scores else 0
        }
    }

Level 2:机制深解(3-5 年经验)

2.1 代码节点的执行环境详解

Python 版本:Dify 代码节点使用 Python 3.10+。

预装的可用库

# 以下库在 Dify 代码节点中可直接使用(无需 import 声明)
# 标准库(完整支持)
import json, re, math, datetime, collections, itertools
import base64, hashlib, hmac, uuid, string
import functools, operator, copy, typing

# 第三方库(预装,可用但需 import)
import numpy as np           # 数值计算
import pandas as pd          # 数据处理
import jieba                 # 中文分词
import yaml                  # YAML 解析
import markdown              # Markdown 渲染
import jinja2                # 模板引擎

不可用的库

超时限制

2.2 pandas 在代码节点中的使用

pandas 是处理结构化数据最强大的工具之一,在代码节点中可以发挥重要作用:

import pandas as pd
import json

def main(data_json: str) -> dict:
    """
    使用 pandas 处理表格数据
    data_json: JSON 字符串,格式为 [{col1: val1, col2: val2}, ...]
    """
    
    # 解析 JSON 数据
    records = json.loads(data_json)
    df = pd.DataFrame(records)
    
    # 数据清洗
    df = df.dropna(subset=['score', 'name'])     # 删除关键字段缺失的行
    df['score'] = pd.to_numeric(df['score'], errors='coerce')  # 转换为数字
    df = df[df['score'] >= 0]                    # 过滤无效分数
    
    # 统计分析
    stats = {
        "total_records": len(df),
        "avg_score": round(df['score'].mean(), 2),
        "median_score": round(df['score'].median(), 2),
        "std_score": round(df['score'].std(), 2),
        "score_distribution": {
            "0-60": int((df['score'] < 60).sum()),
            "60-80": int(((df['score'] >= 60) & (df['score'] < 80)).sum()),
            "80-100": int((df['score'] >= 80).sum())
        }
    }
    
    # 分组聚合
    if 'department' in df.columns:
        dept_stats = df.groupby('department')['score'].agg(
            ['mean', 'count', 'max']
        ).round(2).to_dict('index')
        stats['by_department'] = dept_stats
    
    # 返回 Top-10
    top_performers = df.nlargest(10, 'score')[
        ['name', 'score', 'department']
    ].to_dict('records')
    
    return {
        "stats": stats,
        "top_performers": top_performers,
        "cleaned_records": len(df)
    }

2.3 JavaScript 代码节点的特性

JavaScript 代码节点使用 Node.js 环境,特别适合:

字符串处理和模板

async function main({ 
    template, 
    variables, 
    items 
}) {
    // 模板字符串替换
    let result = template;
    
    // 替换 {{variable}} 占位符
    for (const [key, value] of Object.entries(variables)) {
        result = result.replace(
            new RegExp(`\\{\\{${key}\\}\\}`, 'g'), 
            String(value)
        );
    }
    
    // 处理列表渲染
    if (items && items.length > 0) {
        const listHtml = items
            .map((item, idx) => `${idx + 1}. ${item}`)
            .join('\n');
        result = result.replace('{{items}}', listHtml);
    }
    
    return { rendered: result };
}

日期和时间处理(JavaScript 的 Date API 比 Python 更直观):

async function main({ 
    start_date_str, 
    end_date_str,
    timezone = 'Asia/Shanghai'
}) {
    const start = new Date(start_date_str);
    const end = new Date(end_date_str);
    
    const diffMs = end - start;
    const diffDays = Math.floor(diffMs / (1000 * 60 * 60 * 24));
    const diffHours = Math.floor((diffMs % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60));
    
    // 格式化日期
    const formatDate = (date) => {
        return date.toLocaleDateString('zh-CN', {
            timeZone: timezone,
            year: 'numeric',
            month: 'long',
            day: 'numeric',
            weekday: 'long'
        });
    };
    
    return {
        start_formatted: formatDate(start),
        end_formatted: formatDate(end),
        duration_days: diffDays,
        duration_hours: diffHours,
        is_same_month: start.getMonth() === end.getMonth()
    };
}

JSON 操作和深度处理

async function main({ nested_json_str }) {
    const data = JSON.parse(nested_json_str);
    
    // 递归扁平化嵌套对象
    function flatten(obj, prefix = '') {
        return Object.keys(obj).reduce((acc, key) => {
            const fullKey = prefix ? `${prefix}.${key}` : key;
            
            if (typeof obj[key] === 'object' && obj[key] !== null && !Array.isArray(obj[key])) {
                Object.assign(acc, flatten(obj[key], fullKey));
            } else {
                acc[fullKey] = obj[key];
            }
            
            return acc;
        }, {});
    }
    
    const flattened = flatten(data);
    
    return {
        flattened_json: JSON.stringify(flattened),
        key_count: Object.keys(flattened).length,
        keys: Object.keys(flattened)
    };
}

2.4 通过外部服务扩展代码节点能力

由于代码节点无法访问网络,对于需要网络的操作,推荐的模式是:

模式:代码节点 + HTTP 节点组合

代码节点(准备请求参数)
    ↓
HTTP 节点(发起实际网络请求)
    ↓
代码节点(处理响应数据)

自建 Sidecar 服务

创建一个轻量级 HTTP 服务,将复杂的外部集成逻辑封装进去,在 HTTP 节点中调用:

# sidecar_service.py(独立 Python 服务,不在 Dify 中运行)
from fastapi import FastAPI
import requests
import openai

app = FastAPI()

@app.post("/extract-from-pdf")
async def extract_pdf(url: str):
    """下载 PDF 并提取文本(代码节点无法做到)"""
    response = requests.get(url)
    # 使用 pdfplumber 或 pypdf2 提取文本
    text = extract_text_from_pdf(response.content)
    return {"text": text}

@app.post("/call-embedding")
async def call_embedding(texts: list):
    """调用 Embedding API(绕过代码节点的网络限制)"""
    response = openai.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return {"embeddings": [e.embedding for e in response.data]}

在 Dify HTTP 节点中调用 http://your-sidecar-service:8000/extract-from-pdf,实现代码节点无法完成的操作。

2.5 代码节点的错误处理

代码节点内部应该有完善的错误处理,避免因为边缘输入导致整个工作流失败:

def main(data: str, operation: str = "parse") -> dict:
    """带完善错误处理的代码节点"""
    
    result = {
        "success": False,
        "data": None,
        "error": None,
        "error_type": None
    }
    
    try:
        if operation == "parse":
            import json
            parsed = json.loads(data)
            result["data"] = parsed
            result["success"] = True
            
        elif operation == "process":
            # 主要处理逻辑
            processed = do_complex_processing(data)
            result["data"] = processed
            result["success"] = True
            
        else:
            result["error"] = f"未知操作: {operation}"
            result["error_type"] = "invalid_operation"
    
    except json.JSONDecodeError as e:
        result["error"] = f"JSON 解析失败: {str(e)}"
        result["error_type"] = "json_error"
    
    except ValueError as e:
        result["error"] = f"数据验证失败: {str(e)}"
        result["error_type"] = "validation_error"
    
    except Exception as e:
        result["error"] = f"未知错误: {str(e)}"
        result["error_type"] = "unknown_error"
    
    return result

然后在下游节点的条件分支中检查 code_node.success,决定走正常路径还是错误处理路径。


Level 3:源码与原理(5 年以上)

3.1 代码节点沙箱实现原理

Dify 使用 DifySandbox 来安全执行用户代码。沙箱的核心机制:

# api/core/workflow/nodes/code/code_node.py

class CodeNode(BaseNode):
    def _run(self, variable_pool: VariablePool) -> NodeRunResult:
        # 准备输入变量
        inputs = self._prepare_inputs(variable_pool)
        
        # 通过沙箱执行代码
        runner = CodeExecutor(
            code=self.node_data.code,
            code_language=self.node_data.code_language,
            timeout=self.node_data.timeout or 10
        )
        
        result = runner.execute(inputs)
        
        if not result.success:
            raise CodeExecutionError(result.error_message)
        
        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs=result.outputs
        )

沙箱的安全机制api/core/tools/utils/dify_sandbox/):

  1. 进程隔离:用户代码在独立的子进程中执行,崩溃不影响主进程
  2. 资源限制
    • CPU 时间:最大 10 秒(可配置)
    • 内存:最大 256MB
    • 文件描述符:受限
  3. 系统调用过滤(seccomp):禁止危险的系统调用
  4. 网络命名空间隔离:Python 的网络调用在沙箱中被拒绝

Python 代码的执行流程

import subprocess
import json
import sys

def execute_python_code(code: str, inputs: dict, timeout: int) -> dict:
    # 构建完整的执行脚本
    wrapper = f"""
import sys
import json

# 注入输入变量
{chr(10).join(f"{k} = {repr(v)}" for k, v in inputs.items())}

# 用户代码
{code}

# 执行并输出结果
result = main({', '.join(f'{k}={k}' for k in inputs)})
print(json.dumps(result))
"""
    
    # 在子进程中执行(带超时)
    proc = subprocess.run(
        [sys.executable, '-c', wrapper],
        capture_output=True,
        timeout=timeout,
        # 通过 seccomp 限制系统调用
        preexec_fn=apply_seccomp_filter
    )
    
    if proc.returncode != 0:
        raise CodeExecutionError(proc.stderr.decode())
    
    return json.loads(proc.stdout.decode())

3.2 代码节点的类型系统

Dify 在代码节点的输入输出之间有严格的类型转换系统:

class CodeNodeVariableTypeConverter:
    """负责代码节点输入输出的类型转换"""
    
    TYPE_MAP = {
        "string": str,
        "number": float,
        "boolean": bool,
        "object": dict,
        "array": list,
        "file": dict  # 文件被转换为 {name, url, mime_type} 字典
    }
    
    @classmethod
    def convert_input(cls, value, expected_type: str):
        """将工作流变量转换为代码节点期望的类型"""
        converter = cls.TYPE_MAP.get(expected_type)
        
        if expected_type == "number":
            # 字符串 "42" → 整数/浮点数 42
            try:
                return int(value) if str(value).isdigit() else float(value)
            except (ValueError, TypeError):
                return 0
        
        elif expected_type == "array" and isinstance(value, str):
            # 字符串 '["a","b"]' → Python 列表 ["a", "b"]
            import json
            try:
                parsed = json.loads(value)
                if isinstance(parsed, list):
                    return parsed
            except json.JSONDecodeError:
                pass
            # 如果解析失败,按逗号分割
            return [v.strip() for v in value.split(',')]
        
        # 类型已匹配,直接返回
        if isinstance(value, cls.TYPE_MAP.get(expected_type, type(None))):
            return value
        
        # 强制转换
        try:
            return converter(value)
        except (ValueError, TypeError):
            return None

3.3 JavaScript 代码节点的 V8 沙箱

JavaScript 代码节点使用基于 V8 引擎的隔离执行环境:

Node.js 主进程(Dify API Server)
    ↓ 启动子进程
Node.js 子进程(隔离的 JS 执行环境)
    ↓ 使用 vm 模块创建独立上下文
vm.createContext({
    // 注入安全的全局对象
    console: sandboxedConsole,
    JSON: JSON,
    Math: Math,
    Date: Date,
    // 不注入 fetch、require、process 等危险对象
})
    ↓ 执行用户代码
vm.runInContext(userCode, context, { timeout: 10000 })

关键:通过 vm.createContext() 创建的上下文不包含 requireprocessfetch 等模块,用户代码无法访问文件系统或网络,即使尝试也会抛出 ReferenceError


Level 4:生产陷阱与决策(专家视角)

4.1 陷阱一:代码节点中的大数据处理

问题:在代码节点中处理 10MB+ 的数据,可能超过内存限制或超时。

场景:迭代节点处理了 500 份文档,每份文档的分析结果作为列表传入代码节点进行汇总。500 × 2KB = 1MB,看起来没问题,但实际上数据在序列化/反序列化过程中会膨胀 3-5 倍。

解决方案

  1. 在迭代内部就做聚合(流式归约):
# 不好的方式:收集所有结果再汇总(内存峰值高)
def main(all_results: list) -> dict:
    # all_results 可能很大
    total = sum(r["score"] for r in all_results)
    return {"total": total}

# 好的方式:在迭代节点内部的每次迭代中维护累积状态
# (通过外部存储如 Redis 实现)
def main(score: float, job_id: str, redis_url: str) -> dict:
    import redis
    r = redis.from_url(redis_url)
    
    # 原子性地累加分数
    r.incrbyfloat(f"job:{job_id}:total_score", score)
    r.incr(f"job:{job_id}:count")
    
    return {"status": "accumulated"}
  1. 流式处理而非批量处理:不要把所有数据塞进一次代码节点调用,而是分批次处理,每批处理完就输出结果。

4.2 陷阱二:不纯函数导致的幂等性问题

问题:代码节点中使用了依赖时间、随机数或外部状态的逻辑:

# 有问题的代码:每次运行结果不同
def main(text: str) -> dict:
    import random
    import time
    
    # 随机选择处理方式(非确定性)
    strategy = random.choice(["strategy_a", "strategy_b"])
    
    # 使用当前时间(可能因为重试导致不一致)
    timestamp = time.time()
    
    return {"strategy": strategy, "ts": timestamp}

问题影响:工作流重试时,两次执行结果不一致,难以调试。

正确做法:把随机种子或时间戳作为输入参数传入,而不是在代码节点内部生成:

# 正确:确定性函数,相同输入 = 相同输出
def main(text: str, seed: int = 42, timestamp: float = 0) -> dict:
    import random
    
    rng = random.Random(seed)  # 使用固定种子
    strategy = rng.choice(["strategy_a", "strategy_b"])
    
    return {"strategy": strategy, "ts": timestamp}

4.3 陷阱三:调试困难——打印语句看不到

代码节点不提供 print() 输出的查看界面,调试只能看到最终的返回值或错误。

调试技巧

  1. 把调试信息塞进返回值
def main(data: str) -> dict:
    debug_log = []
    
    debug_log.append(f"输入长度: {len(data)}")
    
    try:
        result = process(data)
        debug_log.append(f"处理成功: {type(result).__name__}")
    except Exception as e:
        debug_log.append(f"处理失败: {str(e)}")
        result = None
    
    return {
        "result": result,
        "_debug": debug_log  # 下划线开头的字段不会影响正常流程
    }
  1. 在 Dify 外部先测试代码
# 本地测试脚本(不在 Dify 中运行)
def main(data: str) -> dict:
    # 代码节点的逻辑
    ...

# 本地测试
if __name__ == "__main__":
    test_input = "测试数据..."
    result = main(test_input)
    print(result)
  1. 使用异常信息传递调试信息
def main(data: str) -> dict:
    try:
        result = process(data)
        return {"result": result}
    except Exception as e:
        # 故意抛出包含调试信息的异常
        raise ValueError(f"处理失败,输入: {repr(data[:100])}, 错误: {str(e)}")

4.4 代码节点 vs HTTP 节点:正确选型

需求 推荐节点 原因
字符串处理、正则 代码节点 无需网络,速度快
JSON 解析/格式化 代码节点 标准库充足
数学计算、统计 代码节点 numpy/pandas 可用
调用外部 REST API HTTP 节点 代码节点无网络权限
数据库查询 HTTP 节点(查数据库代理) 需要网络连接
LLM API 调用 LLM 节点 专用节点更简洁
文件下载处理 HTTP 节点下载 + 代码节点处理 分工合作

4.5 代码节点的版本管理

代码节点的代码直接内嵌在工作流定义中,随工作流一起进行版本控制。但这带来一个问题:同样的处理逻辑(如 JSON 提取函数)可能在十几个工作流中重复出现。

最佳实践:工具函数服务化

把通用工具函数部署为轻量级 HTTP 服务(自建 Sidecar),通过 HTTP 节点调用:

# 通用工具服务(独立部署)
from fastapi import FastAPI

app = FastAPI()

@app.post("/utils/extract-json")
def extract_json(text: str) -> dict:
    """标准化的 JSON 提取逻辑,所有工作流共用一个版本"""
    # ... 完善的 JSON 提取实现
    pass

@app.post("/utils/normalize-text")
def normalize_text(text: str, options: dict) -> dict:
    """文本标准化"""
    pass

这样,通用逻辑集中维护、统一版本,各工作流通过 HTTP 节点调用,避免重复代码带来的维护噩梦。


本章小结

代码节点是 Dify 工作流中最灵活的"逃生舱",用好它需要:

理解能力边界:网络访问是不可逾越的红线,所有需要网络的操作必须通过 HTTP 节点;文件系统同样不可访问。

Python vs JavaScript 选择:数据处理和计算优选 Python(numpy/pandas 支持好);字符串模板和前端数据转换可选 JavaScript(Date API 更方便)。

可靠性设计:每个代码节点都应该有 try/except,输出 success 字段,让下游节点能判断是否成功。

可调试性:在开发阶段,把调试信息也包含在返回值中;在生产阶段用自定义异常消息传递错误上下文。

关键清单

本章评分
4.8  / 5  (28 评分)

💬 留言讨论