聚合管道(Aggregation Pipeline)概述
聚合管道(Aggregation Pipeline)是MongoDB中用于处理数据流的强大工具,通过将多个操作阶段串联起来实现对文档的复杂转换和分析。每个阶段接收前一个阶段的输出作为输入,并生成新的文档集传递给下一个阶段。
聚合管道的基本结构
聚合管道由多个阶段(Stage)组成,每个阶段对数据进行特定操作。典型的管道结构如下:
db.collection.aggregate([
{ $match: { status: "A" } },
{ $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
])
这个例子展示了三个阶段的管道:
$match
阶段过滤出status为"A"的文档$group
阶段按cust_id分组并计算amount总和$sort
阶段按total降序排列结果
常用聚合阶段详解
$match 阶段
$match
用于过滤文档,类似于find()方法中的查询条件。它可以减少后续阶段需要处理的文档数量,提高性能。
{ $match: {
age: { $gt: 18 },
department: "engineering",
joinDate: { $gte: new Date("2020-01-01") }
} }
$group 阶段
$group
是聚合管道的核心阶段,用于分组和计算聚合值。它必须指定_id字段作为分组依据。
{ $group: {
_id: "$department",
averageSalary: { $avg: "$salary" },
maxAge: { $max: "$age" },
employeeCount: { $sum: 1 }
} }
$project 阶段
$project
用于重塑文档结构,可以包含、排除或重命名字段,还能创建计算字段。
{ $project: {
name: 1,
department: 1,
monthlySalary: { $divide: ["$salary", 12] },
_id: 0
} }
$sort 阶段
$sort
对文档进行排序,1表示升序,-1表示降序。
{ $sort: {
department: 1,
salary: -1
} }
$limit 和 $skip 阶段
这对阶段用于分页处理:
{ $skip: 20 },
{ $limit: 10 }
高级聚合操作
数组操作
MongoDB提供了丰富的数组操作符:
{ $unwind: "$tags" }, // 将数组展开为多个文档
{ $addToSet: "$skills" }, // 将值添加到集合中(去重)
{ $push: { comments: "$newComment" } } // 将值追加到数组
条件表达式
可以在聚合中使用条件逻辑:
{ $project: {
name: 1,
bonus: {
$cond: {
if: { $gte: ["$sales", 10000] },
then: 1000,
else: 0
}
}
} }
日期处理
MongoDB提供日期操作符:
{ $project: {
year: { $year: "$joinDate" },
month: { $month: "$joinDate" },
dayOfWeek: { $dayOfWeek: "$joinDate" }
} }
性能优化技巧
索引利用
确保$match
和$sort
阶段能使用索引:
- 将
$match
放在管道开头 - 在排序前使用
$match
减少文档数量 - 确保排序字段有索引
内存限制
聚合管道默认有100MB内存限制,对于大数据集:
- 使用
allowDiskUse
选项 - 优化管道减少中间结果大小
- 尽早使用
$match
和$project
减少数据量
分片集群考虑
在分片集群上:
- 第一阶段如果是
$match
,会在分片上执行 $group
阶段通常需要合并到主分片- 考虑使用
$out
阶段将结果写入新集合
实际应用示例
电商数据分析
db.orders.aggregate([
{ $match: {
orderDate: {
$gte: new Date("2023-01-01"),
$lt: new Date("2023-02-01")
}
} },
{ $unwind: "$items" },
{ $group: {
_id: "$items.category",
totalSales: { $sum: "$items.price" },
avgQuantity: { $avg: "$items.quantity" },
orderCount: { $sum: 1 }
} },
{ $sort: { totalSales: -1 } },
{ $limit: 5 }
])
用户行为分析
db.userActivities.aggregate([
{ $match: {
timestamp: { $gte: new Date("2023-06-01") }
} },
{ $group: {
_id: {
userId: "$userId",
action: "$actionType"
},
count: { $sum: 1 },
lastOccurred: { $max: "$timestamp" }
} },
{ $project: {
userId: "$_id.userId",
action: "$_id.action",
count: 1,
lastOccurred: 1,
isFrequent: { $gt: ["$count", 10] }
} },
{ $sort: { count: -1 } }
])
管道表达式和运算符
MongoDB提供了丰富的表达式和运算符:
算术运算符
{ $project: {
netPrice: { $subtract: ["$price", "$discount"] },
tax: { $multiply: ["$price", 0.1] },
finalPrice: {
$add: [
{ $subtract: ["$price", "$discount"] },
{ $multiply: ["$price", 0.1] }
]
}
} }
字符串操作
{ $project: {
fullName: { $concat: ["$firstName", " ", "$lastName"] },
emailDomain: { $substr: ["$email", { $indexOfBytes: ["$email", "@"] }, -1 ] },
nameLength: { $strLenCP: "$name" }
} }
聚合累加器
{ $group: {
_id: null,
total: { $sum: "$value" },
average: { $avg: "$value" },
min: { $min: "$value" },
max: { $max: "$value" },
values: { $push: "$value" },
uniqueValues: { $addToSet: "$value" }
} }
管道阶段顺序的重要性
聚合管道的阶段顺序会显著影响结果和性能:
- 尽早使用
$match
减少文档数量 - 在
$group
前使用$sort
可以优化某些分组操作 $project
可以放在不同位置实现不同目的:- 早期减少字段数量
- 后期重塑输出格式
错误顺序示例:
// 效率低的顺序
db.orders.aggregate([
{ $project: { items: 1 } },
{ $unwind: "$items" },
{ $match: { "items.price": { $gt: 100 } } }
])
// 优化后的顺序
db.orders.aggregate([
{ $match: { "items.price": { $gt: 100 } } },
{ $unwind: "$items" },
{ $match: { "items.price": { $gt: 100 } } },
{ $project: { items: 1 } }
])
特殊阶段和功能
$facet 阶段
$facet
允许在单个聚合管道中执行多个子管道:
db.products.aggregate([
{ $facet: {
"priceStats": [
{ $match: { category: "Electronics" } },
{ $group: {
_id: null,
avgPrice: { $avg: "$price" },
maxPrice: { $max: "$price" }
} }
],
"topSellers": [
{ $sort: { sales: -1 } },
{ $limit: 5 },
{ $project: { name: 1, sales: 1 } }
]
} }
])
$lookup 阶段
实现类似SQL的join操作:
db.orders.aggregate([
{ $lookup: {
from: "customers",
localField: "customerId",
foreignField: "_id",
as: "customerInfo"
} },
{ $unwind: "$customerInfo" },
{ $project: {
orderId: 1,
amount: 1,
customerName: "$customerInfo.name"
} }
])
$graphLookup 阶段
递归查找文档,适用于层级数据:
db.employees.aggregate([
{ $match: { name: "John Doe" } },
{ $graphLookup: {
from: "employees",
startWith: "$reportsTo",
connectFromField: "reportsTo",
connectToField: "_id",
as: "managementChain",
depthField: "level"
} }
])
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:索引选择与排序优化
下一篇:配置级别(系统、全局、本地)