Oplog备份与恢复
Oplog简介
MongoDB的Oplog(操作日志)是一个特殊的固定集合,用于记录所有修改数据库状态的操作。它作为复制集的核心组件,实现了数据的同步和故障恢复。Oplog采用幂等性原则设计,每条记录代表一个最小粒度的数据变更操作。
Oplog工作原理
Oplog存储在local数据库的oplog.rs集合中,其文档结构包含以下关键字段:
{
ts: Timestamp(1630000000, 1), // 操作时间戳
h: NumberLong("123456789"), // 操作唯一标识
v: 2, // 版本号
op: "i", // 操作类型(i=插入,u=更新,d=删除)
ns: "test.users", // 命名空间
o: { _id: 1, name: "Alice" }, // 操作文档
o2: { _id: 1 } // 更新操作的条件文档
}
典型操作类型包括:
i
:插入文档u
:更新文档d
:删除文档c
:执行命令n
:空操作(心跳检测)
Oplog备份策略
物理备份方案
使用mongodump进行热备份:
mongodump --host rs0/localhost:27017,localhost:27018 \
--db local --collection oplog.rs \
--out /backup/oplog_$(date +%Y%m%d)
结合LVM快照实现原子备份:
lvcreate --size 1G --snapshot --name mongo-snap /dev/vg0/mongo
mkdir /backup/snapshot
mount /dev/vg0/mongo-snap /backup/snapshot
mongodump --db local --collection oplog.rs --out /backup/snapshot
逻辑备份方案
创建增量备份脚本:
const { MongoClient } = require('mongodb');
async function backupOplog(lastTimestamp) {
const client = await MongoClient.connect('mongodb://rs0/localhost:27017');
const oplog = client.db('local').collection('oplog.rs');
const query = lastTimestamp ? { ts: { $gt: lastTimestamp } } : {};
const cursor = oplog.find(query).sort({ $natural: 1 });
const backupFile = `./oplog_backup_${Date.now()}.bson`;
const fs = require('fs');
const stream = fs.createWriteStream(backupFile);
while (await cursor.hasNext()) {
const doc = await cursor.next();
stream.write(bson.serialize(doc));
}
stream.end();
client.close();
}
Oplog恢复技术
全量恢复流程
- 停止所有secondary节点
- 在主节点执行冻结操作:
db.fsyncLock();
- 使用bsondump恢复数据:
mongorestore --host rs0/localhost:27017 \
--db local --collection oplog.rs \
/backup/oplog_backup.bson
- 解除冻结:
db.fsyncUnlock();
时间点恢复
通过replayOplog实现精确恢复:
const oplog = db.getSiblingDB('local').oplog.rs;
const targetTime = new Timestamp(1630000000, 0);
oplog.find({ ts: { $lte: targetTime } }).forEach(doc => {
switch(doc.op) {
case 'i':
db.getCollection(doc.ns).insert(doc.o);
break;
case 'u':
db.getCollection(doc.ns).update(doc.o2, doc.o);
break;
case 'd':
db.getCollection(doc.ns).remove(doc.o);
break;
}
});
生产环境实践
容量规划建议
Oplog大小计算公式:
所需Oplog大小 = (写入速率 × 最长故障恢复时间) × 安全系数(1.5-2)
示例配置:
// 设置5GB的Oplog大小
db.adminCommand({
replSetResizeOplog: 1,
size: 5 * 1024
});
监控与告警
关键监控指标:
// 获取Oplog状态
const oplogStatus = db.getReplicationInfo();
printjson({
"usedSize": oplogStatus.usedMB + "MB",
"timeDiff": oplogStatus.timeDiffHours + "hours",
"firstEvent": oplogStatus.firstEvent,
"lastEvent": oplogStatus.lastEvent
});
// 设置告警阈值
const threshold = 24; // 小时
if (oplogStatus.timeDiffHours < threshold) {
print("警告:Oplog窗口不足" + threshold + "小时");
}
高级应用场景
跨集群同步
使用Change Stream实现实时同步:
const pipeline = [{ $match: { operationType: { $in: ["insert", "update", "delete"] } }];
const changeStream = db.collection('users').watch(pipeline);
changeStream.on('change', (change) => {
const targetClient = new MongoClient('mongodb://target-cluster');
targetClient.connect().then(client => {
const coll = client.db('prod').collection('users');
switch(change.operationType) {
case 'insert':
return coll.insertOne(change.fullDocument);
case 'update':
return coll.updateOne(
{ _id: change.documentKey._id },
{ $set: change.updateDescription.updatedFields }
);
case 'delete':
return coll.deleteOne({ _id: change.documentKey._id });
}
});
});
数据审计实现
构建操作审计系统:
// 创建审计集合
db.createCollection("auditLog", {
capped: true,
size: 100000000,
max: 100000
});
// Oplog监听处理器
const processOplog = async (oplogDoc) => {
if (!['i', 'u', 'd'].includes(oplogDoc.op)) return;
const auditEntry = {
timestamp: new Date(),
operation: oplogDoc.op,
namespace: oplogDoc.ns,
operator: db.currentOp().clientMetadata?.user,
details: {
documentKey: oplogDoc.o._id || oplogDoc.o2._id,
changes: oplogDoc.op === 'u' ? oplogDoc.o : null
}
};
await db.auditLog.insertOne(auditEntry);
};
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn
上一篇:增量备份与时间点恢复
下一篇:分片集群的备份与恢复