Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(retryable-writes): add mongos support for retryable writes
Browse files Browse the repository at this point in the history
NODE-1105
  • Loading branch information
mbroadst committed Dec 1, 2017
1 parent 73ac688 commit 7778067
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 11 deletions.
49 changes: 38 additions & 11 deletions lib/topologies/mongos.js
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
'use strict';

var inherits = require('util').inherits,
const inherits = require('util').inherits,
f = require('util').format,
EventEmitter = require('events').EventEmitter,
BasicCursor = require('../cursor'),
Logger = require('../connection/logger'),
retrieveBSON = require('../connection/utils').retrieveBSON,
MongoError = require('../error').MongoError,
errors = require('../error'),
Server = require('./server'),
clone = require('./shared').clone,
diff = require('./shared').diff,
cloneOptions = require('./shared').cloneOptions,
createClientInfo = require('./shared').createClientInfo,
SessionMixins = require('./shared').SessionMixins;
SessionMixins = require('./shared').SessionMixins,
isRetryableWritesSupported = require('./shared').isRetryableWritesSupported,
txnNumber = require('./shared').txnNumber;

var BSON = retrieveBSON();
const BSON = retrieveBSON();

/**
* @fileOverview The **Mongos** class is a class that represents a Mongos Proxy topology and is
Expand Down Expand Up @@ -879,18 +882,42 @@ Mongos.prototype.isDestroyed = function() {

// Execute write operation
var executeWriteOperation = function(self, op, ns, ops, options, callback) {
if (typeof options === 'function') {
(callback = options), (options = {}), (options = options || {});
}

// Ensure we have no options
if (typeof options === 'function') (callback = options), (options = {});
options = options || {};

// Pick a server
var server = pickProxy(self);
let server = pickProxy(self);
// No server found error out
if (!server) return callback(new MongoError('no mongos proxy available'));
// Execute the command
server[op](ns, ops, options, callback);

if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) {
// Execute the command
return server[op](ns, ops, options, callback);
}

// increment and assign txnNumber
options.txnNumber = txnNumber(options.session);

server[op](ns, ops, options, (err, result) => {
if (!err) return callback(null, result);
if (err instanceof errors.MongoNetworkError) {
return callback(err);
}

// Pick another server
server = pickProxy(self);

// No server found error out with original error
if (!server) {
return callback(err);
}

// increment and assign txnNumber
options.txnNumber = txnNumber(options.session);

// rerun the operation
server[op](ns, ops, options, callback);
});
};

/**
Expand Down
14 changes: 14 additions & 0 deletions test/tests/unit/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,19 @@ class ReplSetFixture {
}
}

class MongosFixture {
setup(options) {
options = options || {};
const ismaster = options.ismaster ? options.ismaster : mock.DEFAULT_ISMASTER;
return Promise.all([mock.createServer(), mock.createServer()]).then(servers => {
this.servers = servers;
this.defaultFields = Object.assign({}, ismaster, {
msg: 'isdbgrid'
});
});
}
}

/**
* Creates a cluster time for use in unit testing cluster time gossiping and
* causal consistency.
Expand All @@ -111,5 +124,6 @@ function genClusterTime(time) {

module.exports = {
ReplSetFixture: ReplSetFixture,
MongosFixture: MongosFixture,
genClusterTime: genClusterTime
};
61 changes: 61 additions & 0 deletions test/tests/unit/mongos/retryable_writes_tests.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict';
var expect = require('chai').expect,
Mongos = require('../../../../lib/topologies/mongos'),
mock = require('../../../mock'),
MongosFixture = require('../common').MongosFixture,
ClientSession = require('../../../../lib/sessions').ClientSession,
ServerSessionPool = require('../../../../lib/sessions').ServerSessionPool;

const test = new MongosFixture();
describe('Retryable Writes (Mongos)', function() {
afterEach(() => mock.cleanup());
beforeEach(() => test.setup({ ismaster: mock.DEFAULT_ISMASTER_36 }));

it('should add `txnNumber` to write commands where `retryWrites` is true', {
metadata: { requires: { topology: ['single'] } },
test: function(done) {
const topology = new Mongos(test.servers.map(server => server.address()), {
connectionTimeout: 3000,
socketTimeout: 0,
haInterval: 10000,
localThresholdMS: 500,
size: 1
});

const sessionPool = new ServerSessionPool(topology);
const session = new ClientSession(topology, sessionPool);

let command = null;
const messageHandler = () => {
return request => {
const doc = request.document;
if (doc.ismaster) {
request.reply(test.defaultFields);
} else if (doc.insert) {
command = doc;
request.reply({ ok: 1 });
}
};
};

test.servers[0].setMessageHandler(messageHandler('MONGOS1'));
test.servers[1].setMessageHandler(messageHandler('MONGOS2'));

topology.once('fullsetup', function() {
topology.insert('test.test', [{ a: 1 }], { retryWrites: true, session: session }, function(
err
) {
expect(err).to.not.exist;
expect(command).to.have.property('txnNumber');
expect(command.txnNumber).to.eql(1);

topology.destroy();
done();
});
});

topology.on('error', done);
topology.connect();
}
});
});

0 comments on commit 7778067

Please sign in to comment.