Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(model): execute valid write operations if calling bulkWrite() with ordered: false #13218

Merged
merged 3 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 65 additions & 17 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -3736,33 +3736,81 @@ Model.bulkWrite = function(ops, options, callback) {
options = null;
}
options = options || {};
const ordered = options.ordered == null ? true : options.ordered;

const validations = ops.map(op => castBulkWrite(this, op, options));

callback = this.$handleCallbackError(callback);
return this.db.base._promiseOrCallback(callback, cb => {
cb = this.$wrapCallback(cb);
each(validations, (fn, cb) => fn(cb), error => {
if (error) {
return cb(error);
}
if (ordered) {
each(validations, (fn, cb) => fn(cb), error => {
if (error) {
return cb(error);
}

if (ops.length === 0) {
return cb(null, getDefaultBulkwriteResult());
}
if (ops.length === 0) {
return cb(null, getDefaultBulkwriteResult());
}
Copy link

@doomhz doomhz Nov 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check should be moved somewhere above the if (ordered) { block. Please see #14117


try {
this.$__collection.bulkWrite(ops, options, (error, res) => {
if (error) {
return cb(error);
try {
this.$__collection.bulkWrite(ops, options, (error, res) => {
if (error) {
return cb(error);
}

cb(null, res);
});
} catch (err) {
return cb(err);
}
});

return;
}

let remaining = validations.length;
let validOps = [];
let validationErrors = [];
for (let i = 0; i < validations.length; ++i) {
validations[i]((err) => {
if (err == null) {
validOps.push(i);
} else {
validationErrors.push({ index: i, error: err });
}
if (--remaining <= 0) {
completeUnorderedValidation.call(this);
}
vkarpov15 marked this conversation as resolved.
Show resolved Hide resolved
});
}

validationErrors = validationErrors.
sort((v1, v2) => v1.index - v2.index).
map(v => v.error);

function completeUnorderedValidation() {
validOps.sort();
validOps = validOps.map(index => ops[index]);
vkarpov15 marked this conversation as resolved.
Show resolved Hide resolved

this.$__collection.bulkWrite(validOps, options, (error, res) => {
if (error) {
if (validationErrors.length > 0) {
error.mongoose = error.mongoose || {};
error.mongoose.validationErrors = validationErrors;
}

cb(null, res);
});
} catch (err) {
return cb(err);
}
});
return cb(error);
}

if (validationErrors.length > 0) {
res.mongoose = res.mongoose || {};
res.mongoose.validationErrors = validationErrors;
}

cb(null, res);
});
}
}, this.events);
};

Expand Down
63 changes: 63 additions & 0 deletions test/model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6010,6 +6010,69 @@ describe('Model', function() {
}
}]);
});

it('sends valid ops if ordered = false (gh-13176)', async function() {
const testSchema = new mongoose.Schema({
num: Number
});
const Test = db.model('Test', testSchema);

const res = await Test.bulkWrite([
{
updateOne: {
filter: {},
update: { $set: { num: 'not a number' } },
upsert: true
}
},
{
updateOne: {
filter: {},
update: { $set: { num: 42 } }
}
}
], { ordered: false });
assert.ok(res.mongoose);
assert.equal(res.mongoose.validationErrors.length, 1);
assert.strictEqual(res.result.nUpserted, 0);
});

it('decorates write error with validation errors if unordered fails (gh-13176)', async function() {
const testSchema = new mongoose.Schema({
num: Number
});
const Test = db.model('Test', testSchema);

await Test.deleteMany({});
const { _id } = await Test.create({ num: 42 });

const err = await Test.bulkWrite([
{
updateOne: {
filter: {},
update: { $set: { num: 'not a number' } },
upsert: true
}
},
{
updateOne: {
filter: {},
update: { $push: { num: 42 } }
}
},
{
updateOne: {
filter: {},
update: { $inc: { num: 57 } }
}
}
], { ordered: false }).then(() => null, err => err);
assert.ok(err);
assert.equal(err.mongoose.validationErrors.length, 1);

const { num } = await Test.findById(_id);
assert.equal(num, 99);
});
});

it('insertMany with Decimal (gh-5190)', async function() {
Expand Down
11 changes: 8 additions & 3 deletions types/models.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,14 @@ declare module 'mongoose' {
* if you use `create()`) because with `bulkWrite()` there is only one network
* round trip to the MongoDB server.
*/
bulkWrite(writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>, options: mongodb.BulkWriteOptions & MongooseBulkWriteOptions, callback: Callback<mongodb.BulkWriteResult>): void;
bulkWrite(writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>, callback: Callback<mongodb.BulkWriteResult>): void;
bulkWrite(writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>, options?: mongodb.BulkWriteOptions & MongooseBulkWriteOptions): Promise<mongodb.BulkWriteResult>;
bulkWrite(
writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>,
options: mongodb.BulkWriteOptions & MongooseBulkWriteOptions & { ordered: false }
): Promise<mongodb.BulkWriteResult & { mongoose?: { validationErrors: Error[] } }>;
bulkWrite(
writes: Array<mongodb.AnyBulkWriteOperation<T extends Document ? any : (T extends {} ? T : any)>>,
options?: mongodb.BulkWriteOptions & MongooseBulkWriteOptions
): Promise<mongodb.BulkWriteResult>;

/**
* Sends multiple `save()` calls in a single `bulkWrite()`. This is faster than
Expand Down