数据处理节点
Ch07 数据处理节点
数据在节点间流动的过程中,往往需要重塑、过滤、计算和合并。n8n 提供了一套完整的数据处理节点,从简单的字段增删到复杂的 JavaScript/Python 逻辑,覆盖所有典型的数据转换需求。本章通过电商订单清洗这一完整实战场景,把所有数据处理节点串联讲解。
数据处理节点速览
| 节点 | 主要作用 | 典型场景 |
|---|---|---|
| Set | 添加/修改/删除字段 | 给数据加标签、修改字段值 |
| Edit Fields | 批量字段映射与重命名 | 把 API 响应字段重命名为业务字段 |
| Code | JS/Python 自定义逻辑 | 复杂计算、字符串处理、业务规则 |
| Filter | 条件过滤 items | 只保留状态为"已完成"的订单 |
| Split In Batches | 将 items 分批处理 | 大量数据分批写入 DB,控制 API 调用速率 |
| Aggregate | 多条 items 合并为一条 | 汇总求和、将列表打包发送 |
| Merge | 合并两个输入的数据 | 关联两个数据源的数据 |
| Sort | 对 items 按字段排序 | 按金额从大到小排序订单 |
| Limit | 限制输出 items 数量 | 调试时限制只处理 N 条 |
Set 节点:添加/修改/删除字段
Set 节点是最常用的数据处理节点,用于在现有 items 上添加新字段、修改已有字段值,或删除不需要的字段。
Set 节点有三种工作模式:
- Keep All Fields(默认):保留原有所有字段,添加/修改指定字段
- Replace All Fields:丢弃原有所有字段,只保留你明确添加的字段(适合数据清洗,去掉冗余字段)
- Delete Fields:只删除指定字段,其余保留
Set vs Edit Fields 的区别: Set 节点适合添加计算字段或修改少数字段;Edit Fields 适合批量重命名字段或做大规模字段映射。新版 n8n 推荐用 Edit Fields 替代 Rename Keys。
Code 节点:JavaScript 和 Python 自定义逻辑
Code 节点是 n8n 中最强大的数据处理工具,允许你编写任意 JavaScript 或 Python 代码。它直接在工作流中执行,无需外部服务器。
JavaScript 模式(推荐)
JavaScript 模式内置了 n8n 的所有内置变量($json、$node、$getWorkflowStaticData 等),运行在 Node.js 环境中:
// 场景1:对每个 item 做复杂计算
return items.map(item => {
const order = item.json;
// 计算折扣后价格
const discountRate = order.vipLevel === 'gold' ? 0.85 : 0.95;
const finalPrice = (order.originalPrice * discountRate).toFixed(2);
// 格式化日期
const orderDate = new Date(order.createdAt);
const formattedDate = orderDate.toLocaleDateString('zh-CN');
return {
json: {
...order, // 保留原有所有字段
finalPrice, // 添加计算字段
formattedDate,
isHighValue: parseFloat(finalPrice) > 1000,
}
};
});
// 场景2:将多个 items 合并为一个
const summary = {
orders: items.map(i => i.json),
totalAmount: items.reduce((sum, i) => sum + i.json.amount, 0),
count: items.length,
};
return [{ json: summary }]; // 返回单个 item
Python 模式
Python 模式适合熟悉 Python 的开发者,或需要使用特定 Python 库的场景:
# Python 模式:使用内置数据进行统计分析
# items 是一个 Python 列表,每个元素是字典
orders = [item['json'] for item in _input.all()]
# 按状态分组统计
status_counts = {}
for order in orders:
status = order.get('status', 'unknown')
status_counts[status] = status_counts.get(status, 0) + 1
# 计算平均金额
amounts = [order.get('amount', 0) for order in orders]
avg_amount = sum(amounts) / len(amounts) if amounts else 0
return [{
'json': {
'statusBreakdown': status_counts,
'avgAmount': round(avg_amount, 2),
'totalOrders': len(orders),
}
}]
Split In Batches:大数据集分批处理
当工作流需要处理大量数据(如一次性同步 10000 条订单到数据库),直接处理会遇到内存压力或 API 限速问题。Split In Batches 节点将 items 数组按指定大小切分,每次只处理一批。
使用方式: Split In Batches 节点有两个输出端口:Loop(当前批次的数据,连接到处理节点)和 Done(所有批次处理完后触发,连接到后续汇总逻辑)。Loop 输出会连回到 Split In Batches 节点形成循环,直到所有批次处理完毕。
// 节点配置:
{
"batchSize": 50, // 每批 50 条
"options": {
"reset": false // 是否在下次运行时重置批次状态
}
}
// 典型工作流结构:
// [获取10000条数据] → [Split In Batches(50)] → [写入数据库]
// ↑_______________________↓ (Loop)
// ↓ (Done)
// [发送完成通知]
Aggregate 节点:多条数据合并为一条
Aggregate 节点是 Split In Batches 的"逆操作":把多个独立的 items 合并成一条数据。常见用途:
- 将多条数据库查询结果合并为一个数组,再一次性写入文件
- 把多个 API 调用结果汇总后统一发送通知
- 统计分析:把每条记录的字段汇总成总计/平均值
Aggregate 节点有两种模式:
- Append All Items to Array:把所有 items 的 json 收集到一个数组字段中
- Aggregate Individual Fields:对指定字段执行聚合操作(求和、计数、最大/最小值)
实战:清洗电商订单数据
假设你从 Shopify API 拉取了原始订单数据,需要清洗成符合内部 CRM 格式的结构,再批量写入 MySQL:
// 输入:Shopify API 返回的原始订单数组
// 输出:符合内部 CRM 格式的清洗后数据
const cleanOrders = items
// 1. 过滤:只处理已完成(fulfilled)且已支付的订单
.filter(item => {
const o = item.json;
return o.fulfillment_status === 'fulfilled' && o.financial_status === 'paid';
})
// 2. 转换:字段重命名 + 数据类型转换 + 计算衍生字段
.map(item => {
const o = item.json;
// 提取客户信息
const customer = o.customer || {};
const shippingAddr = (o.shipping_address || {});
// 计算商品总数量
const totalQuantity = (o.line_items || [])
.reduce((sum, li) => sum + (li.quantity || 0), 0);
const totalPrice = parseFloat(o.total_price || 0);
const isHighValue = totalPrice >= 500;
return {
json: {
orderId: o.id?.toString(),
orderNo: o.order_number?.toString(),
orderDate: o.created_at ? o.created_at.substring(0, 10) : '',
customerName: `${customer.first_name || ''} ${customer.last_name || ''}`.trim(),
customerEmail: customer.email || '',
customerPhone: customer.phone || shippingAddr.phone || '',
totalAmount: totalPrice,
currency: o.currency || 'CNY',
itemCount: totalQuantity,
shippingCity: shippingAddr.city || '',
shippingProvince: shippingAddr.province || '',
isHighValue,
source: 'shopify',
syncedAt: new Date().toISOString(),
}
};
})
// 3. 去重:按 orderId 去除重复
.filter((item, index, arr) =>
arr.findIndex(a => a.json.orderId === item.json.orderId) === index
);
return cleanOrders;
数据质量检查(写入数据库前验证):
// 在写入数据库前,先做数据质量检查
const validItems = [];
const invalidItems = [];
for (const item of items) {
const o = item.json;
const errors = [];
// 必填字段检查
if (!o.orderId) errors.push('Missing orderId');
if (!o.customerEmail || !o.customerEmail.includes('@')) errors.push('Invalid email');
if (typeof o.totalAmount !== 'number' || o.totalAmount < 0) errors.push('Invalid amount');
if (errors.length === 0) {
validItems.push(item);
} else {
invalidItems.push({
json: { ...o, _errors: errors, _rejectedAt: new Date().toISOString() }
});
}
}
// 将问题数据存入 Static Data 供后续检查
const staticData = $getWorkflowStaticData('global');
staticData.lastRejected = invalidItems.map(i => i.json);
return validItems;
最佳实践: 对于数据清洗工作流,建议在 Code 节点中统一完成过滤→转换→验证三个步骤,而不是用多个 Filter 和 Set 节点分散处理。一个大的 Code 节点更容易维护,逻辑更清晰,执行也更快。