批量操作(bulkWrite)
批量操作(bulkWrite)
MongoDB的bulkWrite
方法提供了一种高效执行多个写操作的方式,通过单次网络请求完成插入、更新、删除等操作组合。相比逐条执行,它能显著减少网络开销和服务器负载,特别适合需要原子性处理大批量数据的场景。
核心概念与语法
bulkWrite
接受一个由写操作对象组成的数组,每个对象代表一个独立操作。基本语法结构如下:
db.collection.bulkWrite(
[ <operation 1>, <operation 2>, ... ],
{
writeConcern: <document>,
ordered: <boolean>
}
)
其中关键参数:
operations
:包含多个写操作的数组ordered
:默认为true,表示顺序执行操作(遇到错误即停止);false时继续执行后续操作writeConcern
:可选,设置写操作确认级别
支持的操作类型
插入文档(insertOne)
{
insertOne: {
"document": { name: "产品A", price: 99 }
}
}
更新文档(updateOne/updateMany)
单条更新示例:
{
updateOne: {
"filter": { _id: ObjectId("5f8792b...") },
"update": { $set: { stock: 45 } },
"upsert": true // 可选,不存在时插入
}
}
批量更新示例:
{
updateMany: {
"filter": { category: "电子" },
"update": { $inc: { viewCount: 1 } }
}
}
替换文档(replaceOne)
{
replaceOne: {
"filter": { sku: "XJ-208" },
"replacement": {
sku: "XJ-208",
name: "新版控制器",
inventory: 120
}
}
}
删除文档(deleteOne/deleteMany)
// 删除单条
{
deleteOne: { "filter": { status: "过期" } }
}
// 批量删除
{
deleteMany: { "filter": { createDate: { $lt: ISODate("2023-01-01") } } }
}
实际应用示例
电商库存管理场景
const operations = [
// 插入新产品
{
insertOne: {
document: {
sku: "P-1002",
name: "无线耳机",
stock: 200,
price: 299
}
}
},
// 更新现有产品库存
{
updateOne: {
filter: { sku: "P-1001" },
update: { $inc: { stock: 50 } }
}
},
// 下架缺货商品
{
updateMany: {
filter: { stock: { $lte: 0 } },
update: { $set: { status: "缺货" } }
}
},
// 删除过期促销
{
deleteMany: {
filter: {
promoEnd: { $lt: new Date() }
}
}
}
];
db.products.bulkWrite(operations, { ordered: false });
用户数据迁移案例
const userUpdates = [
// 迁移旧系统ID
{
updateMany: {
filter: { legacyId: { $exists: true } },
update: {
$set: {
"meta.legacyId": "$legacyId",
migrated: true
},
$unset: { legacyId: "" }
}
}
},
// 标准化电话号码格式
{
updateMany: {
filter: { phone: /^(\d{3})(\d{4})(\d{4})$/ },
update: {
$set: {
phone: {
$concat: [
"$phone.substr(0,3)", "-",
"$phone.substr(3,4)", "-",
"$phone.substr(7,4)"
]
}
}
}
}
}
];
db.users.bulkWrite(userUpdates);
性能优化策略
批处理大小控制
// 分批次处理10万条数据
const batchSize = 1000;
for (let i = 0; i < 100000; i += batchSize) {
const batchOps = generateOperations(i, batchSize);
db.collection.bulkWrite(batchOps, { ordered: false });
}
function generateOperations(offset, limit) {
// 生成具体操作逻辑
return [...];
}
索引优化建议
- 确保
filter
条件字段已建立索引 - 对于更新操作,包含
_id
的查询效率最高 - 避免全集合扫描的
updateMany
/deleteMany
写关注级别调整
// 降低写确认要求提升吞吐量
db.collection.bulkWrite(ops, {
writeConcern: { w: 1, j: false }
});
错误处理模式
有序操作错误捕获
try {
const result = db.orders.bulkWrite([
{ insertOne: { ... } },
{ updateOne: { ... } } // 假设此处失败
], { ordered: true });
} catch (e) {
console.error("失败操作位置:", e.result.nInserted + 1);
console.error("错误详情:", e.writeErrors[0].errmsg);
}
无序操作结果分析
const result = db.logs.bulkWrite([...], { ordered: false });
if (result.hasWriteErrors()) {
result.getWriteErrors().forEach(err => {
console.log(`操作${err.index}失败:`, err.errmsg);
});
}
console.log("成功插入:", result.nInserted);
console.log("成功更新:", result.nModified);
特殊场景处理
混合操作原子性
// 转账事务示例
const transferOps = [
{
updateOne: {
filter: { _id: acc1Id, balance: { $gte: 100 } },
update: { $inc: { balance: -100 } }
}
},
{
updateOne: {
filter: { _id: acc2Id },
update: { $inc: { balance: 100 } }
}
}
];
// 在事务会话中执行
const session = db.getMongo().startSession();
session.withTransaction(() => {
db.accounts.bulkWrite(transferOps, { session });
});
数组批量更新技巧
// 为所有匹配文档添加数组元素
{
updateMany: {
filter: { department: "研发" },
update: {
$push: {
projects: {
$each: ["新网关系统", "数据平台"],
$position: 0
}
}
}
}
}
与替代方案对比
与insertMany比较
// insertMany只能处理插入
db.collection.insertMany([doc1, doc2, doc3])
// bulkWrite可实现混合操作
db.collection.bulkWrite([
{ insertOne: { document: doc1 } },
{ updateOne: { ... } },
{ deleteOne: { ... } }
])
与循环单次操作对比
// 低效方式(网络往返次数多)
for (const doc of docs) {
db.collection.updateOne(
{ _id: doc._id },
{ $set: { value: doc.newValue } }
);
}
// 高效方式(单次网络请求)
const ops = docs.map(doc => ({
updateOne: {
filter: { _id: doc._id },
update: { $set: { value: doc.newValue } }
}
}));
db.collection.bulkWrite(ops);
监控与调试
获取操作统计
const result = db.inventory.bulkWrite([...]);
console.log(JSON.stringify(result, null, 2));
/* 输出示例:
{
"ok": 1,
"nInserted": 3,
"nUpserted": 0,
"nMatched": 5,
"nModified": 5,
"nRemoved": 2,
"upserted": [],
"writeErrors": []
}
*/
使用explain分析
const explain = db.collection.explain().bulkWrite([...]);
console.log(explain.queryPlanner.winningPlan);
最佳实践建议
- 批量大小控制:根据文档平均大小调整,通常1000-5000操作/批次
- 错误处理:始终检查返回结果中的
writeErrors
- 重试机制:对于网络错误实现指数退避重试
- 模式设计:考虑文档结构对批量更新的影响
- 监控指标:跟踪
nModified
与nMatched
的比例检测意外更新
实际业务案例
日志归档处理
const lastMonth = new Date();
lastMonth.setMonth(lastMonth.getMonth() - 1);
const logArchiveOps = [
// 将旧日志标记为归档
{
updateMany: {
filter: {
timestamp: { $lt: lastMonth },
status: { $ne: "archived" }
},
update: { $set: { status: "archived" } }
}
},
// 复制到归档集合
{
insertMany: {
documents: db.logs.find({
timestamp: { $lt: lastMonth }
}).toArray()
}
},
// 清理原始集合
{
deleteMany: {
filter: {
timestamp: { $lt: lastMonth }
}
}
}
];
db.logs.bulkWrite(logArchiveOps);
价格全面调整
const priceAdjustments = [
// 电子产品涨价10%
{
updateMany: {
filter: { category: "electronics" },
update: {
$mul: { price: 1.1 },
$currentDate: { lastModified: true }
}
}
},
// 清仓商品降价30%
{
updateMany: {
filter: { clearance: true },
update: {
$mul: { price: 0.7 },
$set: { promoTag: "清仓特卖" }
}
}
},
// 删除已下架商品
{
deleteMany: {
filter: {
status: "discontinued",
lastSold: { $lt: new Date("2022-01-01") }
}
}
}
];
db.products.bulkWrite(priceAdjustments, { ordered: false });
本站部分内容来自互联网,一切版权均归源网站或源作者所有。
如果侵犯了你的权益请来信告知我们删除。邮箱:cc@cccx.cn