Skip to content

Commit

Permalink
feat(sessions): adding implicit cursor session support
Browse files Browse the repository at this point in the history
Adds support for implicit cursor sessions. Cursors will
now lazily create sessions for their operations if no session
is supplied, and will take care of cleaning up the created
session when the cursor is exhausted.

Fixes NODE-1326
  • Loading branch information
daprahamian authored and mbroadst committed Mar 19, 2018
1 parent da51ac5 commit a81245b
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 15 deletions.
8 changes: 6 additions & 2 deletions lib/command_cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ var CommandCursor = function(bson, ns, cmd, options, topology, topologyOptions)
// Topology Options
topologyOptions: topologyOptions,
// Promise library
promiseLibrary: promiseLibrary
promiseLibrary: promiseLibrary,
// Optional ClientSession
session: options.session
};
};

Expand Down Expand Up @@ -145,7 +147,9 @@ var methodsToInherit = [
'isDead',
'explain',
'isNotified',
'isKilled'
'isKilled',
'_endSession',
'_initImplicitSession'
];

// Only inherit the types we need
Expand Down
45 changes: 36 additions & 9 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,33 @@ var Cursor = function(bson, ns, cmd, options, topology, topologyOptions) {
inherits(Cursor, Readable);

// Map core cursor _next method so we can apply mapping
CoreCursor.prototype._next = CoreCursor.prototype.next;
Cursor.prototype._next = function() {
if (this._initImplicitSession) {
this._initImplicitSession();
}
return CoreCursor.prototype.next.apply(this, arguments);
};

for (var name in CoreCursor.prototype) {
Cursor.prototype[name] = CoreCursor.prototype[name];
}

var define = (Cursor.define = new Define('Cursor', Cursor, true));

Cursor.prototype._initImplicitSession = function() {
if (!this.s.session && this.s.topology.hasSessionSupport()) {
this.s.session = this.s.topology.startSession({ owner: this });
this.cursorState.session = this.s.session;
}
};

Cursor.prototype._endSession = function() {
const didCloseCursor = CoreCursor.prototype._endSession.apply(this, arguments);
if (didCloseCursor) {
this.s.session = undefined;
}
};

/**
* Check if there is any document still available in the cursor
* @method
Expand Down Expand Up @@ -929,7 +948,11 @@ var toArray = function(self, callback) {
// Fetch all the documents
var fetchDocs = function() {
self._next(function(err, doc) {
if (err) return handleCallback(callback, err);
if (err) {
return self._endSession
? self._endSession(() => handleCallback(callback, err))
: handleCallback(callback, err);
}
if (doc == null) {
return self.close({ skipKillCursors: true }, () => handleCallback(callback, null, items));
}
Expand Down Expand Up @@ -985,17 +1008,21 @@ Cursor.prototype.count = function(applySkipLimit, opts, callback) {
if (typeof opts === 'function') (callback = opts), (opts = {});
opts = opts || {};

return executeOperation(this.s.topology, count, [this, applySkipLimit, opts, callback], {
skipSessions: true
});
};

var count = function(self, applySkipLimit, opts, callback) {
if (typeof applySkipLimit === 'function') {
callback = applySkipLimit;
applySkipLimit = true;
}

if (this.s.session) {
opts = Object.assign({}, opts, { session: this.s.session });
}

return executeOperation(this.s.topology, count, [this, applySkipLimit, opts, callback], {
skipSessions: !!this.s.session
});
};

var count = function(self, applySkipLimit, opts, callback) {
if (applySkipLimit) {
if (typeof self.cursorSkip() === 'number') opts.skip = self.cursorSkip();
if (typeof self.cursorLimit() === 'number') opts.limit = self.cursorLimit();
Expand Down Expand Up @@ -1080,7 +1107,7 @@ Cursor.prototype.close = function(options, callback) {
};

if (this.s.session) {
return this.s.session.endSession(() => completeClose());
return this._endSession(() => completeClose());
}

return completeClose();
Expand Down
2 changes: 1 addition & 1 deletion lib/mongo_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ define.staticMethod('connect', { callback: true, promise: true });
* @return {ClientSession} the newly established session
*/
MongoClient.prototype.startSession = function(options) {
options = options || {};
options = Object.assign({ explicit: true }, options);
if (!this.topology) {
throw new MongoError('Must connect to a server before calling this method');
}
Expand Down
7 changes: 4 additions & 3 deletions lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,12 @@ const executeOperation = (topology, operation, args, options) => {

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
let session, opOptions;
let session, opOptions, owner;
if (!options.skipSessions && topology.hasSessionSupport()) {
opOptions = args[args.length - 2];
if (opOptions == null || opOptions.session == null) {
session = topology.startSession();
owner = {};
session = topology.startSession({ owner });
const optionsIndex = args.length - 2;
args[optionsIndex] = Object.assign({}, args[optionsIndex], { session: session });
} else if (opOptions.session && opOptions.session.hasEnded) {
Expand All @@ -392,7 +393,7 @@ const executeOperation = (topology, operation, args, options) => {

const makeExecuteCallback = (resolve, reject) =>
function executeCallback(err, result) {
if (session && !options.returnsCursor) {
if (session && session.owner === owner && !options.returnsCursor) {
session.endSession(() => {
delete opOptions.session;
if (err) return reject(err);
Expand Down

0 comments on commit a81245b

Please sign in to comment.