Aggregation Pipeline

Pipeline Basics

The aggregation pipeline processes documents through a series of stages. Each stage transforms the documents.

// Basic structure: an array of stages, executed in the order listed
db.orders.aggregate([
  // 1. keep only completed orders
  { $match: { status: "completed" } },
  // 2. one output document per userId, summing that user's amounts
  {
    $group: {
      _id: "$userId",
      total: { $sum: "$amount" }
    }
  },
  // 3. largest totals first
  { $sort: { total: -1 } },
  // 4. keep the first 10 documents
  { $limit: 10 }
]);

// $match — filter documents (like SQL WHERE); conditions listed together are ANDed
{ $match: { status: "active", age: { $gte: 18 } } }

// $sort — order documents (1 = ascending, -1 = descending);
// here: createdAt descending, ties broken by name ascending
{ $sort: { createdAt: -1, name: 1 } }

// $limit and $skip โ€” pagination
{ $limit: 20 },
{ $skip: 40 }

$group — Aggregation

// Group by a single field and compute several accumulators per group
db.orders.aggregate([
  { $group: {
      _id:         "$category",              // group key: one output document per category
      count:       { $sum: 1 },              // number of documents in the group
      total:       { $sum: "$amount" },      // sum of amount across the group
      avgAmt:      { $avg: "$amount" },      // mean amount
      maxAmt:      { $max: "$amount" },      // largest amount
      minAmt:      { $min: "$amount" },      // smallest amount
      allItems:    { $push: "$item" },       // array of every item value, duplicates kept
      uniqueUsers: { $addToSet: "$userId" }  // array of distinct userId values
  } }
]);

// Group by multiple fields
{ $group: { _id: { year: { $year: "$date" }, month: { $month: "$date" } }, count: { $sum: 1 } } }

// Grand total (group all): a constant _id such as null puts every input
// document into one group, so the stage emits a single document
{ $group: { _id: null, grandTotal: { $sum: "$amount" } } }

$project — Shape Output

// Include/exclude fields and compute new ones.
// Expressions inside a single $project read the *incoming* document, so a
// field computed in that stage (age) cannot be referenced by a sibling
// expression in the same stage. Compute age in an earlier stage, then use it.
db.users.aggregate([
  {
    // $dateDiff requires MongoDB 5.0+. With unit "year" it counts the
    // calendar-year boundaries crossed, so the result can be one higher than
    // the true age until the birthday has passed in the current year.
    $set: {
      age: { $dateDiff: { startDate: "$birthDate", endDate: "$$NOW", unit: "year" } }
    }
  },
  {
    $project: {
      name: 1,
      email: 1,
      _id: 0,  // _id is included by default; exclude it explicitly
      // $concat yields null if any operand is null or missing
      fullName: { $concat: ["$firstName", " ", "$lastName"] },
      age: 1,  // keep the value computed by the previous stage
      isAdult: { $gte: ["$age", 18] }  // "$age" now resolves to the computed age
    }
  }
]);

// $addFields — add computed fields while keeping all existing fields
{ $addFields: { tax: { $multiply: ["$price", 0.1] } } }

// $set is an alias for $addFields (MongoDB 4.2+)
// $$NOW is a system variable holding the current datetime
{ $set: { updatedAt: "$$NOW" } }

$lookup — Joins

// Simple $lookup: left outer join from orders to users
db.orders.aggregate([
  {
    $lookup: {
      from: "users",          // collection to join with
      localField: "userId",   // field on the orders document
      foreignField: "_id",    // field on the users document
      as: "user"              // output array holding the matching users
    }
  },
  // Flatten the "user" array into a single embedded document.
  // Orders with no matching user have an empty array and are dropped here.
  { $unwind: { path: "$user" } }
]);

// $lookup with pipeline (MongoDB 3.6+)
{
  $lookup: {
    from: "products",
    let: { productIds: "$items.productId" },
    pipeline: [
      { $match: { $expr: { $in: ["$_id", "$$productIds"] } } },
      { $project: { name: 1, price: 1 } }
    ],
    as: "productDetails"
  }
}

$unwind, $bucket, $facet

// $unwind: deconstruct array field into separate documents
{ $unwind: { path: "$tags", preserveNullAndEmpty: true } }

// $bucket: categorize documents into ranges
{
  $bucket: {
    groupBy: "$price",
    // Each bucket is [lower, upper): 0–50, 50–100, 100–200, 200–500
    boundaries: [0, 50, 100, 200, 500],
    // _id of the bucket for every document that fits none of the ranges above.
    // NOTE: besides prices >= 500 this also catches negative, missing, or
    // non-numeric prices, so the "500+" label is only accurate for clean data.
    default: "500+",
    output: { count: { $sum: 1 }, avgPrice: { $avg: "$price" } }
  }
}

// $facet: run multiple sub-pipelines over the same input documents in one stage.
// The stage outputs a single document with one array field per sub-pipeline.
{
  $facet: {
    // document count per category
    byCategory: [{ $group: { _id: "$category", cnt: { $sum: 1 } } }],
    // document count per status
    byStatus:   [{ $group: { _id: "$status",   cnt: { $sum: 1 } } }],
    // yields [{ total: N }]; an empty array when there are no input documents
    totalCount: [{ $count: "total" }]
  }
}