diff --git a/lib/connection/pool.js b/lib/connection/pool.js index 5b270465c..b6fb558a0 100644 --- a/lib/connection/pool.js +++ b/lib/connection/pool.js @@ -14,7 +14,8 @@ var inherits = require('util').inherits, opcodes = require('../wireprotocol/shared').opcodes, compress = require('../wireprotocol/compression').compress, compressorIDs = require('../wireprotocol/compression').compressorIDs, - uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands; + uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands, + resolveClusterTime = require('../topologies/shared').resolveClusterTime; var MongoCR = require('../auth/mongocr'), X509 = require('../auth/x509'), @@ -546,7 +547,12 @@ function messageHandler(self) { // Look for clusterTime, and update it if necessary if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) { - self.topology.clusterTime = message.documents[0].$clusterTime; + const $clusterTime = message.documents[0].$clusterTime; + self.topology.clusterTime = $clusterTime; + + if (workItem.session != null) { + resolveClusterTime(workItem.session, $clusterTime); + } } // Establish if we have an error @@ -1124,6 +1130,7 @@ Pool.prototype.write = function(commands, options, cb) { operation.command = typeof options.command === 'boolean' ? options.command : false; operation.fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false; operation.noResponse = typeof options.noResponse === 'boolean' ? options.noResponse : false; + operation.session = options.session || null; // Optional per operation socketTimeout operation.socketTimeout = options.socketTimeout; @@ -1142,11 +1149,18 @@ Pool.prototype.write = function(commands, options, cb) { operation.requestId = commands[commands.length - 1].requestId; if (supportsClusterTime(this.topology) && this.topology.clusterTime) { + let $clusterTime = this.topology.clusterTime; + if (operation.session && operation.session.clusterTime) { + $clusterTime = operation.session.clusterTime.clusterTime.greaterThan($clusterTime.clusterTime) + ? operation.session.clusterTime + : $clusterTime; + } + commands.forEach(command => { if (command instanceof Query) { - command.query.$clusterTime = this.topology.clusterTime; + command.query.$clusterTime = $clusterTime; } else { - command.$clusterTime = this.topology.clusterTime; + command.$clusterTime = $clusterTime; } }); } diff --git a/test/tests/unit/single/sessions_tests.js b/test/tests/unit/single/sessions_tests.js index 583a5bf86..3b9df5dca 100644 --- a/test/tests/unit/single/sessions_tests.js +++ b/test/tests/unit/single/sessions_tests.js @@ -3,7 +3,8 @@ var Server = require('../../../../lib/topologies/server'), expect = require('chai').expect, assign = require('../../../../lib/utils').assign, mock = require('../../../mock'), - genClusterTime = require('../common').genClusterTime; + genClusterTime = require('../common').genClusterTime, + ClientSession = require('../../../../lib/sessions').ClientSession; const test = {}; describe('Sessions (Single)', function() { @@ -83,6 +84,54 @@ describe('Sessions (Single)', function() { } }); + it('should track the highest `$clusterTime` seen, and store it on a session if available', { + metadata: { requires: { topology: 'single' } }, + test: function(done) { + const clusterTime = genClusterTime(Date.now()), + futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000); + + test.server.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply( + assign({}, mock.DEFAULT_ISMASTER, { + $clusterTime: clusterTime + }) + ); + } else if (doc.insert) { + request.reply({ + ok: 1, + n: [], + lastOp: new Date(), + $clusterTime: futureClusterTime + }); + } + }); + + const client = new Server(test.server.address()); + const session = new ClientSession(client); + + client.on('error', done); + client.once('connect', () => { + expect(client.clusterTime).to.exist; + expect(client.clusterTime).to.eql(clusterTime); + + client.insert('test.test', [{ created: new Date() }], { session: session }, function(err) { + expect(err).to.not.exist; + expect(client.clusterTime).to.exist; + expect(client.clusterTime).to.not.eql(clusterTime); + expect(client.clusterTime).to.eql(futureClusterTime); + expect(session.clusterTime).to.eql(futureClusterTime); + + client.destroy(); + done(); + }); + }); + + client.connect(); + } + }); + it('should send `clusterTime` on outgoing messages', { metadata: { requires: { topology: 'single' } }, test: function(done) { @@ -117,6 +166,43 @@ describe('Sessions (Single)', function() { } }); + it('should send the highest `clusterTime` between topology and session if it exists', { + metadata: { requires: { topology: 'single' } }, + test: function(done) { + const clusterTime = genClusterTime(Date.now()), + futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000); + + let sentIsMaster = false; + test.server.setMessageHandler(request => { + if (sentIsMaster) { + expect(request.document.$clusterTime).to.eql(futureClusterTime); + request.reply({ ok: 1 }); + return; + } + + sentIsMaster = true; + request.reply( + assign({}, mock.DEFAULT_ISMASTER, { + maxWireVersion: 6, + $clusterTime: clusterTime + }) + ); + }); + + const client = new Server(test.server.address()); + const session = new ClientSession(client, { initialClusterTime: futureClusterTime }); + client.on('error', done); + client.once('connect', () => { + client.command('admin.$cmd', { ping: 1 }, { session: session }, err => { + expect(err).to.not.exist; + done(); + }); + }); + + client.connect(); + } + }); + it('should default `logicalSessionTimeoutMinutes` to `null`', { metadata: { requires: { topology: 'single' } }, test: function() {