Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat: update proxy selection to consider pinned server on session
Browse files Browse the repository at this point in the history
  • Loading branch information
daprahamian authored and mbroadst committed Feb 27, 2019
1 parent da13e55 commit 189e428
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
11 changes: 10 additions & 1 deletion lib/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,16 @@ function initializeCursor(cursor, callback) {
}
}

return cursor.topology.selectServer(cursor.options, (err, server) => {
// Very explicitly choose what is passed to selectServer
const serverSelectOptions = {};
if (cursor.cursorState.session) {
serverSelectOptions.session = cursor.cursorState.session;
}
if (cursor.options.readPreference) {
serverSelectOptions.readPreference = cursor.options.readPreference;
}

return cursor.topology.selectServer(serverSelectOptions, (err, server) => {
if (err) {
const disconnectHandler = cursor.disconnectHandler;
if (disconnectHandler != null) {
Expand Down
37 changes: 26 additions & 11 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,14 @@ function connectProxies(self, servers) {
}
}

function pickProxy(self) {
function pickProxy(self, session) {
// TODO: Destructure :)
const transaction = session && session.transaction;

if (transaction && transaction.server) {
return transaction.server;
}

// Get the currently connected Proxies
var connectedProxies = self.connectedProxies.slice(0);

Expand All @@ -488,15 +495,22 @@ function pickProxy(self) {
}
});

let proxy;

// We have no connectedProxies pick first of the connected ones
if (connectedProxies.length === 0) {
return self.connectedProxies[0];
proxy = self.connectedProxies[0];
} else {
// Get proxy
proxy = connectedProxies[self.index % connectedProxies.length];
// Update the index
self.index = (self.index + 1) % connectedProxies.length;
}

if (transaction) {
transaction.pinServer(proxy);
}

// Get proxy
var proxy = connectedProxies[self.index % connectedProxies.length];
// Update the index
self.index = (self.index + 1) % connectedProxies.length;
// Return the proxy
return proxy;
}
Expand Down Expand Up @@ -846,7 +860,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {
options = options || {};

// Pick a server
let server = pickProxy(self);
let server = pickProxy(self, options.session);
// No server found error out
if (!server) return callback(new MongoError('no mongos proxy available'));

Expand All @@ -866,7 +880,7 @@ var executeWriteOperation = function(self, op, ns, ops, options, callback) {
}

// Pick another server
server = pickProxy(self);
server = pickProxy(self, options.session);

// No server found error out with original error
if (!server || !isRetryableWritesSupported(server)) {
Expand Down Expand Up @@ -1007,7 +1021,7 @@ Mongos.prototype.command = function(ns, cmd, options, callback) {
var self = this;

// Pick a proxy
var server = pickProxy(self);
var server = pickProxy(self, options.session);

// Topology is not connected, save the call in the provided store to be
// Executed at some point when the handler deems it's reconnected
Expand Down Expand Up @@ -1087,7 +1101,8 @@ Mongos.prototype.cursor = function(ns, cmd, options) {
*
* @method
* @param {function} selector Unused
* @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
* @param {ReadPreference} [options.readPreference] Unused
* @param {ClientSession} [options.session] Specify a session if it is being used
* @param {function} callback
*/
Mongos.prototype.selectServer = function(selector, options, callback) {
Expand All @@ -1097,7 +1112,7 @@ Mongos.prototype.selectServer = function(selector, options, callback) {
(callback = options), (options = selector), (selector = undefined);
options = options || {};

const server = pickProxy(this);
const server = pickProxy(this, options.session);
if (this.s.debug) this.emit('pickedServer', null, server);
callback(null, server);
};
Expand Down
1 change: 1 addition & 0 deletions lib/topologies/replset.js
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,7 @@ ReplSet.prototype.isDestroyed = function() {
* @method
* @param {function} selector Unused
* @param {ReadPreference} [options.readPreference] Specify read preference if command supports it
* @param {ClientSession} [options.session] Unused
* @param {function} callback
*/
ReplSet.prototype.selectServer = function(selector, options, callback) {
Expand Down
4 changes: 4 additions & 0 deletions lib/topologies/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,10 @@ Server.prototype.connections = function() {

/**
* Selects a server
* @method
* @param {function} selector Unused
* @param {ReadPreference} [options.readPreference] Unused
* @param {ClientSession} [options.session] Unused
* @return {Server}
*/
Server.prototype.selectServer = function(selector, options, callback) {
Expand Down

0 comments on commit 189e428

Please sign in to comment.