Chapter 11

Code Nodes and Custom Functions: Extending Workflows with Python and JS

Chapter 11: Code Nodes and Custom Functions (Python/JS to Extend Workflow Capabilities)

Code nodes are the escape hatch of workflows: when built-in nodes fall short, Python or JavaScript code lets you handle data processing, transformation, and computation that the built-in nodes cannot express, within the limits of the sandbox.

Chapter Overview

Dify's built-in nodes cover most common scenarios: LLM calls, knowledge base retrieval, HTTP requests, conditional branches. But in real business situations, you will always encounter problems that built-in nodes cannot solve directly:

Code nodes (Code Node) are designed precisely for these scenarios. They execute Python 3 or JavaScript code in a sandboxed environment, giving workflows Turing-complete data processing capabilities.

This chapter covers in depth:


Level 1: Fundamentals (1–3 Years Experience)

1.1 Code Node Capability Boundaries

What you can do:

What you cannot do (secure sandbox restrictions):

For network access, use HTTP Request nodes. For file access, download via HTTP node first, then process in a Code node (passed as string or bytes).
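A minimal sketch of the file-processing half of that pattern, assuming the upstream HTTP node (or a helper service) hands the file over as a base64-encoded string. The parameter name file_b64 and the CSV payload are illustrative assumptions, not part of Dify's API.

```python
import base64
import csv
import hashlib
import io


def main(file_b64: str) -> dict:
    """Decode a base64-encoded CSV payload and summarize it."""
    raw = base64.b64decode(file_b64)              # bytes of the original file
    text = raw.decode("utf-8", errors="replace")  # tolerate stray bytes

    # Parse in memory; no filesystem access is needed
    rows = list(csv.DictReader(io.StringIO(text)))

    return {
        "sha256": hashlib.sha256(raw).hexdigest(),
        "row_count": len(rows),
        "columns": list(rows[0].keys()) if rows else [],
    }
```

Everything happens on in-memory strings and bytes, which is why this fits inside the sandbox.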

1.2 Basic Structure of Code Nodes

Python Code nodes must define a main function. Function parameters correspond to upstream node variables, and the return value must be a dictionary:

def main(
    text: str,          # Receives the 'text' variable from upstream node
    count: int = 10,    # Parameter with default value
    items: list = None  # Array parameter
) -> dict:
    # Processing logic
    result = text.upper()[:count]

    # Must return a dictionary
    return {
        "processed_text": result,
        "word_count": len(text.split()),
        "is_long": len(text) > 100
    }

JavaScript Code nodes must define a main function returning an object:

async function main({ text, count = 10, items = [] }) {
    // Processing logic
    const result = text.toUpperCase().substring(0, count);

    return {
        processedText: result,
        wordCount: text.split(' ').length,
        isLong: text.length > 100
    };
}

Note: JavaScript Code nodes support async/await, but since network access is disabled, true async network operations are not possible.

1.3 Common Use Case 1: JSON Parsing and Data Cleaning

This is the most frequent use case for Code nodes. LLM output JSON often contains extra text, incomplete quotes, or other formatting issues:

import json
import re

def main(llm_output: str) -> dict:
    """Reliably extract JSON from LLM output"""

    text = llm_output.strip()

    # Method 1: Direct parse (ideal case)
    try:
        return {"data": json.loads(text), "success": True}
    except json.JSONDecodeError:
        pass

    # Method 2: Extract ```json ... ``` code block
    pattern = r'```(?:json)?\s*\n?([\s\S]*?)\n?\s*```'
    match = re.search(pattern, text)
    if match:
        try:
            return {"data": json.loads(match.group(1)), "success": True}
        except json.JSONDecodeError:
            pass

    # Method 3: Find first complete {...} or [...] structure
    for start_char, end_char in [('{', '}'), ('[', ']')]:
        start = text.find(start_char)
        if start == -1:
            continue

        # Use bracket matching to find the closing position
        depth = 0
        for i, char in enumerate(text[start:], start):
            if char == start_char:
                depth += 1
            elif char == end_char:
                depth -= 1
                if depth == 0:
                    json_str = text[start:i+1]
                    try:
                        return {"data": json.loads(json_str), "success": True}
                    except json.JSONDecodeError:
                        break

    return {
        "data": None,
        "success": False,
        "error": f"Cannot parse JSON. First 200 chars: {text[:200]}"
    }
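The bracket counter in Method 3 treats every brace as structural, so a brace inside a string value can end the match early, and it looks for an object before an array. One alternative, sketched here under the assumption that only the first embedded JSON value matters, is to let the standard library's json.JSONDecoder.raw_decode do the matching. The helper name extract_first_json is hypothetical.

```python
import json


def extract_first_json(text: str):
    """Return the first JSON object or array embedded in text, or None."""
    decoder = json.JSONDecoder()
    for i, ch in enumerate(text):
        if ch not in "{[":
            continue
        try:
            # raw_decode parses exactly one JSON value starting at index i
            # and ignores whatever text follows it
            value, _end = decoder.raw_decode(text, i)
            return value
        except json.JSONDecodeError:
            continue  # not valid JSON from here; try the next candidate
    return None


def main(llm_output: str) -> dict:
    data = extract_first_json(llm_output)
    return {"data": data, "success": data is not None}
```

Because the real JSON parser decides where the value ends, quoted braces and nested arrays need no special handling.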

1.4 Common Use Case 2: Text Extraction and Formatting

import re

def main(raw_text: str) -> dict:
    """Extract key information from unstructured text"""

    # Extract email addresses
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(email_pattern, raw_text)

    # Extract US phone numbers (10-digit format)
    phone_pattern = r'\b(?:\+1[-.\s]?)?\(?[0-9]{3}\)?[-.\s]?[0-9]{3}[-.\s]?[0-9]{4}\b'
    phones = re.findall(phone_pattern, raw_text)

    # Extract dates (multiple formats)
    date_patterns = [
        r'\d{4}[-/]\d{1,2}[-/]\d{1,2}',   # 2024-01-15
        r'\d{1,2}[-/]\d{1,2}[-/]\d{4}',    # 15/01/2024
        r'(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)[a-z]* \d{1,2},? \d{4}'
    ]
    dates = []
    for pattern in date_patterns:
        dates.extend(re.findall(pattern, raw_text, re.IGNORECASE))

    # Extract URLs
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
    urls = re.findall(url_pattern, raw_text)

    # Word count statistics
    words = len(re.findall(r'\b[a-zA-Z]+\b', raw_text))
    sentences = len(re.findall(r'[.!?]+', raw_text)) + 1

    return {
        "emails": list(set(emails)),
        "phones": list(set(phones)),
        "dates": list(set(dates)),
        "urls": list(set(urls)),
        "stats": {
            "word_count": words,
            "sentence_count": sentences,
            "char_count": len(raw_text)
        }
    }

1.5 Common Use Case 3: List Processing and Sorting

def main(
    items: list,
    sort_by: str = "score",
    descending: bool = True,
    top_n: int = 5
) -> dict:
    """Sort, filter, and aggregate a list"""

    if not items:
        return {"result": [], "stats": {}}

    # Filter out invalid data
    valid_items = [
        item for item in items
        if isinstance(item, dict) and sort_by in item
    ]

    # Sort
    sorted_items = sorted(
        valid_items,
        key=lambda x: x.get(sort_by, 0),
        reverse=descending
    )

    # Take Top-N
    top_items = sorted_items[:top_n]

    # Calculate statistics
    scores = [item.get(sort_by, 0) for item in valid_items]

    return {
        "result": top_items,
        "stats": {
            "total": len(valid_items),
            "max": max(scores) if scores else 0,
            "min": min(scores) if scores else 0,
            "avg": sum(scores) / len(scores) if scores else 0
        }
    }

Level 2: Mechanisms in Depth (3–5 Years Experience)

2.1 Code Node Execution Environment in Detail

Python version: Dify Code nodes use Python 3.10+.

Pre-installed available libraries:

# Standard library (full support; import as needed)
import json, re, math, datetime, collections, itertools
import base64, hashlib, hmac, uuid, string
import functools, operator, copy, typing

# Third-party libraries (pre-installed, import required)
import numpy as np           # Numerical computation
import pandas as pd          # Data manipulation
import yaml                  # YAML parsing
import markdown              # Markdown rendering
import jinja2                # Template engine
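Which modules are importable depends on how the sandbox is deployed, so it can be worth probing the environment directly. The sketch below is one way to do that; the default module list simply repeats the names from the listing above, and the output keys are illustrative.

```python
import importlib.util
import sys


def main(modules: list = None) -> dict:
    """Report the Python version and which top-level modules can be found."""
    names = modules or ["json", "numpy", "pandas", "yaml", "markdown", "jinja2"]

    # find_spec returns None for a missing top-level module instead of raising
    available = {
        name: importlib.util.find_spec(name) is not None
        for name in names
    }

    return {
        "python_version": sys.version.split()[0],
        "available": available,
    }
```

Running this once in a throwaway workflow gives a concrete answer for your deployment.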

Unavailable libraries:

Timeout limits:

2.2 Using pandas in Code Nodes

pandas is one of the most powerful tools for structured data processing, and it's available in Code nodes:

import pandas as pd
import json

def main(data_json: str) -> dict:
    """
    Process tabular data using pandas.
    data_json: JSON string in format [{col1: val1, col2: val2}, ...]
    """

    # Parse JSON data
    records = json.loads(data_json)
    df = pd.DataFrame(records)

    # Data cleaning
    df = df.dropna(subset=['score', 'name'])
    df['score'] = pd.to_numeric(df['score'], errors='coerce')
    df = df[df['score'] >= 0]

    # Statistical analysis
    stats = {
        "total_records": len(df),
        "avg_score": round(df['score'].mean(), 2),
        "median_score": round(df['score'].median(), 2),
        "std_score": round(df['score'].std(), 2),
        "score_distribution": {
            "0-60": int((df['score'] < 60).sum()),
            "60-80": int(((df['score'] >= 60) & (df['score'] < 80)).sum()),
            "80-100": int((df['score'] >= 80).sum())
        }
    }

    # Group aggregation
    if 'department' in df.columns:
        dept_stats = df.groupby('department')['score'].agg(
            ['mean', 'count', 'max']
        ).round(2).to_dict('index')
        stats['by_department'] = dept_stats

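    # Return Top-10 performers (include 'department' only when the column exists)
    output_cols = [c for c in ['name', 'score', 'department'] if c in df.columns]
    top_performers = df.nlargest(10, 'score')[output_cols].to_dict('records')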

    return {
        "stats": stats,
        "top_performers": top_performers,
        "cleaned_records": len(df)
    }

2.3 JavaScript Code Node Features

JavaScript Code nodes use a Node.js environment, particularly well-suited for:

String processing and templating:

async function main({ template, variables, items }) {
    // Template string substitution
    let result = template;

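    // Replace {{variable}} placeholders. split/join treats both the key and
    // the value as literal text, so regex metacharacters in keys and "$"
    // sequences in values need no escaping.
    for (const [key, value] of Object.entries(variables || {})) {
        result = result.split(`{{${key}}}`).join(String(value));
    }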

    // Handle list rendering
    if (items && items.length > 0) {
        const listText = items
            .map((item, idx) => `${idx + 1}. ${item}`)
            .join('\n');
        result = result.replace('{{items}}', listText);
    }

    return { rendered: result };
}

Date and time processing (JavaScript's Date API is more intuitive for some use cases):

async function main({ start_date_str, end_date_str }) {
    const start = new Date(start_date_str);
    const end = new Date(end_date_str);

    const diffMs = end - start;
    const diffDays = Math.floor(diffMs / (1000 * 60 * 60 * 24));
    const diffHours = Math.floor(
        (diffMs % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60)
    );

    const formatDate = (date) => date.toLocaleDateString('en-US', {
        year: 'numeric',
        month: 'long',
        day: 'numeric',
        weekday: 'long'
    });

    return {
        start_formatted: formatDate(start),
        end_formatted: formatDate(end),
        duration_days: diffDays,
        duration_hours: diffHours,
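        // Compare the year too, otherwise Jan 2023 and Jan 2024 would match
        is_same_month: start.getFullYear() === end.getFullYear() &&
            start.getMonth() === end.getMonth()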
    };
}

Deep JSON manipulation:

async function main({ nested_json_str }) {
    const data = JSON.parse(nested_json_str);

    // Recursively flatten nested object
    function flatten(obj, prefix = '') {
        return Object.keys(obj).reduce((acc, key) => {
            const fullKey = prefix ? `${prefix}.${key}` : key;

            if (
                typeof obj[key] === 'object' &&
                obj[key] !== null &&
                !Array.isArray(obj[key])
            ) {
                Object.assign(acc, flatten(obj[key], fullKey));
            } else {
                acc[fullKey] = obj[key];
            }

            return acc;
        }, {});
    }

    const flattened = flatten(data);

    return {
        flattened_json: JSON.stringify(flattened),
        key_count: Object.keys(flattened).length,
        keys: Object.keys(flattened)
    };
}

2.4 Extending Code Node Capabilities via External Services

Since Code nodes cannot access the network, for operations requiring network access, the recommended pattern is:

Pattern: Code node + HTTP node combination

Code node (prepare request parameters)
    ↓
HTTP node (make the actual network request)
    ↓
Code node (process response data)
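The two Code nodes in this pattern might look like the sketch below. Each function would be named main in its own node; they have distinct names here only so both fit in one listing. The request fields (q, offset, limit), the items key in the response, and the assumption that the HTTP node exposes status_code and body are all illustrative.

```python
import json


def prepare_request(query: str, page: int = 1, page_size: int = 20) -> dict:
    """First Code node: build the body string the HTTP node will send."""
    body = {
        "q": query.strip(),
        "offset": (max(page, 1) - 1) * page_size,
        "limit": page_size,
    }
    return {"request_body": json.dumps(body)}


def process_response(status_code: int, body: str) -> dict:
    """Second Code node: validate and reduce the HTTP node's response."""
    if status_code != 200:
        return {"success": False, "items": [], "error": f"HTTP {status_code}"}

    try:
        payload = json.loads(body)
    except json.JSONDecodeError as e:
        return {"success": False, "items": [], "error": f"Invalid JSON: {e}"}

    if not isinstance(payload, dict):
        return {"success": False, "items": [], "error": "Expected a JSON object"}

    return {"success": True, "items": payload.get("items", []), "error": ""}
```

Keeping both functions free of I/O means they can be unit-tested locally without the HTTP node.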

Build a sidecar service

Create a lightweight HTTP service that encapsulates complex external integration logic, called via HTTP nodes:

# sidecar_service.py (independent Python service, not running inside Dify)
from fastapi import FastAPI
import openai
import requests

app = FastAPI()

def extract_text_from_pdf(pdf_bytes: bytes) -> str:
    """Placeholder: plug in the PDF library of your choice here."""
    raise NotImplementedError

@app.post("/extract-from-pdf")
def extract_pdf(url: str):
    """Download PDF and extract text (Code nodes cannot do this)"""
    # Plain def: FastAPI runs it in a thread pool, so the blocking
    # requests call does not stall the event loop
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    text = extract_text_from_pdf(response.content)
    return {"text": text}

@app.post("/call-embedding")
def call_embedding(texts: list):
    """Call Embedding API (bypasses Code node network restriction)"""
    response = openai.embeddings.create(
        model="text-embedding-3-small",
        input=texts
    )
    return {"embeddings": [e.embedding for e in response.data]}

Call http://your-sidecar-service:8000/extract-from-pdf via a Dify HTTP node to perform operations Code nodes cannot.

2.5 Error Handling in Code Nodes

Code nodes should have comprehensive error handling to prevent edge-case inputs from failing the entire workflow:

import json  # Module-level import so the except clause can always resolve json

def main(data: str, operation: str = "parse") -> dict:
    """Code node with comprehensive error handling"""

    result = {
        "success": False,
        "data": None,
        "error": None,
        "error_type": None
    }

    try:
        if operation == "parse":
            parsed = json.loads(data)
            result["data"] = parsed
            result["success"] = True

        elif operation == "process":
            # do_complex_processing is a placeholder for your own logic
            processed = do_complex_processing(data)
            result["data"] = processed
            result["success"] = True

        else:
            result["error"] = f"Unknown operation: {operation}"
            result["error_type"] = "invalid_operation"

    except json.JSONDecodeError as e:
        result["error"] = f"JSON parse failed: {str(e)}"
        result["error_type"] = "json_error"

    except ValueError as e:
        result["error"] = f"Data validation failed: {str(e)}"
        result["error_type"] = "validation_error"

    except Exception as e:
        result["error"] = f"Unknown error: {str(e)}"
        result["error_type"] = "unknown_error"

    return result

Then check code_node.success in downstream conditional branches to route to the normal path or the error handling path.
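When many Code nodes need the same envelope, the try/except scaffolding can be factored into a decorator. This is a sketch under the assumption that the runtime simply calls main with keyword arguments; the decorator name safe_node is hypothetical.

```python
import functools
import json


def safe_node(func):
    """Wrap a function so it always returns a success/data/error envelope."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            data = func(*args, **kwargs)
            return {"success": True, "data": data, "error": "", "error_type": ""}
        except json.JSONDecodeError as e:
            # Must come before ValueError: JSONDecodeError is a subclass of it
            return {"success": False, "data": None,
                    "error": f"JSON parse failed: {e}", "error_type": "json_error"}
        except ValueError as e:
            return {"success": False, "data": None,
                    "error": f"Data validation failed: {e}",
                    "error_type": "validation_error"}
        except Exception as e:
            return {"success": False, "data": None,
                    "error": f"Unknown error: {e}", "error_type": "unknown_error"}
    return wrapper


@safe_node
def main(data: str) -> dict:
    return json.loads(data)
```

The body of main then contains only the happy path, and every node reports failures in the same shape.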


Level 3: Source Code and Principles (5+ Years Experience)

3.1 Code Node Sandbox Implementation Principles

Dify uses DifySandbox to safely execute user code. Core sandbox mechanisms:

# api/core/workflow/nodes/code/code_node.py

class CodeNode(BaseNode):
    def _run(self, variable_pool: VariablePool) -> NodeRunResult:
        # Prepare input variables
        inputs = self._prepare_inputs(variable_pool)

        # Execute code via sandbox
        runner = CodeExecutor(
            code=self.node_data.code,
            code_language=self.node_data.code_language,
            timeout=self.node_data.timeout or 10
        )

        result = runner.execute(inputs)

        if not result.success:
            raise CodeExecutionError(result.error_message)

        return NodeRunResult(
            status=WorkflowNodeExecutionStatus.SUCCEEDED,
            outputs=result.outputs
        )

Sandbox security mechanisms (api/core/tools/utils/dify_sandbox/):

  1. Process isolation: User code runs in an independent subprocess; crashes don't affect the main process
  2. Resource limits:
    • CPU time: max 10 seconds (configurable)
    • Memory: max 256MB
    • File descriptors: restricted
  3. System call filtering (seccomp): dangerous system calls are blocked
  4. Network namespace isolation: Python network calls are rejected inside the sandbox
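To make the first two mechanisms concrete, here is a minimal sketch of process isolation plus resource limits using only the Python standard library on Linux. It is not DifySandbox's implementation and covers neither seccomp nor network namespaces; the function names and the default limits (taken from the list above) are assumptions for illustration.

```python
import resource
import subprocess
import sys


def _limit(kind: int, value: int) -> None:
    """Lower one rlimit to value, never exceeding the current hard limit."""
    _soft, hard = resource.getrlimit(kind)
    if hard != resource.RLIM_INFINITY:
        value = min(value, hard)
    resource.setrlimit(kind, (value, value))


def run_limited(code: str, cpu_seconds: int = 10,
                memory_bytes: int = 256 * 1024 * 1024) -> subprocess.CompletedProcess:
    """Run code in a child interpreter with CPU and address-space caps."""
    def apply_limits():
        _limit(resource.RLIMIT_CPU, cpu_seconds)   # CPU time in seconds
        _limit(resource.RLIMIT_AS, memory_bytes)   # virtual address space

    return subprocess.run(
        [sys.executable, "-c", code],
        capture_output=True,
        text=True,
        timeout=cpu_seconds + 5,   # wall-clock backstop on top of the CPU cap
        preexec_fn=apply_limits,   # runs in the child before exec (POSIX only)
    )
```

A child that exceeds the memory cap fails with MemoryError and a non-zero exit code, while the parent process is unaffected.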

Python code execution flow:

import subprocess
import json
import sys

def execute_python_code(code: str, inputs: dict, timeout: int) -> dict:
    # Build complete execution script
    wrapper = f"""
import sys
import json

# Inject input variables
{chr(10).join(f"{k} = {repr(v)}" for k, v in inputs.items())}

# User code
{code}

# Execute and output result
result = main({', '.join(f'{k}={k}' for k in inputs)})
print(json.dumps(result))
"""

    # Execute in subprocess with timeout
    proc = subprocess.run(
        [sys.executable, '-c', wrapper],
        capture_output=True,
        timeout=timeout,
        preexec_fn=apply_seccomp_filter  # Restrict system calls
    )

    if proc.returncode != 0:
        raise CodeExecutionError(proc.stderr.decode())

    return json.loads(proc.stdout.decode())

3.2 Code Node Type System

Dify has a strict type conversion system between Code node inputs and outputs:

class CodeNodeVariableTypeConverter:
    """Handles type conversion for Code node inputs and outputs"""

    TYPE_MAP = {
        "string": str,
        "number": float,
        "boolean": bool,
        "object": dict,
        "array": list,
        "file": dict  # Files are converted to {name, url, mime_type} dicts
    }

    @classmethod
    def convert_input(cls, value, expected_type: str):
        """Convert workflow variable to Code node's expected type"""

        if expected_type == "number":
            # String "42" → integer/float 42
            try:
                return int(value) if str(value).isdigit() else float(value)
            except (ValueError, TypeError):
                return 0

        elif expected_type == "array" and isinstance(value, str):
            # String '["a","b"]' → Python list ["a", "b"]
            import json
            try:
                parsed = json.loads(value)
                if isinstance(parsed, list):
                    return parsed
            except json.JSONDecodeError:
                pass
            # If parsing fails, split by comma
            return [v.strip() for v in value.split(',')]

        # Type already matches, return directly
        if isinstance(value, cls.TYPE_MAP.get(expected_type, type(None))):
            return value

        # Force conversion
        converter = cls.TYPE_MAP.get(expected_type)
        try:
            return converter(value)
        except (ValueError, TypeError):
            return None
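One consequence of a forced conversion like the one shown above: in Python, bool("false") is True, because any non-empty string is truthy. If a boolean may arrive as text, a stricter parser is safer. The sketch below is one possible approach; the function name and the accepted spellings are assumptions.

```python
TRUE_STRINGS = {"true", "1", "yes", "on"}
FALSE_STRINGS = {"false", "0", "no", "off", ""}


def to_bool(value) -> bool:
    """Interpret common boolean spellings; reject anything ambiguous."""
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return value != 0
    if isinstance(value, str):
        normalized = value.strip().lower()
        if normalized in TRUE_STRINGS:
            return True
        if normalized in FALSE_STRINGS:
            return False
    raise ValueError(f"Cannot interpret {value!r} as a boolean")
```

Raising on unknown input surfaces the problem at the node boundary instead of silently routing the workflow down the wrong branch.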

3.3 JavaScript Code Node V8 Sandbox

JavaScript Code nodes use a V8-engine-based isolated execution environment:

Node.js main process (Dify API Server)
    ↓ spawns subprocess
Node.js subprocess (isolated JS execution environment)
    ↓ creates independent context using vm module
vm.createContext({
    // Inject safe global objects only
    console: sandboxedConsole,
    JSON: JSON,
    Math: Math,
    Date: Date,
    // DO NOT inject: fetch, require, process, etc.
})
    ↓ execute user code
vm.runInContext(userCode, context, { timeout: 10000 })

Key: The context created by vm.createContext() does not include require, process, fetch, or other modules. User code cannot access the filesystem or network; attempting to do so throws a ReferenceError.


Level 4: Production Pitfalls and Decision-Making (Expert Perspective)

4.1 Pitfall 1: Large Data Processing in Code Nodes

Problem: Processing 10MB+ of data in a Code node may exceed memory limits or time out.

Scenario: An Iteration node processes 500 documents, each analysis result is passed as a list to a Code node for aggregation. 500 × 2KB = 1MB, which looks fine, but data inflates 3–5x during serialization/deserialization.

Solution 1: Aggregate inside the iteration (streaming reduce)

# Bad approach: collect all results then aggregate (high memory peak)
def main(all_results: list) -> dict:
    total = sum(r["score"] for r in all_results)
    return {"total": total}

# Good approach: maintain cumulative state in external storage (Redis).
# Code nodes have no network access, so this function runs in a sidecar
# service that each iteration calls through an HTTP node.
def accumulate(score: float, job_id: str, redis_url: str) -> dict:
    import redis
    r = redis.from_url(redis_url)

    # Atomically accumulate score
    r.incrbyfloat(f"job:{job_id}:total_score", score)
    r.incr(f"job:{job_id}:count")

    return {"status": "accumulated"}

Solution 2: Batch processing

Split 1,000 tasks into batches of 50, each batch as an independent workflow call, coordinated by an external scheduler.
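The splitting step itself is pure data manipulation and fits comfortably in a Code node. A minimal sketch, with the parameter and output names chosen for illustration:

```python
def main(tasks: list, batch_size: int = 50) -> dict:
    """Split tasks into consecutive batches of at most batch_size items."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")

    batches = [
        tasks[i:i + batch_size]
        for i in range(0, len(tasks), batch_size)
    ]

    return {"batches": batches, "batch_count": len(batches)}
```

With 1,000 tasks and the default batch size this yields 20 batches; the final batch is shorter whenever the total is not a multiple of batch_size.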

4.2 Pitfall 2: Idempotency Problems from Impure Functions

Problem: Code nodes using time-dependent, random, or external state logic:

# Problematic: different result on every run
def main(text: str) -> dict:
    import random
    import time

    strategy = random.choice(["strategy_a", "strategy_b"])  # Non-deterministic
    timestamp = time.time()  # May cause inconsistency on retry

    return {"strategy": strategy, "ts": timestamp}

Impact: On workflow retry, two executions produce different results, making debugging difficult.

Correct approach: Pass random seeds or timestamps as input parameters, don't generate them inside the Code node:

# Correct: deterministic function, same input = same output
def main(text: str, seed: int = 42, timestamp: float = 0) -> dict:
    import random

    rng = random.Random(seed)  # Fixed seed
    strategy = rng.choice(["strategy_a", "strategy_b"])

    return {"strategy": strategy, "ts": timestamp}
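A fixed default seed makes every run choose the same strategy. If the choice should vary between runs but stay stable across retries of the same run, the seed can be derived from a stable identifier. The sketch below assumes such an identifier is available upstream; the run_id parameter and the seed_from helper are hypothetical names. Python's built-in hash() is avoided because string hashes are salted per process.

```python
import hashlib
import random


def seed_from(run_id: str) -> int:
    """Map a stable identifier to a 32-bit seed, identical across processes."""
    digest = hashlib.sha256(run_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big")


def main(text: str, run_id: str) -> dict:
    seed = seed_from(run_id)
    rng = random.Random(seed)  # Same run_id, same sequence of choices
    strategy = rng.choice(["strategy_a", "strategy_b"])
    return {"strategy": strategy, "seed": seed}
```

Returning the seed alongside the result makes it possible to reproduce a specific run later.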

4.3 Pitfall 3: Debugging Difficulty (print() Output Is Invisible)

Code nodes don't provide a view of print() output during debugging. You can only see the final return value or error.

Debugging techniques:

  1. Embed debug info in the return value:
def main(data: str) -> dict:
    debug_log = []
    debug_log.append(f"Input length: {len(data)}")

    try:
        result = process(data)
        debug_log.append(f"Processing succeeded: {type(result).__name__}")
    except Exception as e:
        debug_log.append(f"Processing failed: {str(e)}")
        result = None

    return {
        "result": result,
        "_debug": debug_log  # Underscore-prefixed fields don't affect normal flow
    }
  2. Test code outside Dify first:
# Local test script (not running in Dify)
def main(data: str) -> dict:
    # Code node logic here
    pass

# Local test
if __name__ == "__main__":
    test_input = "test data..."
    result = main(test_input)
    print(result)
  3. Pass debug info via exception messages:
def main(data: str) -> dict:
    try:
        result = process(data)
        return {"result": result}
    except Exception as e:
        # Intentionally raise an exception with debug context
        raise ValueError(f"Processing failed. Input: {repr(data[:100])}, Error: {str(e)}")

4.4 Code Node vs HTTP Node: Choosing Correctly

Need | Recommended Node | Reason
String processing, regex | Code node | No network needed, fast
JSON parsing/formatting | Code node | Standard library sufficient
Math calculations, statistics | Code node | numpy/pandas available
Call external REST API | HTTP node | Code nodes have no network access
Database query | HTTP node (database proxy) | Requires network connection
LLM API call | LLM node | Dedicated node is cleaner
File download + processing | HTTP node download + Code node process | Divide and conquer

4.5 Code Node Version Management

Code node code is embedded directly in the workflow definition and is version-controlled with the workflow. But this creates a problem: the same processing logic (e.g., a JSON extraction function) may appear in dozens of workflows.

Best practice: Extract common utility functions as services

Deploy common utility functions as a lightweight HTTP service (self-built sidecar), called via HTTP nodes:

# Utility service (independently deployed)
from fastapi import FastAPI

app = FastAPI()

@app.post("/utils/extract-json")
def extract_json_endpoint(text: str) -> dict:
    """Standardized JSON extraction: shared by all workflows, single version"""
    # ... complete JSON extraction implementation
    pass

@app.post("/utils/normalize-text")
def normalize_text(text: str, options: dict) -> dict:
    """Text normalization"""
    pass

Common logic is maintained centrally with a unified version and called by all workflows via HTTP nodes, avoiding the maintenance nightmare of duplicated code.


Chapter Summary

Code nodes are the most flexible "escape hatch" in Dify workflows. Using them well requires:

Understand capability boundaries: Network access is an uncrossable line; all operations requiring network must go through HTTP nodes. The filesystem is equally inaccessible.

Python vs JavaScript choice: For data processing and computation, prefer Python (better numpy/pandas support). For string templating and frontend-style data transforms, JavaScript works well (more convenient Date API).

Reliability by design: Every Code node should have try/except coverage and output a success field so downstream nodes can detect success or failure.

Debuggability: During development, include debug information in return values. In production, use custom exception messages to pass error context.

Key checklist:
