第 7 章

数据处理节点

Ch07 数据处理节点

数据在节点间流动的过程中,往往需要重塑、过滤、计算和合并。n8n 提供了一套完整的数据处理节点,从简单的字段增删到复杂的 JavaScript/Python 逻辑,覆盖所有典型的数据转换需求。本章通过电商订单清洗这一完整实战场景,把所有数据处理节点串联讲解。

数据处理节点速览

节点 主要作用 典型场景
Set 添加/修改/删除字段 给数据加标签、修改字段值
Edit Fields 批量字段映射与重命名 把 API 响应字段重命名为业务字段
Code JS/Python 自定义逻辑 复杂计算、字符串处理、业务规则
Filter 条件过滤 items 只保留状态为"已完成"的订单
Split In Batches 将 items 分批处理 大量数据分批写入 DB,控制 API 调用速率
Aggregate 多条 items 合并为一条 汇总求和、将列表打包发送
Merge 合并两个输入的数据 关联两个数据源的数据
Sort 对 items 按字段排序 按金额从大到小排序订单
Limit 限制输出 items 数量 调试时限制只处理 N 条

Set 节点:添加/修改/删除字段

Set 节点是最常用的数据处理节点,用于在现有 items 上添加新字段、修改已有字段值,或删除不需要的字段。

Set 节点有三种工作模式:

Set vs Edit Fields 的区别: Set 节点适合添加计算字段或修改少数字段;Edit Fields 适合批量重命名字段或做大规模字段映射。新版 n8n 推荐用 Edit Fields 替代 Rename Keys。

Code 节点:JavaScript 和 Python 自定义逻辑

Code 节点是 n8n 中最强大的数据处理工具,允许你编写任意 JavaScript 或 Python 代码。它直接在工作流中执行,无需外部服务器。

JavaScript 模式(推荐)

JavaScript 模式内置了 n8n 的所有内置变量($json$node$getWorkflowStaticData 等),运行在 Node.js 环境中:

// 场景1:对每个 item 做复杂计算
return items.map(item => {
  const order = item.json;

  // 计算折扣后价格
  const discountRate = order.vipLevel === 'gold' ? 0.85 : 0.95;
  const finalPrice = (order.originalPrice * discountRate).toFixed(2);

  // 格式化日期
  const orderDate = new Date(order.createdAt);
  const formattedDate = orderDate.toLocaleDateString('zh-CN');

  return {
    json: {
      ...order,          // 保留原有所有字段
      finalPrice,        // 添加计算字段
      formattedDate,
      isHighValue: parseFloat(finalPrice) > 1000,
    }
  };
});

// 场景2:将多个 items 合并为一个
const summary = {
  orders: items.map(i => i.json),
  totalAmount: items.reduce((sum, i) => sum + i.json.amount, 0),
  count: items.length,
};
return [{ json: summary }]; // 返回单个 item

Python 模式

Python 模式适合熟悉 Python 的开发者,或需要使用特定 Python 库的场景:

# Python 模式:使用内置数据进行统计分析
# items 是一个 Python 列表,每个元素是字典

orders = [item['json'] for item in _input.all()]

# 按状态分组统计
status_counts = {}
for order in orders:
    status = order.get('status', 'unknown')
    status_counts[status] = status_counts.get(status, 0) + 1

# 计算平均金额
amounts = [order.get('amount', 0) for order in orders]
avg_amount = sum(amounts) / len(amounts) if amounts else 0

return [{
  'json': {
    'statusBreakdown': status_counts,
    'avgAmount': round(avg_amount, 2),
    'totalOrders': len(orders),
  }
}]

Split In Batches:大数据集分批处理

当工作流需要处理大量数据(如一次性同步 10000 条订单到数据库),直接处理会遇到内存压力或 API 限速问题。Split In Batches 节点将 items 数组按指定大小切分,每次只处理一批。

