Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
refactor(sessions): ensure lsid is sent on all commands
Browse files Browse the repository at this point in the history
NODE-1088
  • Loading branch information
mbroadst committed Oct 6, 2017
1 parent c372537 commit f15bab6
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 15 deletions.
30 changes: 21 additions & 9 deletions lib/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var DESTROYED = 'destroyed';

var _id = 0;

function supportsClusterTime(topology) {
function hasSessionSupport(topology) {
if (topology == null) return false;
return topology.ismaster == null ? false : topology.ismaster.maxWireVersion >= 6;
}
Expand Down Expand Up @@ -1148,19 +1148,31 @@ Pool.prototype.write = function(commands, options, cb) {
// Get the requestId
operation.requestId = commands[commands.length - 1].requestId;

if (supportsClusterTime(this.topology) && this.topology.clusterTime) {
let $clusterTime = this.topology.clusterTime;
if (operation.session && operation.session.clusterTime) {
$clusterTime = operation.session.clusterTime.clusterTime.greaterThan($clusterTime.clusterTime)
? operation.session.clusterTime
: $clusterTime;
if (hasSessionSupport(this.topology)) {
let sessionOptions = {};
if (this.topology.clusterTime) {
sessionOptions = { $clusterTime: this.topology.clusterTime };
}

if (operation.session) {
if (
operation.session.clusterTime &&
operation.session.clusterTime.clusterTime.greaterThan(
sessionOptions.$clusterTime.clusterTime
)
) {
sessionOptions.$clusterTime = operation.session.clusterTime;
}

sessionOptions.lsid = operation.session.id;
}

// decorate the commands with session-specific details
commands.forEach(command => {
if (command instanceof Query) {
command.query.$clusterTime = $clusterTime;
Object.assign(command.query, sessionOptions);
} else {
command.$clusterTime = $clusterTime;
Object.assign(command, sessionOptions);
}
});
}
Expand Down
14 changes: 12 additions & 2 deletions lib/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ const Binary = require('mongodb-core').BSON.Binary,
*
*/
class ClientSession {
constructor(topology, options) {
constructor(topology, sessionPool, options) {
if (topology == null) {
throw new Error('ClientSession requires a topology');
}

this.topology = topology;
this.sessionPool = sessionPool;
this.hasEnded = false;
this._serverSession = undefined; // TBD
this.serverSession = sessionPool.dequeue();

options = options || {};
if (typeof options.initialClusterTime !== 'undefined') {
Expand All @@ -39,12 +40,21 @@ class ClientSession {
this.topology.command('admin.$cmd', { endSessions: 1, ids: [this.id] }, err => {
this.hasEnded = true;

// release the server session back to the pool
this.sessionPool.enqueue(this.serverSession);

if (err) return callback(err, null);
callback(null, null);
});
}
}

Object.defineProperty(ClientSession.prototype, 'id', {
get: function() {
return this.serverSession.id;
}
});

/**
*
*/
Expand Down
49 changes: 45 additions & 4 deletions test/tests/unit/single/sessions_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ var Server = require('../../../../lib/topologies/server'),
assign = require('../../../../lib/utils').assign,
mock = require('../../../mock'),
genClusterTime = require('../common').genClusterTime,
ClientSession = require('../../../../lib/sessions').ClientSession;
ClientSession = require('../../../../lib/sessions').ClientSession,
ServerSessionPool = require('../../../../lib/sessions').ServerSessionPool;

const test = {};
describe('Sessions (Single)', function() {
afterEach(() => mock.cleanup());
beforeEach(() => {
return mock.createServer(37019, 'localhost').then(mockServer => {
return mock.createServer().then(mockServer => {
test.server = mockServer;
});
});
Expand Down Expand Up @@ -109,7 +110,8 @@ describe('Sessions (Single)', function() {
});

const client = new Server(test.server.address());
const session = new ClientSession(client);
const sessionPool = new ServerSessionPool(client);
const session = new ClientSession(client, sessionPool);

client.on('error', done);
client.once('connect', () => {
Expand Down Expand Up @@ -190,7 +192,11 @@ describe('Sessions (Single)', function() {
});

const client = new Server(test.server.address());
const session = new ClientSession(client, { initialClusterTime: futureClusterTime });
const sessionPool = new ServerSessionPool(client);
const session = new ClientSession(client, sessionPool, {
initialClusterTime: futureClusterTime
});

client.on('error', done);
client.once('connect', () => {
client.command('admin.$cmd', { ping: 1 }, { session: session }, err => {
Expand Down Expand Up @@ -233,4 +239,39 @@ describe('Sessions (Single)', function() {
client.connect();
}
});

it('should add `lsid` to commands sent to the server with a session', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
const client = new Server(test.server.address());
const sessionPool = new ServerSessionPool(client);
const session = new ClientSession(client, sessionPool);

let sentIsMaster = false;
test.server.setMessageHandler(request => {
if (sentIsMaster) {
expect(request.document.lsid).to.eql(session.id);
request.reply({ ok: 1 });
return;
}

sentIsMaster = true;
request.reply(
assign({}, mock.DEFAULT_ISMASTER, {
maxWireVersion: 6
})
);
});

client.on('error', done);
client.once('connect', () => {
client.command('admin.$cmd', { ping: 1 }, { session: session }, err => {
expect(err).to.not.exist;
done();
});
});

client.connect();
}
});
});

0 comments on commit f15bab6

Please sign in to comment.