阿里云主机折上折
  • 微信号
您当前的位置:网站首页 > 数据变更监听(Change Streams)

数据变更监听(Change Streams)

作者:陈川 阅读数:41968人阅读 分类: MongoDB

数据变更监听(Change Streams)

Mongoose的Change Streams功能允许开发者监听MongoDB集合中文档的变更事件。当文档被插入、更新、删除或替换时,Change Streams会实时推送这些变更,非常适合需要实时同步数据的场景。

Change Streams的基本用法

在Mongoose中,可以通过watch()方法开启Change Streams监听。以下是一个基本示例:

const mongoose = require('mongoose');
const Schema = mongoose.Schema;

const userSchema = new Schema({
  name: String,
  age: Number,
  email: String
});

const User = mongoose.model('User', userSchema);

// 连接到MongoDB
mongoose.connect('mongodb://localhost:27017/test');

// 监听User集合的变更
const changeStream = User.watch();

changeStream.on('change', (change) => {
  console.log('检测到变更:', change);
});

当对User集合进行任何修改时,控制台都会输出变更信息。例如插入一个新文档:

User.create({ name: '张三', age: 25, email: 'zhangsan@example.com' });

这会触发change事件,输出类似以下内容:

{
  _id: { _data: '8262...' },
  operationType: 'insert',
  fullDocument: {
    _id: 5f8b...,
    name: '张三',
    age: 25,
    email: 'zhangsan@example.com',
    __v: 0
  },
  ns: { db: 'test', coll: 'users' },
  documentKey: { _id: 5f8b... }
}

变更事件类型

Change Streams可以监听多种类型的变更事件,主要包括:

  1. insert:文档被插入时触发
  2. update:文档被更新时触发
  3. delete:文档被删除时触发
  4. replace:文档被替换时触发
  5. invalidate:当变更流因某些原因无效时触发

可以通过检查operationType属性来判断变更类型:

changeStream.on('change', (change) => {
  switch(change.operationType) {
    case 'insert':
      console.log('新文档插入:', change.fullDocument);
      break;
    case 'update':
      console.log('文档更新:', change.documentKey, change.updateDescription);
      break;
    case 'delete':
      console.log('文档删除:', change.documentKey);
      break;
    case 'replace':
      console.log('文档替换:', change.fullDocument);
      break;
  }
});

过滤特定变更

有时我们只关心特定条件的变更,可以通过传递管道(pipeline)参数来过滤:

const pipeline = [
  { $match: { 'fullDocument.age': { $gte: 18 } } },
  { $match: { operationType: 'insert' } }
];

const changeStream = User.watch(pipeline);

这样只有当插入的文档中age字段大于等于18时才会触发变更事件。

变更详情

对于update操作,变更对象包含updateDescription字段,详细说明了哪些字段被修改:

User.updateOne({ name: '张三' }, { $set: { age: 26 } });

对应的变更事件会包含:

{
  "updateDescription": {
    "updatedFields": {
      "age": 26
    },
    "removedFields": []
  }
}

恢复变更流

Change Streams支持从特定时间点恢复监听,这在应用重启后非常有用:

// 保存上一次的变更令牌
let resumeToken;

changeStream.on('change', (change) => {
  resumeToken = change._id;
});

// 应用重启后
const newChangeStream = User.watch([], { resumeAfter: resumeToken });

高级配置选项

watch()方法接受多个配置选项:

const options = {
  fullDocument: 'updateLookup', // 对于update操作也返回完整文档
  batchSize: 100,             // 每批处理的最大文档数
  maxAwaitTimeMS: 5000        // 等待新变更的最长时间
};

const changeStream = User.watch([], options);

实际应用场景

  1. 实时通知系统:当用户收到新消息时实时推送
  2. 数据同步:保持不同系统间的数据一致性
  3. 审计日志:记录所有数据变更用于审计
  4. 缓存失效:当数据变更时使相关缓存失效

性能考虑

虽然Change Streams非常强大,但也需要注意:

  1. 会对数据库产生额外负载
  2. 需要合理处理高频率变更
  3. 考虑网络延迟和重连机制
  4. 可能需要实现背压机制防止消费者过载

错误处理

必须妥善处理错误事件:

changeStream.on('error', (err) => {
  console.error('变更流错误:', err);
  // 实现重连逻辑
});

与WebSocket集成示例

以下是将Change Streams与WebSocket结合的示例:

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws) => {
  const changeStream = User.watch();
  
  changeStream.on('change', (change) => {
    ws.send(JSON.stringify(change));
  });
  
  ws.on('close', () => {
    changeStream.close();
  });
});

限制与注意事项

  1. 需要MongoDB副本集或分片集群
  2. 需要适当的权限配置
  3. 长时间运行的变更流可能消耗大量内存
  4. 某些操作如集合删除不会触发变更事件

本站部分内容来自互联网,一切版权均归源网站或源作者所有。

如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn

前端川

前端川,陈川的代码茶馆🍵,专治各种不服的Bug退散符💻,日常贩卖秃头警告级的开发心得🛠️,附赠一行代码笑十年的摸鱼宝典🐟,偶尔掉落咖啡杯里泡开的像素级浪漫☕。‌