聚合管道
管道基础
聚合管道通过一系列阶段处理文档,每个阶段对文档进行转换。
// Basic structure
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $group: { _id: "$userId", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } },
{ $limit: 10 }
]);
// $match — filter documents (like WHERE)
{ $match: { status: "active", age: { $gte: 18 } } }
// $sort — order documents
{ $sort: { createdAt: -1, name: 1 } }
// $limit and $skip — pagination
{ $limit: 20 },
{ $skip: 40 }
$group — 分组聚合
// Group by field with accumulators
db.orders.aggregate([
{
$group: {
_id: "$category",
count: { $sum: 1 },
total: { $sum: "$amount" },
avgAmt: { $avg: "$amount" },
maxAmt: { $max: "$amount" },
minAmt: { $min: "$amount" },
allItems: { $push: "$item" },
uniqueUsers: { $addToSet: "$userId" }
}
}
]);
// Group by multiple fields
{ $group: { _id: { year: { $year: "$date" }, month: { $month: "$date" } }, count: { $sum: 1 } } }
// Grand total (group all)
{ $group: { _id: null, grandTotal: { $sum: "$amount" } } }
$project — 输出整形
// Include/exclude fields and compute new ones
db.users.aggregate([
{
$project: {
name: 1,
email: 1,
_id: 0,
fullName: { $concat: ["$firstName", " ", "$lastName"] },
age: { $dateDiff: { startDate: "$birthDate", endDate: "$$NOW", unit: "year" } },
isAdult: { $gte: ["$age", 18] }
}
}
]);
// $addFields — add computed fields without suppressing existing
{ $addFields: { tax: { $multiply: ["$price", 0.1] } } }
// $set is an alias for $addFields (MongoDB 4.2+)
{ $set: { updatedAt: "$$NOW" } }
$lookup — 连接
// Simple $lookup (left outer join)
db.orders.aggregate([
{
$lookup: {
from: "users",
localField: "userId",
foreignField: "_id",
as: "user"
}
},
{ $unwind: "$user" } // flatten the array
]);
// $lookup with pipeline (MongoDB 3.6+)
{
$lookup: {
from: "products",
let: { productIds: "$items.productId" },
pipeline: [
{ $match: { $expr: { $in: ["$_id", "$$productIds"] } } },
{ $project: { name: 1, price: 1 } }
],
as: "productDetails"
}
}
$unwind、$bucket、$facet
// $unwind: deconstruct array field into separate documents
{ $unwind: { path: "$tags", preserveNullAndEmpty: true } }
// $bucket: categorize documents into ranges
{
$bucket: {
groupBy: "$price",
boundaries: [0, 50, 100, 200, 500],
default: "500+",
output: { count: { $sum: 1 }, avgPrice: { $avg: "$price" } }
}
}
// $facet: run multiple sub-pipelines simultaneously
{
$facet: {
byCategory: [{ $group: { _id: "$category", cnt: { $sum: 1 } } }],
byStatus: [{ $group: { _id: "$status", cnt: { $sum: 1 } } }],
totalCount: [{ $count: "total" }]
}
}