From 5ecf18e01c080e8c6c8c82782a675f853f3b6276 Mon Sep 17 00:00:00 2001 From: Eric Adum Date: Mon, 18 May 2020 09:31:01 -0400 Subject: [PATCH] fix(ChangeStream): should resume from errors when iterating Introduced `getCursor` method to safely provide a change stream cursor for `next`/`hasNext` across recoveries from resumable errors. NODE-2548 --- lib/change_stream.js | 205 +++++++++++++++++--------- test/functional/change_stream.test.js | 172 ++++++++++++++++----- test/functional/shared.js | 22 +++ 3 files changed, 291 insertions(+), 108 deletions(-) diff --git a/lib/change_stream.js b/lib/change_stream.js index 635d009ec2b..900da1edad1 100644 --- a/lib/change_stream.js +++ b/lib/change_stream.js @@ -1,5 +1,6 @@ 'use strict'; +const Denque = require('denque'); const EventEmitter = require('events'); const isResumableError = require('./error').isResumableError; const MongoError = require('./core').MongoError; @@ -9,6 +10,8 @@ const maxWireVersion = require('./core/utils').maxWireVersion; const maybePromise = require('./utils').maybePromise; const AggregateOperation = require('./operations/aggregate'); +const kResumeQueue = Symbol('resumeQueue'); + const CHANGE_STREAM_OPTIONS = ['resumeAfter', 'startAfter', 'startAtOperationTime', 'fullDocument']; const CURSOR_OPTIONS = ['batchSize', 'maxAwaitTimeMS', 'collation', 'readPreference'].concat( CHANGE_STREAM_OPTIONS @@ -91,6 +94,8 @@ class ChangeStream extends EventEmitter { this.options.readPreference = parent.s.readPreference; } + this[kResumeQueue] = new Denque(); + // Create contained Change Stream cursor this.cursor = createChangeStreamCursor(this, options); @@ -99,9 +104,7 @@ class ChangeStream extends EventEmitter { // Listen for any `change` listeners being added to ChangeStream this.on('newListener', eventName => { if (eventName === 'change' && this.cursor && this.listenerCount('change') === 0) { - this.cursor.on('data', change => - processNewChange({ changeStream: this, change, eventEmitter: true }) - ); + this.cursor.on('data', change => processNewChange(this, change)); } }); @@ -130,7 +133,12 @@ class ChangeStream extends EventEmitter { * @returns {Promise|void} returns Promise if no callback passed */ hasNext(callback) { - return maybePromise(this.parent, callback, cb => this.cursor.hasNext(cb)); + return maybePromise(this.parent, callback, cb => { + getCursor(this, (err, cursor) => { + if (err) return cb(err); // failed to resume, raise an error + cursor.hasNext(cb); + }); + }); } /** @@ -142,18 +150,24 @@ class ChangeStream extends EventEmitter { */ next(callback) { return maybePromise(this.parent, callback, cb => { - if (this.isClosed()) { - return cb(new MongoError('ChangeStream is closed')); - } - this.cursor.next((error, change) => { - processNewChange({ changeStream: this, error, change, callback: cb }); + getCursor(this, (err, cursor) => { + if (err) return cb(err); // failed to resume, raise an error + cursor.next((error, change) => { + if (error) { + this[kResumeQueue].push(() => this.next(cb)); + processError(this, error, cb); + return; + } + processNewChange(this, change, cb); + }); }); }); } /** - * Is the cursor closed + * Is the change stream closed * @method ChangeStream.prototype.isClosed + * @param {boolean} [checkCursor=true] also check if the underlying cursor is closed * @return {boolean} */ isClosed() { @@ -173,6 +187,8 @@ class ChangeStream extends EventEmitter { // flag the change stream as explicitly closed this.closed = true; + if (!this.cursor) return cb(); + // Tidy up the existing cursor const cursor = this.cursor; @@ -383,7 +399,7 @@ function createChangeStreamCursor(self, options) { */ if (self.listenerCount('change') > 0) { changeStreamCursor.on('data', function(change) { - processNewChange({ changeStream: self, change, eventEmitter: true }); + processNewChange(self, change); }); } @@ -415,7 +431,7 @@ function createChangeStreamCursor(self, options) { * @type {Error} */ changeStreamCursor.on('error', function(error) { - processNewChange({ changeStream: self, error, eventEmitter: true }); + processError(self, error); }); if (self.pipeDestinations) { @@ -456,73 +472,20 @@ function waitForTopologyConnected(topology, options, callback) { }, 500); // this is an arbitrary wait time to allow SDAM to transition } -// Handle new change events. This method brings together the routes from the callback, event emitter, and promise ways of using ChangeStream. -function processNewChange(args) { - const changeStream = args.changeStream; - const error = args.error; - const change = args.change; - const callback = args.callback; - const eventEmitter = args.eventEmitter || false; +function processNewChange(changeStream, change, callback) { const cursor = changeStream.cursor; - // If the cursor is null or the change stream has been closed explictly, do not process a change. - if (cursor == null || changeStream.closed) { - // We do not error in the eventEmitter case. - changeStream.closed = true; - if (eventEmitter) { - return; - } - callback(new MongoError('ChangeStream is closed')); + if (changeStream.closed) { + if (callback) callback(new MongoError('ChangeStream is closed')); return; } - const topology = changeStream.topology; - const options = changeStream.cursor.options; - const wireVersion = maxWireVersion(cursor.server); - - if (error) { - if (isResumableError(error, wireVersion) && !changeStream.attemptingResume) { - changeStream.attemptingResume = true; - - // stop listening to all events from old cursor - ['data', 'close', 'end', 'error'].forEach(event => - changeStream.cursor.removeAllListeners(event) - ); - - // close internal cursor, ignore errors - changeStream.cursor.close(); - - waitForTopologyConnected(topology, { readPreference: options.readPreference }, err => { - if (err) { - // if there's an error reconnecting, close the change stream - changeStream.closed = true; - if (eventEmitter) { - changeStream.emit('error', err); - changeStream.emit('close'); - return; - } - return callback(err); - } - - changeStream.cursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); - if (eventEmitter) return; - changeStream.next(callback); - }); - return; - } - - if (eventEmitter) return changeStream.emit('error', error); - return callback(error); - } - - changeStream.attemptingResume = false; - if (change && !change._id) { const noResumeTokenError = new Error( 'A change stream document has been received that lacks a resume token (_id).' ); - if (eventEmitter) return changeStream.emit('error', noResumeTokenError); + if (!callback) return changeStream.emit('error', noResumeTokenError); return callback(noResumeTokenError); } @@ -534,8 +497,108 @@ function processNewChange(args) { changeStream.options.startAtOperationTime = undefined; // Return the change - if (eventEmitter) return changeStream.emit('change', change); - return callback(error, change); + if (!callback) return changeStream.emit('change', change); + return callback(undefined, change); +} + +function processError(changeStream, error, callback) { + const topology = changeStream.topology; + const cursor = changeStream.cursor; + + // If the change stream has been closed explictly, do not process error. + if (changeStream.closed) { + if (callback) callback(new MongoError('ChangeStream is closed')); + return; + } + + // if the resume succeeds, continue with the new cursor + function resumeWithCursor(newCursor) { + changeStream.cursor = newCursor; + processResumeQueue(changeStream); + } + + // otherwise, raise an error and close the change stream + function unresumableError(err) { + if (!callback) { + changeStream.emit('error', err); + changeStream.emit('close'); + } + processResumeQueue(changeStream, err); + changeStream.closed = true; + } + + if (cursor && isResumableError(error, maxWireVersion(cursor.server))) { + changeStream.cursor = undefined; + + // stop listening to all events from old cursor + ['data', 'close', 'end', 'error'].forEach(event => cursor.removeAllListeners(event)); + + // close internal cursor, ignore errors + cursor.close(); + + waitForTopologyConnected(topology, { readPreference: cursor.options.readPreference }, err => { + // if the topology can't reconnect, close the stream + if (err) return unresumableError(err); + + // create a new cursor, preserving the old cursor's options + const newCursor = createChangeStreamCursor(changeStream, cursor.resumeOptions); + + // attempt to continue in emitter mode + if (!callback) return resumeWithCursor(newCursor); + + // attempt to continue in iterator mode + newCursor.hasNext(err => { + // if there's an error immediately after resuming, close the stream + if (err) return unresumableError(err); + resumeWithCursor(newCursor); + }); + }); + return; + } + + if (!callback) return changeStream.emit('error', error); + return callback(error); +} + +/** + * Safely provides a cursor across resume attempts + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {function} callback gets the cursor or error + * @param {ChangeStreamCursor} [oldCursor] when resuming from an error, carry over options from previous cursor + */ +function getCursor(changeStream, callback) { + if (changeStream.isClosed()) { + callback(new MongoError('ChangeStream is closed.')); + return; + } + + // if a cursor exists and it is open, return it + if (changeStream.cursor) { + callback(undefined, changeStream.cursor); + return; + } + + // no cursor, queue callback until topology reconnects + changeStream[kResumeQueue].push(callback); +} + +/** + * Drain the resume queue when a new has become available + * + * @param {ChangeStream} changeStream the parent ChangeStream + * @param {ChangeStreamCursor?} changeStream.cursor the new cursor + * @param {Error} [err] error getting a new cursor + */ +function processResumeQueue(changeStream, err) { + while (changeStream[kResumeQueue].length) { + const request = changeStream[kResumeQueue].pop(); + if (changeStream.isClosed() && !err) { + request(new MongoError('Change Stream is not open.')); + return; + } + request(err, changeStream.cursor); + } } /** diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index c31f8e7197f..8c34ce99735 100644 --- a/test/functional/change_stream.test.js +++ b/test/functional/change_stream.test.js @@ -5,6 +5,7 @@ const MongoError = require('../../lib/core').MongoError; var MongoNetworkError = require('../../lib/core').MongoNetworkError; var setupDatabase = require('./shared').setupDatabase; var withClient = require('./shared').withClient; +var withCursor = require('./shared').withCursor; var delay = require('./shared').delay; var co = require('co'); var mock = require('mongodb-mock-server'); @@ -14,21 +15,61 @@ const sinon = require('sinon'); chai.use(require('chai-subset')); +function withChangeStream(dbName, collectionName, callback) { + if (arguments.length === 1) { + callback = dbName; + dbName = undefined; + } else if (arguments.length === 2) { + callback = collectionName; + collectionName = dbName; + dbName = undefined; + } + dbName = dbName || 'changestream_integration_test'; + collectionName = collectionName || 'test'; + + return withClient((client, done) => { + const db = client.db(dbName); + db.createCollection(collectionName, { w: 'majority' }, (err, collection) => { + if (err) return done(err); + withCursor( + collection.watch(), + (cursor, done) => callback(collection, cursor, done), + err => collection.drop(dropErr => done(err || dropErr)) + ); + }); + }); +} + /** * Triggers a fake resumable error on a change stream * * @param {ChangeStream} changeStream - * @param {function} onCursorClosed callback when cursor closed due this error + * @param {number} [delay] optional delay before triggering error + * @param {Function} onClose callback when cursor closed due this error */ -function triggerResumableError(changeStream, onCursorClosed) { - const closeCursor = changeStream.cursor.close; - changeStream.cursor.close = callback => { - onCursorClosed(); - changeStream.cursor.close = closeCursor; - changeStream.cursor.close(callback); - }; - const fakeResumableError = new MongoNetworkError('fake error'); - changeStream.cursor.emit('error', fakeResumableError); +function triggerResumableError(changeStream, delay, onClose) { + if (arguments.length === 2) { + onClose = delay; + delay = undefined; + } + + const stub = sinon.stub(changeStream.cursor, 'close'); + stub.callsFake(function() { + stub.wrappedMethod.call(this); + stub.restore(); + onClose(); + }); + + function triggerError() { + changeStream.cursor.emit('error', new MongoNetworkError('fake error')); + } + + if (delay != null) { + setTimeout(triggerError, delay); + return; + } + + triggerError(); } /** @@ -88,7 +129,8 @@ function tryNext(changeStream, callback) { * empty batch into a returned array of events. * * @param {ChangeStream} changeStream - * @param {function} callback + * @param {Function|Array} bag + * @param {Function} [callback] */ function exhaust(changeStream, bag, callback) { if (typeof bag === 'function') { @@ -2632,32 +2674,20 @@ describe('Change Streams', function() { }); describe('tryNext', function() { - function withTemporaryCollectionOnDb(database, testFn) { - return withClient((client, done) => { - const db = client.db(database); - db.createCollection('test', { w: 'majority' }, (err, collection) => { - if (err) return done(err); - testFn(collection, () => db.dropDatabase(done)); - }); - }); - } it('should return null on single iteration of empty cursor', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withTemporaryCollectionOnDb('testTryNext', (collection, done) => { - const changeStream = collection.watch(); + test: withChangeStream((collection, changeStream, done) => { tryNext(changeStream, (err, doc) => { expect(err).to.not.exist; expect(doc).to.not.exist; - - changeStream.close(done); + done(); }); }) }); it('should iterate a change stream until first empty batch', { metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, - test: withTemporaryCollectionOnDb('testTryNext', (collection, done) => { - const changeStream = collection.watch(); + test: withChangeStream((collection, changeStream, done) => { waitForStarted(changeStream, () => { collection.insertOne({ a: 42 }, err => { expect(err).to.not.exist; @@ -2680,7 +2710,7 @@ describe('Change Streams', function() { expect(err).to.not.exist; expect(doc).to.not.exist; - changeStream.close(done); + done(); }); }); }); @@ -2694,9 +2724,8 @@ describe('Change Streams', function() { let startAfter; function recordEvent(events, e) { - if (e.commandName === 'aggregate') { - events.push({ $changeStream: e.command.pipeline[0].$changeStream }); - } + if (e.commandName !== 'aggregate') return; + events.push({ $changeStream: e.command.pipeline[0].$changeStream }); } beforeEach(function(done) { @@ -2770,7 +2799,7 @@ describe('Change Streams', function() { // - MUST include a startAfter option // - MUST NOT include a resumeAfter option // when resuming a change stream. - it('$changeStream that has not received results must include startAfter and not resumeAfter', { + it('$changeStream without results must include startAfter and not resumeAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { const events = []; @@ -2791,11 +2820,9 @@ describe('Change Streams', function() { }); waitForStarted(changeStream, () => { - triggerResumableError(changeStream, () => { - events.push('error'); - coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { - expect(err).to.not.exist; - }); + triggerResumableError(changeStream, () => events.push('error')); + coll.insertOne({ x: 2 }, { w: 'majority', j: true }, err => { + expect(err).to.not.exist; }); }); } @@ -2806,7 +2833,7 @@ describe('Change Streams', function() { // - MUST include a resumeAfter option // - MUST NOT include a startAfter option // when resuming a change stream. - it('$changeStream that has received results must include resumeAfter and not startAfter', { + it('$changeStream with results must include resumeAfter and not startAfter', { metadata: { requires: { topology: 'replicaset', mongodb: '>=4.1.1' } }, test: function(done) { let events = []; @@ -2844,3 +2871,74 @@ describe('Change Streams', function() { }); }); }); + +describe('Change Stream Resume Error Tests', function() { + it('should continue emitting change events after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + const docs = []; + changeStream.on('change', change => { + expect(change).to.exist; + docs.push(change); + if (docs.length === 2) { + expect(docs[0]).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + expect(docs[1]).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + } + }); + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, 1000, () => { + collection.insertOne({ b: 24 }, err => { + expect(err).to.not.exist; + }); + }); + }); + }); + }) + }); + + it('should continue iterating changes after a resumable error', { + metadata: { requires: { topology: 'replicaset', mongodb: '>=3.6' } }, + test: withChangeStream((collection, changeStream, done) => { + waitForStarted(changeStream, () => { + collection.insertOne({ a: 42 }, err => { + expect(err).to.not.exist; + triggerResumableError(changeStream, 250, () => { + changeStream.hasNext((err1, hasNext) => { + expect(err1).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { b: 24 } + }); + done(); + }); + }); + collection.insertOne({ b: 24 }); + }); + }); + }); + changeStream.hasNext((err, hasNext) => { + expect(err).to.not.exist; + expect(hasNext).to.be.true; + changeStream.next((err, change) => { + expect(err).to.not.exist; + expect(change).to.containSubset({ + operationType: 'insert', + fullDocument: { a: 42 } + }); + }); + }); + }) + }); +}); diff --git a/test/functional/shared.js b/test/functional/shared.js index f2c34c58c89..b372b7275de 100644 --- a/test/functional/shared.js +++ b/test/functional/shared.js @@ -189,6 +189,27 @@ function withMonitoredClient(commands, options, callback) { }; } +/** + * Safely perform a test with an arbitrary cursor. + * + * @param {Function} cursor any cursor that needs to be closed + * @param {(cursor: Object, done: Function) => void} body test body + * @param {Function} done called after cleanup + */ +function withCursor(cursor, body, done) { + let clean = false; + function cleanup(testErr) { + if (clean) return; + clean = true; + return cursor.close(closeErr => done(testErr || closeErr)); + } + try { + body(cursor, cleanup); + } catch (err) { + cleanup(err); + } +} + /** * A class for listening on specific events * @@ -265,5 +286,6 @@ module.exports = { setupDatabase, withClient, withMonitoredClient, + withCursor, EventCollector };