使用方式: Split In Batches 节点有两个输出端口:Loop(当前批次的数据,连接到处理节点)和 Done(所有批次处理完后触发,连接到后续汇总逻辑)。Loop 输出会连回到 Split In Batches 节点形成循环,直到所有批次处理完毕。

// 节点配置:
{
  "batchSize": 50,    // 每批 50 条
  "options": {
    "reset": false    // 是否在下次运行时重置批次状态
  }
}

// 典型工作流结构:
// [获取10000条数据] → [Split In Batches(50)] → [写入数据库]
//                              ↑_______________________↓ (Loop)
//                                      ↓ (Done)
//                              [发送完成通知]

Aggregate 节点:多条数据合并为一条

Aggregate 节点是 Split In Batches 的"逆操作":把多个独立的 items 合并成一条数据。常见用途:

Aggregate 节点有两种模式:

实战:清洗电商订单数据

假设你从 Shopify API 拉取了原始订单数据,需要清洗成符合内部 CRM 格式的结构,再批量写入 MySQL:

// 输入:Shopify API 返回的原始订单数组
// 输出:符合内部 CRM 格式的清洗后数据

const cleanOrders = items
  // 1. 过滤:只处理已完成(fulfilled)且已支付的订单
  .filter(item => {
    const o = item.json;
    return o.fulfillment_status === 'fulfilled' && o.financial_status === 'paid';
  })
  // 2. 转换:字段重命名 + 数据类型转换 + 计算衍生字段
  .map(item => {
    const o = item.json;

    // 提取客户信息
    const customer = o.customer || {};
    const shippingAddr = (o.shipping_address || {});

    // 计算商品总数量
    const totalQuantity = (o.line_items || [])
      .reduce((sum, li) => sum + (li.quantity || 0), 0);

    const totalPrice = parseFloat(o.total_price || 0);
    const isHighValue = totalPrice >= 500;

    return {
      json: {
        orderId:         o.id?.toString(),
        orderNo:         o.order_number?.toString(),
        orderDate:       o.created_at ? o.created_at.substring(0, 10) : '',
        customerName:    `${customer.first_name || ''} ${customer.last_name || ''}`.trim(),
        customerEmail:   customer.email || '',
        customerPhone:   customer.phone || shippingAddr.phone || '',
        totalAmount:     totalPrice,
        currency:        o.currency || 'CNY',
        itemCount:       totalQuantity,
        shippingCity:    shippingAddr.city || '',
        shippingProvince: shippingAddr.province || '',
        isHighValue,
        source:          'shopify',
        syncedAt:        new Date().toISOString(),
      }
    };
  })
  // 3. 去重:按 orderId 去除重复
  .filter((item, index, arr) =>
    arr.findIndex(a => a.json.orderId === item.json.orderId) === index
  );

return cleanOrders;

数据质量检查(写入数据库前验证):

// 在写入数据库前,先做数据质量检查
const validItems = [];
const invalidItems = [];

for (const item of items) {
  const o = item.json;
  const errors = [];

  // 必填字段检查
  if (!o.orderId) errors.push('Missing orderId');
  if (!o.customerEmail || !o.customerEmail.includes('@')) errors.push('Invalid email');
  if (typeof o.totalAmount !== 'number' || o.totalAmount < 0) errors.push('Invalid amount');

  if (errors.length === 0) {
    validItems.push(item);
  } else {
    invalidItems.push({
      json: { ...o, _errors: errors, _rejectedAt: new Date().toISOString() }
    });
  }
}

// 将问题数据存入 Static Data 供后续检查
const staticData = $getWorkflowStaticData('global');
staticData.lastRejected = invalidItems.map(i => i.json);

return validItems;

最佳实践: 对于数据清洗工作流,建议在 Code 节点中统一完成过滤→转换→验证三个步骤,而不是用多个 Filter 和 Set 节点分散处理。一个大的 Code 节点更容易维护,逻辑更清晰,执行也更快。

本章评分
4.6  / 5  (46 评分)

💬 留言讨论