Skip to content

Commit

Permalink
Merge pull request #14831 from Automattic/8.6
Browse files Browse the repository at this point in the history
8.6
  • Loading branch information
vkarpov15 authored Aug 28, 2024
2 parents fb0febb + 0a93334 commit 473a636
Show file tree
Hide file tree
Showing 22 changed files with 309 additions and 118 deletions.
52 changes: 31 additions & 21 deletions lib/cursor/changeStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ class ChangeStream extends EventEmitter {
);
}

// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return;
}
this.$driverChangeStreamPromise = new Promise((resolve, reject) => {
// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return reject(err);
}

this.driverChangeStream = driverChangeStream;
this.emit('ready');
this.driverChangeStream = driverChangeStream;
this.emit('ready');
resolve();
});
});
}

Expand All @@ -53,20 +56,23 @@ class ChangeStream extends EventEmitter {
this.bindedEvents = true;

if (this.driverChangeStream == null) {
this.once('ready', () => {
this.driverChangeStream.on('close', () => {
this.closed = true;
});
this.$driverChangeStreamPromise.then(
() => {
this.driverChangeStream.on('close', () => {
this.closed = true;
});

driverChangeStreamEvents.forEach(ev => {
this.driverChangeStream.on(ev, data => {
if (data != null && data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
driverChangeStreamEvents.forEach(ev => {
this.driverChangeStream.on(ev, data => {
if (data != null && data.fullDocument != null && this.options && this.options.hydrate) {
data.fullDocument = this.options.model.hydrate(data.fullDocument);
}
this.emit(ev, data);
});
});
});
});
},
() => {} // No need to register events if opening change stream failed
);

return;
}
Expand Down Expand Up @@ -142,8 +148,12 @@ class ChangeStream extends EventEmitter {
this.closed = true;
if (this.driverChangeStream) {
return this.driverChangeStream.close();
} else {
return this.$driverChangeStreamPromise.then(
() => this.driverChangeStream.close(),
() => {} // No need to close if opening the change stream failed
);
}
return Promise.resolve();
}
}

Expand Down
25 changes: 25 additions & 0 deletions lib/cursor/queryCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const eachAsync = require('../helpers/cursor/eachAsync');
const helpers = require('../queryHelpers');
const kareem = require('kareem');
const immediate = require('../helpers/immediate');
const { once } = require('node:events');
const util = require('util');

/**
Expand Down Expand Up @@ -42,6 +43,7 @@ function QueryCursor(query) {
this.cursor = null;
this.skipped = false;
this.query = query;
this._closed = false;
const model = query.model;
this._mongooseOptions = {};
this._transforms = [];
Expand Down Expand Up @@ -135,6 +137,25 @@ QueryCursor.prototype._read = function() {
});
};

/**
* Returns the underlying cursor from the MongoDB Node driver that this cursor uses.
*
* @method getDriverCursor
* @memberOf QueryCursor
* @returns {Cursor} MongoDB Node driver cursor instance
* @instance
* @api public
*/

QueryCursor.prototype.getDriverCursor = async function getDriverCursor() {
if (this.cursor) {
return this.cursor;
}

await once(this, 'cursor');
return this.cursor;
};

/**
* Registers a transform function which subsequently maps documents retrieved
* via the streams interface or `.next()`
Expand Down Expand Up @@ -209,6 +230,7 @@ QueryCursor.prototype.close = async function close() {
}
try {
await this.cursor.close();
this._closed = true;
this.emit('close');
} catch (error) {
this.listeners('error').length > 0 && this.emit('error', error);
Expand Down Expand Up @@ -246,6 +268,9 @@ QueryCursor.prototype.next = async function next() {
if (typeof arguments[0] === 'function') {
throw new MongooseError('QueryCursor.prototype.next() no longer accepts a callback');
}
if (this._closed) {
throw new MongooseError('Cannot call `next()` on a closed cursor');
}
return new Promise((resolve, reject) => {
_next(this, function(error, doc) {
if (error) {
Expand Down
8 changes: 6 additions & 2 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -2115,17 +2115,21 @@ Model.countDocuments = function countDocuments(conditions, options) {
*
* @param {String} field
* @param {Object} [conditions] optional
* @param {Object} [options] optional
* @return {Query}
* @api public
*/

Model.distinct = function distinct(field, conditions) {
Model.distinct = function distinct(field, conditions, options) {
_checkContext(this, 'distinct');
if (typeof arguments[0] === 'function' || typeof arguments[1] === 'function') {
if (typeof arguments[0] === 'function' || typeof arguments[1] === 'function' || typeof arguments[2] === 'function') {
throw new MongooseError('Model.distinct() no longer accepts a callback');
}

const mq = new this.Query({}, {}, this, this.$__collection);
if (options != null) {
mq.setOptions(options);
}

return mq.distinct(field, conditions);
};
Expand Down
15 changes: 11 additions & 4 deletions lib/query.js
Original file line number Diff line number Diff line change
Expand Up @@ -2777,7 +2777,7 @@ Query.prototype.estimatedDocumentCount = function(options) {
this.op = 'estimatedDocumentCount';
this._validateOp();

if (typeof options === 'object' && options != null) {
if (options != null) {
this.setOptions(options);
}

Expand Down Expand Up @@ -2836,7 +2836,7 @@ Query.prototype.countDocuments = function(conditions, options) {
this.merge(conditions);
}

if (typeof options === 'object' && options != null) {
if (options != null) {
this.setOptions(options);
}

Expand Down Expand Up @@ -2874,21 +2874,24 @@ Query.prototype.__distinct = async function __distinct() {
*
* #### Example:
*
* distinct(field, conditions, options)
* distinct(field, conditions)
* distinct(field)
* distinct()
*
* @param {String} [field]
* @param {Object|Query} [filter]
* @param {Object} [options]
* @return {Query} this
* @see distinct https://www.mongodb.com/docs/manual/reference/method/db.collection.distinct/
* @api public
*/

Query.prototype.distinct = function(field, conditions) {
Query.prototype.distinct = function(field, conditions, options) {
if (typeof field === 'function' ||
typeof conditions === 'function' ||
typeof arguments[2] === 'function') {
typeof options === 'function' ||
typeof arguments[3] === 'function') {
throw new MongooseError('Query.prototype.distinct() no longer accepts a callback');
}

Expand All @@ -2907,6 +2910,10 @@ Query.prototype.distinct = function(field, conditions) {
this._distinct = field;
}

if (options != null) {
this.setOptions(options);
}

return this;
};

Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"dependencies": {
"bson": "^6.7.0",
"kareem": "2.6.3",
"mongodb": "6.7.0",
"mongodb": "6.8.0",
"mpath": "0.9.0",
"mquery": "5.0.0",
"ms": "2.1.3",
Expand Down
2 changes: 2 additions & 0 deletions test/connection.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,8 @@ describe('connections:', function() {
await nextChange;
assert.equal(changes.length, 1);
assert.equal(changes[0].operationType, 'insert');

await changeStream.close();
await conn.close();
});

Expand Down
19 changes: 19 additions & 0 deletions test/docs/transactions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,25 @@ describe('transactions', function() {
assert.deepEqual(fromDb, { name: 'Tyrion Lannister' });
});

it('distinct (gh-8006)', async function() {
const Character = db.model('gh8006_Character', new Schema({ name: String, rank: String }, { versionKey: false }));

const session = await db.startSession();

session.startTransaction();
await Character.create([{ name: 'Will Riker', rank: 'Commander' }, { name: 'Jean-Luc Picard', rank: 'Captain' }], { session });

let names = await Character.distinct('name', {}, { session });
assert.deepStrictEqual(names.sort(), ['Jean-Luc Picard', 'Will Riker']);

names = await Character.distinct('name', { rank: 'Captain' }, { session });
assert.deepStrictEqual(names.sort(), ['Jean-Luc Picard']);

// Undo both update and delete since doc should pull from `$session()`
await session.abortTransaction();
session.endSession();
});

it('save() with no changes (gh-8571)', async function() {
db.deleteModel(/Test/);
const Test = db.model('Test', Schema({ name: String }));
Expand Down
48 changes: 41 additions & 7 deletions test/model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const sinon = require('sinon');
const start = require('./common');

const assert = require('assert');
const { once } = require('events');
const random = require('./util').random;
const util = require('./util');

Expand Down Expand Up @@ -3508,6 +3509,9 @@ describe('Model', function() {
}
changeStream.removeListener('change', listener);
listener = null;
// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
changeStream.close();
changeStream = null;
});
Expand Down Expand Up @@ -3560,14 +3564,21 @@ describe('Model', function() {
it('fullDocument (gh-11936)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const doc = await MyModel.create({ name: 'Ned Stark' });
const changeStream = await MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
});
await changeStream.$driverChangeStreamPromise;

const doc = await MyModel.create({ name: 'Ned Stark' });

const p = changeStream.next();
const p = new Promise((resolve) => {
changeStream.once('change', change => {
resolve(change);
});
});
// Need to wait for resume token to be set after the event listener,
// otherwise change stream might not pick up the update.
await once(changeStream.driverChangeStream, 'resumeTokenChanged');
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
Expand All @@ -3576,22 +3587,31 @@ describe('Model', function() {
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');

await changeStream.close();
});

it('fullDocument with immediate watcher and hydrate (gh-14049)', async function() {
const MyModel = db.model('Test', new Schema({ name: String }));

const doc = await MyModel.create({ name: 'Ned Stark' });

let changeStream = null;
const p = new Promise((resolve) => {
MyModel.watch([], {
changeStream = MyModel.watch([], {
fullDocument: 'updateLookup',
hydrate: true
}).on('change', change => {
});

changeStream.on('change', change => {
resolve(change);
});
});

// Need to wait for cursor to be initialized and for resume token to
// be set, otherwise change stream might not pick up the update.
await changeStream.$driverChangeStreamPromise;
await once(changeStream.driverChangeStream, 'resumeTokenChanged');
await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' });

const changeData = await p;
Expand All @@ -3600,6 +3620,8 @@ describe('Model', function() {
doc._id.toHexString());
assert.ok(changeData.fullDocument.$__);
assert.equal(changeData.fullDocument.get('name'), 'Tony Stark');

await changeStream.close();
});

it('respects discriminators (gh-11007)', async function() {
Expand Down Expand Up @@ -3639,6 +3661,9 @@ describe('Model', function() {
assert.equal(changeData.operationType, 'insert');
assert.equal(changeData.fullDocument.name, 'Ned Stark');

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});
await changeStream.close();
await db.close();
});
Expand All @@ -3654,11 +3679,16 @@ describe('Model', function() {
setTimeout(resolve, 500, false);
});

changeStream.close();
await db;
// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

const close = changeStream.close();
await db.asPromise();
const readyCalled = await ready;
assert.strictEqual(readyCalled, false);

await close;
await db.close();
});

Expand All @@ -3675,6 +3705,10 @@ describe('Model', function() {

await MyModel.create({ name: 'Hodor' });

// Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream
// may still poll after close.
changeStream.on('error', () => {});

changeStream.close();
const closedData = await closed;
assert.strictEqual(closedData, true);
Expand Down
Loading

0 comments on commit 473a636

Please sign in to comment.