阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 聚合管道(Aggregation Pipeline)概述

聚合管道(Aggregation Pipeline)概述

作者:陈川 阅读数:22408人阅读 分类: MongoDB

聚合管道(Aggregation Pipeline)是MongoDB中用于处理数据流的强大工具,通过将多个操作阶段串联起来实现对文档的复杂转换和分析。每个阶段接收前一个阶段的输出作为输入,并生成新的文档集传递给下一个阶段。

聚合管道的基本结构

聚合管道由多个阶段(Stage)组成,每个阶段对数据进行特定操作。典型的管道结构如下:

db.collection.aggregate([
  { $match: { status: "A" } },
  { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
  { $sort: { total: -1 } }
])

这个例子展示了三个阶段的管道:

  1. $match阶段过滤出status为"A"的文档
  2. $group阶段按cust_id分组并计算amount总和
  3. $sort阶段按total降序排列结果

常用聚合阶段详解

$match 阶段

$match用于过滤文档,类似于find()方法中的查询条件。它可以减少后续阶段需要处理的文档数量,提高性能。

{ $match: { 
  age: { $gt: 18 },
  department: "engineering",
  joinDate: { $gte: new Date("2020-01-01") }
} }

$group 阶段

$group是聚合管道的核心阶段,用于分组和计算聚合值。它必须指定_id字段作为分组依据。

{ $group: {
  _id: "$department",
  averageSalary: { $avg: "$salary" },
  maxAge: { $max: "$age" },
  employeeCount: { $sum: 1 }
} }

$project 阶段

$project用于重塑文档结构,可以包含、排除或重命名字段,还能创建计算字段。

{ $project: {
  name: 1,
  department: 1,
  monthlySalary: { $divide: ["$salary", 12] },
  _id: 0
} }

$sort 阶段

$sort对文档进行排序,1表示升序,-1表示降序。

{ $sort: { 
  department: 1,
  salary: -1 
} }

$limit 和 $skip 阶段

这对阶段用于分页处理:

{ $skip: 20 },
{ $limit: 10 }

高级聚合操作

数组操作

MongoDB提供了丰富的数组操作符:

{ $unwind: "$tags" },  // 将数组展开为多个文档
{ $addToSet: "$skills" },  // 将值添加到集合中(去重)
{ $push: { comments: "$newComment" } }  // 将值追加到数组

条件表达式

可以在聚合中使用条件逻辑:

{ $project: {
  name: 1,
  bonus: {
    $cond: {
      if: { $gte: ["$sales", 10000] },
      then: 1000,
      else: 0
    }
  }
} }

日期处理

MongoDB提供日期操作符:

{ $project: {
  year: { $year: "$joinDate" },
  month: { $month: "$joinDate" },
  dayOfWeek: { $dayOfWeek: "$joinDate" }
} }

性能优化技巧

索引利用

确保$match$sort阶段能使用索引:

  • $match放在管道开头
  • 在排序前使用$match减少文档数量
  • 确保排序字段有索引

内存限制

聚合管道默认有100MB内存限制,对于大数据集:

  • 使用allowDiskUse选项
  • 优化管道减少中间结果大小
  • 尽早使用$match$project减少数据量

分片集群考虑

在分片集群上:

  • 第一阶段如果是$match,会在分片上执行
  • $group阶段通常需要合并到主分片
  • 考虑使用$out阶段将结果写入新集合

实际应用示例

电商数据分析

db.orders.aggregate([
  { $match: { 
    orderDate: { 
      $gte: new Date("2023-01-01"),
      $lt: new Date("2023-02-01") 
    } 
  } },
  { $unwind: "$items" },
  { $group: {
    _id: "$items.category",
    totalSales: { $sum: "$items.price" },
    avgQuantity: { $avg: "$items.quantity" },
    orderCount: { $sum: 1 }
  } },
  { $sort: { totalSales: -1 } },
  { $limit: 5 }
])

用户行为分析

db.userActivities.aggregate([
  { $match: { 
    timestamp: { $gte: new Date("2023-06-01") } 
  } },
  { $group: {
    _id: {
      userId: "$userId",
      action: "$actionType"
    },
    count: { $sum: 1 },
    lastOccurred: { $max: "$timestamp" }
  } },
  { $project: {
    userId: "$_id.userId",
    action: "$_id.action",
    count: 1,
    lastOccurred: 1,
    isFrequent: { $gt: ["$count", 10] }
  } },
  { $sort: { count: -1 } }
])

管道表达式和运算符

MongoDB提供了丰富的表达式和运算符:

算术运算符

{ $project: {
  netPrice: { $subtract: ["$price", "$discount"] },
  tax: { $multiply: ["$price", 0.1] },
  finalPrice: { 
    $add: [
      { $subtract: ["$price", "$discount"] },
      { $multiply: ["$price", 0.1] }
    ]
  }
} }

字符串操作

{ $project: {
  fullName: { $concat: ["$firstName", " ", "$lastName"] },
  emailDomain: { $substr: ["$email", { $indexOfBytes: ["$email", "@"] }, -1 ] },
  nameLength: { $strLenCP: "$name" }
} }

聚合累加器

{ $group: {
  _id: null,
  total: { $sum: "$value" },
  average: { $avg: "$value" },
  min: { $min: "$value" },
  max: { $max: "$value" },
  values: { $push: "$value" },
  uniqueValues: { $addToSet: "$value" }
} }

管道阶段顺序的重要性

聚合管道的阶段顺序会显著影响结果和性能:

  1. 尽早使用$match减少文档数量
  2. $group前使用$sort可以优化某些分组操作
  3. $project可以放在不同位置实现不同目的:
    • 早期减少字段数量
    • 后期重塑输出格式

错误顺序示例:

// 效率低的顺序
db.orders.aggregate([
  { $project: { items: 1 } },
  { $unwind: "$items" },
  { $match: { "items.price": { $gt: 100 } } }
])

// 优化后的顺序
db.orders.aggregate([
  { $match: { "items.price": { $gt: 100 } } },
  { $unwind: "$items" },
  { $match: { "items.price": { $gt: 100 } } },
  { $project: { items: 1 } }
])

特殊阶段和功能

$facet 阶段

$facet允许在单个聚合管道中执行多个子管道:

db.products.aggregate([
  { $facet: {
    "priceStats": [
      { $match: { category: "Electronics" } },
      { $group: {
        _id: null,
        avgPrice: { $avg: "$price" },
        maxPrice: { $max: "$price" }
      } }
    ],
    "topSellers": [
      { $sort: { sales: -1 } },
      { $limit: 5 },
      { $project: { name: 1, sales: 1 } }
    ]
  } }
])

$lookup 阶段

实现类似SQL的join操作:

db.orders.aggregate([
  { $lookup: {
    from: "customers",
    localField: "customerId",
    foreignField: "_id",
    as: "customerInfo"
  } },
  { $unwind: "$customerInfo" },
  { $project: {
    orderId: 1,
    amount: 1,
    customerName: "$customerInfo.name"
  } }
])

$graphLookup 阶段

递归查找文档,适用于层级数据:

db.employees.aggregate([
  { $match: { name: "John Doe" } },
  { $graphLookup: {
    from: "employees",
    startWith: "$reportsTo",
    connectFromField: "reportsTo",
    connectToField: "_id",
    as: "managementChain",
    depthField: "level"
  } }
])

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