Skip to content

Commit

Permalink
fix(ChangeStream): handle null changes
Browse files Browse the repository at this point in the history
  • Loading branch information
emadum authored Jul 7, 2020
1 parent 99681e1 commit 306b5b3
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 41 deletions.
9 changes: 7 additions & 2 deletions lib/change_stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,8 +326,8 @@ class ChangeStreamCursor extends Cursor {

_initializeCursor(callback) {
super._initializeCursor((err, result) => {
if (err) {
callback(err);
if (err || result == null) {
callback(err, result);
return;
}

Expand Down Expand Up @@ -483,6 +483,11 @@ function waitForTopologyConnected(topology, options, callback) {
function processNewChange(changeStream, change, callback) {
const cursor = changeStream.cursor;

// a null change means the cursor has been notified, implicitly closing the change stream
if (change == null) {
changeStream.closed = true;
}

if (changeStream.closed) {
if (callback) callback(new MongoError('ChangeStream is closed'));
return;
Expand Down
69 changes: 30 additions & 39 deletions lib/core/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,50 +464,41 @@ class CoreCursor extends Readable {
}

const result = r.message;
if (result.queryFailure) {
return done(new MongoError(result.documents[0]), null);
}

// Check if we have a command cursor
if (
Array.isArray(result.documents) &&
result.documents.length === 1 &&
(!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) &&
(typeof result.documents[0].cursor !== 'string' ||
result.documents[0]['$err'] ||
result.documents[0]['errmsg'] ||
Array.isArray(result.documents[0].result))
) {
// We have an error document, return the error
if (result.documents[0]['$err'] || result.documents[0]['errmsg']) {
return done(new MongoError(result.documents[0]), null);
if (Array.isArray(result.documents) && result.documents.length === 1) {
const document = result.documents[0];

if (result.queryFailure) {
return done(new MongoError(document), null);
}

// We have a cursor document
if (result.documents[0].cursor != null && typeof result.documents[0].cursor !== 'string') {
const id = result.documents[0].cursor.id;
// If we have a namespace change set the new namespace for getmores
if (result.documents[0].cursor.ns) {
cursor.ns = result.documents[0].cursor.ns;
// Check if we have a command cursor
if (!cursor.cmd.find || (cursor.cmd.find && cursor.cmd.virtual === false)) {
// We have an error document, return the error
if (document.$err || document.errmsg) {
return done(new MongoError(document), null);
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = result.documents[0].operationTime;

// If we have a firstBatch set it
if (Array.isArray(result.documents[0].cursor.firstBatch)) {
cursor.cursorState.documents = result.documents[0].cursor.firstBatch; //.reverse();
}

// Return after processing command cursor
return done(null, result);
}

if (Array.isArray(result.documents[0].result)) {
cursor.cursorState.documents = result.documents[0].result;
cursor.cursorState.cursorId = Long.ZERO;
return done(null, result);
// We have a cursor document
if (document.cursor != null && typeof document.cursor !== 'string') {
const id = document.cursor.id;
// If we have a namespace change set the new namespace for getmores
if (document.cursor.ns) {
cursor.ns = document.cursor.ns;
}
// Promote id to long if needed
cursor.cursorState.cursorId = typeof id === 'number' ? Long.fromNumber(id) : id;
cursor.cursorState.lastCursorId = cursor.cursorState.cursorId;
cursor.cursorState.operationTime = document.operationTime;

// If we have a firstBatch set it
if (Array.isArray(document.cursor.firstBatch)) {
cursor.cursorState.documents = document.cursor.firstBatch; //.reverse();
}

// Return after processing command cursor
return done(null, result);
}
}
}

Expand Down
48 changes: 48 additions & 0 deletions test/functional/change_stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const expect = chai.expect;
const sinon = require('sinon');
const fs = require('fs');
const crypto = require('crypto');
const BSON = require('bson');
const Long = BSON.Long;

chai.use(require('chai-subset'));

Expand Down Expand Up @@ -2830,3 +2832,49 @@ describe('Change Stream Resume Error Tests', function() {
})
});
});
context('NODE-2626', function() {
let mockServer;
afterEach(() => mock.cleanup());
beforeEach(() => mock.createServer().then(server => (mockServer = server)));
it('changeStream should close if cursor id for initial aggregate is Long.ZERO', function(done) {
mockServer.setMessageHandler(req => {
const doc = req.document;
if (doc.ismaster) {
return req.reply(mock.DEFAULT_ISMASTER_36);
}
if (doc.aggregate) {
return req.reply({
ok: 1,
cursor: {
id: Long.ZERO,
firstBatch: []
}
});
}
if (doc.getMore) {
return req.reply({
ok: 1,
cursor: {
id: new Long(1407, 1407),
nextBatch: []
}
});
}
req.reply({ ok: 1 });
});
const client = this.configuration.newClient(`mongodb://${mockServer.uri()}/`, {
useUnifiedTopology: true
});
client.connect(err => {
expect(err).to.not.exist;
const collection = client.db('cs').collection('test');
const changeStream = collection.watch();
changeStream.next((err, doc) => {
expect(err).to.exist;
expect(doc).to.not.exist;
expect(err.message).to.equal('ChangeStream is closed');
changeStream.close(() => client.close(done));
});
});
});
});

0 comments on commit 306b5b3

Please sign in to comment.