Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
refactor(cluster-time): track clusterTime on session if necessary
Browse files Browse the repository at this point in the history
NODE-1088
  • Loading branch information
mbroadst committed Oct 6, 2017
1 parent 5da75e4 commit c372537
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
22 changes: 18 additions & 4 deletions lib/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ var inherits = require('util').inherits,
opcodes = require('../wireprotocol/shared').opcodes,
compress = require('../wireprotocol/compression').compress,
compressorIDs = require('../wireprotocol/compression').compressorIDs,
uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands;
uncompressibleCommands = require('../wireprotocol/compression').uncompressibleCommands,
resolveClusterTime = require('../topologies/shared').resolveClusterTime;

var MongoCR = require('../auth/mongocr'),
X509 = require('../auth/x509'),
Expand Down Expand Up @@ -546,7 +547,12 @@ function messageHandler(self) {

// Look for clusterTime, and update it if necessary
if (message.documents[0] && message.documents[0].hasOwnProperty('$clusterTime')) {
self.topology.clusterTime = message.documents[0].$clusterTime;
const $clusterTime = message.documents[0].$clusterTime;
self.topology.clusterTime = $clusterTime;

if (workItem.session != null) {
resolveClusterTime(workItem.session, $clusterTime);
}
}

// Establish if we have an error
Expand Down Expand Up @@ -1124,6 +1130,7 @@ Pool.prototype.write = function(commands, options, cb) {
operation.command = typeof options.command === 'boolean' ? options.command : false;
operation.fullResult = typeof options.fullResult === 'boolean' ? options.fullResult : false;
operation.noResponse = typeof options.noResponse === 'boolean' ? options.noResponse : false;
operation.session = options.session || null;

// Optional per operation socketTimeout
operation.socketTimeout = options.socketTimeout;
Expand All @@ -1142,11 +1149,18 @@ Pool.prototype.write = function(commands, options, cb) {
operation.requestId = commands[commands.length - 1].requestId;

if (supportsClusterTime(this.topology) && this.topology.clusterTime) {
let $clusterTime = this.topology.clusterTime;
if (operation.session && operation.session.clusterTime) {
$clusterTime = operation.session.clusterTime.clusterTime.greaterThan($clusterTime.clusterTime)
? operation.session.clusterTime
: $clusterTime;
}

commands.forEach(command => {
if (command instanceof Query) {
command.query.$clusterTime = this.topology.clusterTime;
command.query.$clusterTime = $clusterTime;
} else {
command.$clusterTime = this.topology.clusterTime;
command.$clusterTime = $clusterTime;
}
});
}
Expand Down
88 changes: 87 additions & 1 deletion test/tests/unit/single/sessions_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ var Server = require('../../../../lib/topologies/server'),
expect = require('chai').expect,
assign = require('../../../../lib/utils').assign,
mock = require('../../../mock'),
genClusterTime = require('../common').genClusterTime;
genClusterTime = require('../common').genClusterTime,
ClientSession = require('../../../../lib/sessions').ClientSession;

const test = {};
describe('Sessions (Single)', function() {
Expand Down Expand Up @@ -83,6 +84,54 @@ describe('Sessions (Single)', function() {
}
});

it('should track the highest `$clusterTime` seen, and store it on a session if available', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
const clusterTime = genClusterTime(Date.now()),
futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000);

test.server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(
assign({}, mock.DEFAULT_ISMASTER, {
$clusterTime: clusterTime
})
);
} else if (doc.insert) {
request.reply({
ok: 1,
n: [],
lastOp: new Date(),
$clusterTime: futureClusterTime
});
}
});

const client = new Server(test.server.address());
const session = new ClientSession(client);

client.on('error', done);
client.once('connect', () => {
expect(client.clusterTime).to.exist;
expect(client.clusterTime).to.eql(clusterTime);

client.insert('test.test', [{ created: new Date() }], { session: session }, function(err) {
expect(err).to.not.exist;
expect(client.clusterTime).to.exist;
expect(client.clusterTime).to.not.eql(clusterTime);
expect(client.clusterTime).to.eql(futureClusterTime);
expect(session.clusterTime).to.eql(futureClusterTime);

client.destroy();
done();
});
});

client.connect();
}
});

it('should send `clusterTime` on outgoing messages', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
Expand Down Expand Up @@ -117,6 +166,43 @@ describe('Sessions (Single)', function() {
}
});

it('should send the highest `clusterTime` between topology and session if it exists', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
const clusterTime = genClusterTime(Date.now()),
futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000);

let sentIsMaster = false;
test.server.setMessageHandler(request => {
if (sentIsMaster) {
expect(request.document.$clusterTime).to.eql(futureClusterTime);
request.reply({ ok: 1 });
return;
}

sentIsMaster = true;
request.reply(
assign({}, mock.DEFAULT_ISMASTER, {
maxWireVersion: 6,
$clusterTime: clusterTime
})
);
});

const client = new Server(test.server.address());
const session = new ClientSession(client, { initialClusterTime: futureClusterTime });
client.on('error', done);
client.once('connect', () => {
client.command('admin.$cmd', { ping: 1 }, { session: session }, err => {
expect(err).to.not.exist;
done();
});
});

client.connect();
}
});

it('should default `logicalSessionTimeoutMinutes` to `null`', {
metadata: { requires: { topology: 'single' } },
test: function() {
Expand Down

0 comments on commit c372537

Please sign in to comment.