Skip to content

Commit

Permalink
Merge pull request #13218 from Automattic/vkarpov15/gh-13176
Browse files Browse the repository at this point in the history
fix(model): execute valid write operations if calling `bulkWrite()` with ordered: false
  • Loading branch information
vkarpov15 authored Apr 6, 2023
2 parents 90527df + 36df3a0 commit 25c7535
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 20 deletions.
81 changes: 64 additions & 17 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -3736,33 +3736,80 @@ Model.bulkWrite = function(ops, options, callback) {
options = null;
}
options = options || {};
const ordered = options.ordered == null ? true : options.ordered;

const validations = ops.map(op => castBulkWrite(this, op, options));

callback = this.$handleCallbackError(callback);
return this.db.base._promiseOrCallback(callback, cb => {
cb = this.$wrapCallback(cb);
each(validations, (fn, cb) => fn(cb), error => {
if (error) {
return cb(error);
}
if (ordered) {
each(validations, (fn, cb) => fn(cb), error => {
if (error) {
return cb(error);
}

if (ops.length === 0) {
return cb(null, getDefaultBulkwriteResult());
}
if (ops.length === 0) {
return cb(null, getDefaultBulkwriteResult());
}

try {
this.$__collection.bulkWrite(ops, options, (error, res) => {
if (error) {
return cb(error);
try {
this.$__collection.bulkWrite(ops, options, (error, res) => {
if (error) {
return cb(error);
}

cb(null, res);
});
} catch (err) {
return cb(err);
}
});

return;
}

let remaining = validations.length;
let validOps = [];
let validationErrors = [];
for (let i = 0; i < validations.length; ++i) {
validations[i]((err) => {
if (err == null) {
validOps.push(i);
} else {
validationErrors.push({ index: i, error: err });
}
if (--remaining <= 0) {
completeUnorderedValidation.call(this);
}
});
}

validationErrors = validationErrors.
sort((v1, v2) => v1.index - v2.index).
map(v => v.error);

function completeUnorderedValidation() {
validOps = validOps.sort().map(index => ops[index]);

this.$__collection.bulkWrite(validOps, options, (error, res) => {
if (error) {
if (validationErrors.length > 0) {
error.mongoose = error.mongoose || {};
error.mongoose.validationErrors = validationErrors;
}

cb(null, res);
});
} catch (err) {
return cb(err);
}
});
return cb(error);
}

if (validationErrors.length > 0) {
res.mongoose = res.mongoose || {};
res.mongoose.validationErrors = validationErrors;
}

cb(null, res);
});
}
}, this.events);
};

Expand Down
63 changes: 63 additions & 0 deletions test/model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6010,6 +6010,69 @@ describe('Model', function() {
}
}]);
});

it('sends valid ops if ordered = false (gh-13176)', async function() {
const testSchema = new mongoose.Schema({
num: Number
});
const Test = db.model('Test', testSchema);

const res = await Test.bulkWrite([
{
updateOne: {
filter: {},
update: { $set: { num: 'not a number' } },
upsert: true
}
},
{
updateOne: {
filter: {},
update: { $set: { num: 42 } }
}
}
], { ordered: false });
assert.ok(res.mongoose);
assert.equal(res.mongoose.validationErrors.length, 1);
assert.strictEqual(res.result.nUpserted, 0);
});

it('decorates write error with validation errors if unordered fails (gh-13176)', async function() {
const testSchema = new mongoose.Schema({
num: Number
});
const Test = db.model('Test', testSchema);

await Test.deleteMany({});
const { _id } = await Test.create({ num: 42 });

const err = await Test.bulkWrite([
{
updateOne: {
filter: {},
update: { $set: { num: 'not a number' } },
upsert: true
}
},
{
updateOne: {
filter: {},
update: { $push: { num: 42 } }
}
},
{
updateOne: {
filter: {},
update: { $inc: { num: 57 } }
}
}
], { ordered: false }).then(() => null, err => err);
assert.ok(err);
assert.equal(err.mongoose.validationErrors.length, 1);

const { num } = await Test.findById(_id);
assert.equal(num, 99);
});
});

it('insertMany with Decimal (gh-5190)', async function() {
Expand Down
11 changes: 8 additions & 3 deletions types/models.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,14 @@ declare module 'mongoose' {
* if you use `create()`) because with `bulkWrite()` there is only one network
* round trip to the MongoDB server.
*/
bulkWrite(writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>, options: mongodb.BulkWriteOptions & MongooseBulkWriteOptions, callback: Callback<mongodb.BulkWriteResult>): void;
bulkWrite(writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>, callback: Callback<mongodb.BulkWriteResult>): void;
bulkWrite(writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>, options?: mongodb.BulkWriteOptions & MongooseBulkWriteOptions): Promise<mongodb.BulkWriteResult>;
bulkWrite(
writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>,
options: mongodb.BulkWriteOptions & MongooseBulkWriteOptions & { ordered: false }
): Promise<mongodb.BulkWriteResult & { mongoose?: { validationErrors: Error[] } }>;
bulkWrite(
writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>,
options?: mongodb.BulkWriteOptions & MongooseBulkWriteOptions
): Promise<mongodb.BulkWriteResult>;

/**
* Sends multiple `save()` calls in a single `bulkWrite()`. This is faster than
Expand Down

0 comments on commit 25c7535

Please sign in to comment.