Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(sessions): support sessions with cursors with find/getMore
Browse files Browse the repository at this point in the history
NODE-1088
  • Loading branch information
mbroadst committed Oct 6, 2017
1 parent f15bab6 commit a016602
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
7 changes: 7 additions & 0 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
this.cursorState.promoteLongs = topologyOptions.promoteLongs;
} else if (typeof options.promoteLongs === 'boolean') {
this.cursorState.promoteLongs = options.promoteLongs;
} else if (typeof options.session === 'object') {
this.cursorState.session = options.session;
}

// Add promoteValues to cursor state
Expand Down Expand Up @@ -286,6 +288,11 @@ Cursor.prototype._find = function(callback) {
if (typeof self.cursorState.promoteBuffers === 'boolean') {
queryOptions.promoteBuffers = self.cursorState.promoteBuffers;
}

if (typeof self.cursorState.session === 'object') {
queryOptions.session = self.cursorState.session;
}

// Write the initial command out
self.server.s.pool.write(self.query, queryOptions, queryCallback);
};
Expand Down
4 changes: 4 additions & 0 deletions lib/wireprotocol/2_6_support.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ WireProtocol.prototype.getMore = function(
queryOptions.promoteBuffers = cursorState.promoteBuffers;
}

if (typeof cursorState.session === 'object') {
queryOptions.session = cursorState.session;
}

// Write out the getMore command
connection.write(getMore, queryOptions, queryCallback);
};
Expand Down
4 changes: 4 additions & 0 deletions lib/wireprotocol/3_2_support.js
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@ WireProtocol.prototype.getMore = function(
queryOptions.promoteBuffers = cursorState.promoteBuffers;
}

if (typeof cursorState.session === 'object') {
queryOptions.session = cursorState.session;
}

// Write out the getMore command
connection.write(query, queryOptions, queryCallback);
};
Expand Down
94 changes: 88 additions & 6 deletions test/tests/unit/single/sessions_tests.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
'use strict';
var Server = require('../../../../lib/topologies/server'),
Long = require('bson').Long,
ObjectId = require('bson').ObjectId,
expect = require('chai').expect,
assign = require('../../../../lib/utils').assign,
mock = require('../../../mock'),
Expand Down Expand Up @@ -138,10 +140,12 @@ describe('Sessions (Single)', function() {
metadata: { requires: { topology: 'single' } },
test: function(done) {
const clusterTime = genClusterTime(Date.now());
let sentIsMaster = false;
let sentIsMaster = false,
command = null;

test.server.setMessageHandler(request => {
if (sentIsMaster) {
expect(request.document.$clusterTime).to.eql(clusterTime);
command = request.document;
request.reply({ ok: 1 });
return;
}
Expand All @@ -160,6 +164,8 @@ describe('Sessions (Single)', function() {
client.once('connect', () => {
client.command('admin.$cmd', { ping: 1 }, err => {
expect(err).to.not.exist;
expect(command.$clusterTime).to.eql(clusterTime);

done();
});
});
Expand All @@ -174,10 +180,11 @@ describe('Sessions (Single)', function() {
const clusterTime = genClusterTime(Date.now()),
futureClusterTime = genClusterTime(Date.now() + 10 * 60 * 1000);

let sentIsMaster = false;
let sentIsMaster = false,
command = null;
test.server.setMessageHandler(request => {
if (sentIsMaster) {
expect(request.document.$clusterTime).to.eql(futureClusterTime);
command = request.document;
request.reply({ ok: 1 });
return;
}
Expand All @@ -201,6 +208,7 @@ describe('Sessions (Single)', function() {
client.once('connect', () => {
client.command('admin.$cmd', { ping: 1 }, { session: session }, err => {
expect(err).to.not.exist;
expect(command.$clusterTime).to.eql(futureClusterTime);
done();
});
});
Expand Down Expand Up @@ -247,10 +255,11 @@ describe('Sessions (Single)', function() {
const sessionPool = new ServerSessionPool(client);
const session = new ClientSession(client, sessionPool);

let sentIsMaster = false;
let sentIsMaster = false,
command = null;
test.server.setMessageHandler(request => {
if (sentIsMaster) {
expect(request.document.lsid).to.eql(session.id);
command = request.document;
request.reply({ ok: 1 });
return;
}
Expand All @@ -267,11 +276,84 @@ describe('Sessions (Single)', function() {
client.once('connect', () => {
client.command('admin.$cmd', { ping: 1 }, { session: session }, err => {
expect(err).to.not.exist;
expect(command.document.lsid).to.eql(session.id);
done();
});
});

client.connect();
}
});

it('should use the same session for all getMore issued by a cursor', {
metadata: { requires: { topology: 'single' } },
test: function(done) {
const client = new Server(test.server.address());
const sessionPool = new ServerSessionPool(client);
const session = new ClientSession(client, sessionPool);

let commands = [];
test.server.setMessageHandler(request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(
assign({}, mock.DEFAULT_ISMASTER, {
maxWireVersion: 6
})
);
} else if (doc.find) {
commands.push(doc);
request.reply({
cursor: {
id: Long.fromNumber(1),
ns: 'test.t',
firstBatch: []
},
ok: 1
});
} else if (doc.getMore) {
commands.push(doc);
request.reply({
cursor: {
id: Long.ZERO,
ns: 'test.t',
nextBatch: [{ _id: new ObjectId(), a: 1 }]
},
ok: 1
});
}
});

client.on('error', done);
client.once('connect', () => {
const cursor = client.cursor(
'test.test',
{
find: 'test',
query: {},
batchSize: 2
},
{
session: session
}
);

// Execute next
cursor.next(function(err) {
expect(err).to.not.exist;
expect(commands[0].lsid).to.eql(session.id);

cursor.next(function(err) {
expect(err).to.not.exist;
expect(commands[1].lsid).to.eql(session.id);

client.destroy();
done();
});
});
});

client.connect();
}
});
});

0 comments on commit a016602

Please sign in to comment.