窗口函数($setWindowFields)
窗口函数($setWindowFields)
窗口函数是MongoDB 5.0引入的强大聚合操作符,允许在文档集合上执行计算时访问相邻文档的数据。与传统聚合操作不同,窗口函数可以基于定义的窗口框架对数据进行分区和排序,然后在每个窗口内执行计算。
基本语法结构
$setWindowFields
的基本语法如下:
{
$setWindowFields: {
partitionBy: <expression>, // 分区字段
sortBy: { <field1>: <sortOrder>, ... }, // 排序规则
output: { // 输出字段定义
<outputField1>: {
<windowOperator>: <expression>,
window: {
documents: [ <lowerBound>, <upperBound> ], // 文档窗口
range: [ <lowerBound>, <upperBound> ], // 值范围窗口
unit: <timeUnit> // 时间单位
}
},
...
}
}
}
分区与排序
分区(partitionBy
)将数据分成多个组,窗口函数在每个分区内独立计算。排序(sortBy
)确定分区内文档的处理顺序:
// 按部门分区并按工资降序排序
db.employees.aggregate([
{
$setWindowFields: {
partitionBy: "$department",
sortBy: { salary: -1 },
output: {
rank: {
$rank: {},
window: { documents: [ "unbounded", "current" ] }
}
}
}
}
])
窗口范围定义
窗口范围决定计算时包含哪些文档:
-
文档窗口:基于文档位置
["unbounded", "unbounded"]
:整个分区["unbounded", "current"]
:从分区开始到当前行[ -1, 1 ]
:当前行前后各1行
-
值范围窗口:基于字段值
[ -100, 100 ]
:当前值±100范围内的文档
// 计算3个月滑动窗口内的平均销售额
db.sales.aggregate([
{
$setWindowFields: {
partitionBy: "$product",
sortBy: { date: 1 },
output: {
threeMonthAvg: {
$avg: "$amount",
window: {
range: [ -3, 0 ],
unit: "month"
}
}
}
}
}
])
常用窗口函数
排名函数
$rank
:标准排名(并列会跳过后续排名)$denseRank
:密集排名(并列不跳过排名)$rowNumber
:简单的行号
// 计算部门内工资排名
db.employees.aggregate([
{
$setWindowFields: {
partitionBy: "$department",
sortBy: { salary: -1 },
output: {
rank: { $rank: {} },
denseRank: { $denseRank: {} },
rowNumber: { $rowNumber: {} }
}
}
}
])
聚合函数
标准聚合函数如$sum
、$avg
、$min
、$max
等都可以作为窗口函数使用:
// 计算每个产品的累计销售额
db.sales.aggregate([
{
$setWindowFields: {
partitionBy: "$product",
sortBy: { date: 1 },
output: {
cumulativeSales: {
$sum: "$amount",
window: { documents: [ "unbounded", "current" ] }
}
}
}
}
])
位移函数
$shift
:访问相对于当前行的其他行$first
/$last
:获取窗口内的第一个/最后一个值
// 计算每月销售额环比增长率
db.monthly_sales.aggregate([
{
$setWindowFields: {
sortBy: { yearMonth: 1 },
output: {
prevMonthSales: {
$shift: {
output: "$amount",
by: -1,
default: null
}
},
growthRate: {
$divide: [
{ $subtract: [ "$amount", { $shift: { output: "$amount", by: -1 } } ] },
{ $shift: { output: "$amount", by: -1 } }
]
}
}
}
}
])
高级应用示例
移动平均线计算
// 计算7天移动平均温度
db.temperature_readings.aggregate([
{
$setWindowFields: {
partitionBy: "$sensor_id",
sortBy: { timestamp: 1 },
output: {
sevenDayAvg: {
$avg: "$temperature",
window: {
range: [ -6, 0 ], // 当前日+前6天
unit: "day"
}
}
}
}
}
])
会话分割
// 基于30分钟不活动分割用户会话
db.user_clicks.aggregate([
{
$setWindowFields: {
partitionBy: "$user_id",
sortBy: { timestamp: 1 },
output: {
newSession: {
$function: {
body: function(timestamp, prevTimestamp) {
return (!prevTimestamp ||
(timestamp - prevTimestamp) > 30*60*1000) ? 1 : 0;
},
args: ["$timestamp", { $shift: { output: "$timestamp", by: -1 } }],
lang: "js"
}
},
sessionId: {
$sum: {
$function: {
body: function(timestamp, prevTimestamp) {
return (!prevTimestamp ||
(timestamp - prevTimestamp) > 30*60*1000) ? 1 : 0;
},
args: ["$timestamp", { $shift: { output: "$timestamp", by: -1 } }],
lang: "js"
}
},
window: { documents: [ "unbounded", "current" ] }
}
}
}
}
])
性能优化建议
- 合理使用分区:分区字段应该有适当的基数,避免创建过多小分区或单个超大分区
- 利用索引:确保
partitionBy
和sortBy
字段有索引支持 - 限制窗口大小:避免使用无界窗口(
unbounded
)除非必要 - 管道顺序:尽量在早期阶段使用
$match
和$project
减少数据处理量
// 优化后的查询示例
db.large_collection.aggregate([
{ $match: { status: "active" } }, // 先过滤
{ $project: { _id: 1, value: 1, category: 1 } }, // 减少字段
{
$setWindowFields: {
partitionBy: "$category",
sortBy: { value: -1 },
output: {
rank: { $denseRank: {} }
}
}
}
])
与其他聚合阶段结合
窗口函数可以与其他聚合阶段灵活组合:
// 先分组聚合,再计算移动平均
db.sales.aggregate([
{
$group: {
_id: { product: "$product", month: { $month: "$date" } },
totalSales: { $sum: "$amount" }
}
},
{
$setWindowFields: {
partitionBy: "$_id.product",
sortBy: { "_id.month": 1 },
output: {
threeMonthAvg: {
$avg: "$totalSales",
window: { documents: [ -2, 0 ] } // 当前月及前两个月
}
}
}
}
])
实际业务场景
电商分析
// 计算用户购买频次排名
db.orders.aggregate([
{
$group: {
_id: "$customer_id",
orderCount: { $sum: 1 },
totalSpent: { $sum: "$amount" }
}
},
{
$setWindowFields: {
sortBy: { orderCount: -1 },
output: {
frequencyRank: { $rank: {} },
top10Pct: {
$percentRank: {},
window: { documents: [ "unbounded", "unbounded" ] }
}
}
}
}
])
金融时间序列
// 计算股票布林带指标
db.stock_prices.aggregate([
{
$setWindowFields: {
partitionBy: "$symbol",
sortBy: { date: 1 },
output: {
ma20: {
$avg: "$close",
window: { documents: [ -19, 0 ] }
},
std20: {
$stdDevPop: "$close",
window: { documents: [ -19, 0 ] }
},
upperBand: {
$add: [
{ $avg: "$close", window: { documents: [ -19, 0 ] } },
{ $multiply: [
2,
{ $stdDevPop: "$close", window: { documents: [ -19, 0 ] } }
]}
]
}
}
}
}
])
限制与注意事项
- 窗口函数不能用于
$facet
阶段 - 内存限制:大窗口可能需要大量内存
- 分片集合:分区字段通常应与分片键一致以获得最佳性能
- 结果大小:输出文档不能超过16MB限制
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn