Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(sessions): adding implicit cursor session support
Browse files Browse the repository at this point in the history
Adds ability to add owner to session (which allows you to note
who is responsible for closing the session), and adds part of
implicit session support to cursors.

Part of NODE-1326
  • Loading branch information
daprahamian authored and mbroadst committed Mar 12, 2018
1 parent d0b0346 commit 1607321
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 20 deletions.
63 changes: 43 additions & 20 deletions lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,27 @@ Cursor.prototype.cursorSkip = function() {
return this.cursorState.skip;
};

Cursor.prototype._endSession = function(options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}
options = options || {};

const session = this.cursorState.session;

if (session && (options.force || session.owner === this)) {
this.cursorState.session = undefined;
session.endSession(callback);
return true;
}

if (callback) {
callback();
}
return false;
};

//
// Handle callback (including any exceptions thrown)
var handleCallback = function(callback, err, result) {
Expand Down Expand Up @@ -475,15 +496,11 @@ Cursor.prototype.rewind = function() {
*/
var isConnectionDead = function(self, callback) {
if (self.pool && self.pool.isDestroyed()) {
self.cursorState.notified = true;
self.cursorState.killed = true;
self.cursorState.documents = [];
self.cursorState.cursorIndex = 0;
callback(
new MongoNetworkError(
f('connection to host %s:%s was destroyed', self.pool.host, self.pool.port)
)
const err = new MongoNetworkError(
f('connection to host %s:%s was destroyed', self.pool.host, self.pool.port)
);
_setCursorNotifiedImpl(self, () => callback(err));
return true;
}

Expand All @@ -496,11 +513,8 @@ var isConnectionDead = function(self, callback) {
var isCursorDeadButNotkilled = function(self, callback) {
// Cursor is dead but not marked killed, return null
if (self.cursorState.dead && !self.cursorState.killed) {
self.cursorState.notified = true;
self.cursorState.killed = true;
self.cursorState.documents = [];
self.cursorState.cursorIndex = 0;
handleCallback(callback, null, null);
setCursorNotified(self, callback);
return true;
}

Expand All @@ -524,10 +538,7 @@ var isCursorDeadAndKilled = function(self, callback) {
*/
var isCursorKilled = function(self, callback) {
if (self.cursorState.killed) {
self.cursorState.notified = true;
self.cursorState.documents = [];
self.cursorState.cursorIndex = 0;
handleCallback(callback, null, null);
setCursorNotified(self, callback);
return true;
}

Expand All @@ -539,20 +550,24 @@ var isCursorKilled = function(self, callback) {
*/
var setCursorDeadAndNotified = function(self, callback) {
self.cursorState.dead = true;
self.cursorState.notified = true;
self.cursorState.documents = [];
self.cursorState.cursorIndex = 0;
handleCallback(callback, null, null);
setCursorNotified(self, callback);
};

/**
* Mark cursor as being notified
*/
var setCursorNotified = function(self, callback) {
_setCursorNotifiedImpl(self, () => handleCallback(callback, null, null));
};

var _setCursorNotifiedImpl = function(self, callback) {
self.cursorState.notified = true;
self.cursorState.documents = [];
self.cursorState.cursorIndex = 0;
handleCallback(callback, null, null);
if (self._endSession) {
return self._endSession(undefined, () => callback());
}
return callback();
};

var nextFunction = function(self, callback) {
Expand Down Expand Up @@ -654,6 +669,10 @@ var nextFunction = function(self, callback) {
self._find(function(err) {
if (err) return handleCallback(callback, err, null);

if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) {
self._endSession();
}

if (
self.cursorState.documents.length === 0 &&
self.cursorState.cursorId &&
Expand Down Expand Up @@ -696,6 +715,10 @@ var nextFunction = function(self, callback) {
self._getmore(function(err, doc, connection) {
if (err) return handleCallback(callback, err);

if (self.cursorState.cursorId && self.cursorState.cursorId.isZero() && self._endSession) {
self._endSession();
}

// Save the returned connection to ensure all getMore's fire over the same connection
self.connection = connection;

Expand Down
3 changes: 3 additions & 0 deletions lib/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class ClientSession extends EventEmitter {
}

this.operationTime = null;

this.explicit = !!options.explicit;
this.owner = options.owner;
}

/**
Expand Down

0 comments on commit 1607321

Please sign in to comment.