From 73ac6884baa6148c440b747ccd73bcf87e0d44e5 Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Mon, 27 Nov 2017 16:29:54 -0500 Subject: [PATCH] feat(retryable-writes): initial support on replicasets NODE-1105 --- lib/topologies/replset.js | 44 ++++++++++---- lib/topologies/shared.js | 39 +++++++++++- .../unit/replset/retryable_writes_tests.js | 59 +++++++++++++++++++ 3 files changed, 129 insertions(+), 13 deletions(-) create mode 100644 test/tests/unit/replset/retryable_writes_tests.js diff --git a/lib/topologies/replset.js b/lib/topologies/replset.js index eeb563223..195886bf4 100644 --- a/lib/topologies/replset.js +++ b/lib/topologies/replset.js @@ -8,13 +8,16 @@ var inherits = require('util').inherits, retrieveBSON = require('../connection/utils').retrieveBSON, Logger = require('../connection/logger'), MongoError = require('../error').MongoError, + errors = require('../error'), Server = require('./server'), ReplSetState = require('./replset_state'), clone = require('./shared').clone, Timeout = require('./shared').Timeout, Interval = require('./shared').Interval, createClientInfo = require('./shared').createClientInfo, - SessionMixins = require('./shared').SessionMixins; + SessionMixins = require('./shared').SessionMixins, + isRetryableWritesSupported = require('./shared').isRetryableWritesSupported, + txnNumber = require('./shared').txnNumber; var MongoCR = require('../auth/mongocr'), X509 = require('../auth/x509'), @@ -1154,20 +1157,39 @@ ReplSet.prototype.getServers = function() { // // Execute write operation var executeWriteOperation = function(self, op, ns, ops, options, callback) { - if (typeof options === 'function') { - (callback = options), (options = {}), (options = options || {}); - } - - // Ensure we have no options + if (typeof options === 'function') (callback = options), (options = {}); options = options || {}; - // No server returned we had an error - if (self.s.replicaSetState.primary == null) { - return callback(new MongoError('no primary server found')); + if (!options.retryWrites || !options.session || !isRetryableWritesSupported(self)) { + // No server returned we had an error + if (self.s.replicaSetState.primary == null) { + return callback(new MongoError('no primary server found')); + } + + // Execute the command + return self.s.replicaSetState.primary[op](ns, ops, options, callback); } - // Execute the command - self.s.replicaSetState.primary[op](ns, ops, options, callback); + // increment and assign txnNumber + options.txnNumber = txnNumber(options.session); + + self.s.replicaSetState.primary[op](ns, ops, options, (err, result) => { + if (!err) return callback(null, result); + if (err instanceof errors.MongoNetworkError) { + return callback(err); + } + + // check again, this might have changed in the interim + if (self.s.replicaSetState.primary == null) { + return callback(new MongoError('no primary server found')); + } + + // increment and assign txnNumber + options.txnNumber = txnNumber(options.session); + + // rerun the operation + self.s.replicaSetState.primary[op](ns, ops, options, callback); + }); }; /** diff --git a/lib/topologies/shared.js b/lib/topologies/shared.js index b97eb012a..833c71a0d 100644 --- a/lib/topologies/shared.js +++ b/lib/topologies/shared.js @@ -1,8 +1,11 @@ 'use strict'; -var os = require('os'), +const os = require('os'), f = require('util').format, - ReadPreference = require('./read_preference'); + ReadPreference = require('./read_preference'), + retrieveBSON = require('../connection/utils').retrieveBSON; + +const BSON = retrieveBSON(); /** * Emit event if it exists @@ -402,6 +405,36 @@ const SessionMixins = { } }; +const RETRYABLE_WIRE_VERSION = 6; + +/** + * Determines whether the provided topology supports retryable writes + * + * @param {Mongos|Replset} topology + */ +const isRetryableWritesSupported = function(topology) { + const maxWireVersion = topology.lastIsMaster().maxWireVersion; + if (maxWireVersion < RETRYABLE_WIRE_VERSION) { + return false; + } + + if (!topology.logicalSessionTimeoutMinutes) { + return false; + } + + return true; +}; + +/** + * Increment the transaction number on the ServerSession contained by the provided ClientSession + * + * @param {ClientSession} session + */ +const txnNumber = function(session) { + session.serverSession.txnNumber++; + return BSON.Long.fromNumber(session.serverSession.txnNumber); +}; + module.exports.SessionMixins = SessionMixins; module.exports.resolveClusterTime = resolveClusterTime; module.exports.inquireServerState = inquireServerState; @@ -415,3 +448,5 @@ module.exports.clone = clone; module.exports.diff = diff; module.exports.Interval = Interval; module.exports.Timeout = Timeout; +module.exports.isRetryableWritesSupported = isRetryableWritesSupported; +module.exports.txnNumber = txnNumber; diff --git a/test/tests/unit/replset/retryable_writes_tests.js b/test/tests/unit/replset/retryable_writes_tests.js new file mode 100644 index 000000000..a4a719cc9 --- /dev/null +++ b/test/tests/unit/replset/retryable_writes_tests.js @@ -0,0 +1,59 @@ +'use strict'; +var expect = require('chai').expect, + ReplSet = require('../../../../lib/topologies/replset'), + mock = require('../../../mock'), + ReplSetFixture = require('../common').ReplSetFixture, + ClientSession = require('../../../../lib/sessions').ClientSession, + ServerSessionPool = require('../../../../lib/sessions').ServerSessionPool; + +const test = new ReplSetFixture(); +describe('Sessions (ReplSet)', function() { + afterEach(() => mock.cleanup()); + beforeEach(() => test.setup({ ismaster: mock.DEFAULT_ISMASTER_36 })); + + it('should add `txnNumber` to write commands where `retryWrites` is true', { + metadata: { requires: { topology: ['single'] } }, + test: function(done) { + var replset = new ReplSet( + [test.primaryServer.address(), test.firstSecondaryServer.address()], + { + setName: 'rs', + connectionTimeout: 3000, + socketTimeout: 0, + haInterval: 100, + size: 1 + } + ); + + const sessionPool = new ServerSessionPool(replset); + const session = new ClientSession(replset, sessionPool); + + let command = null; + test.primaryServer.setMessageHandler(request => { + const doc = request.document; + if (doc.ismaster) { + request.reply(test.primaryStates[0]); + } else if (doc.insert) { + command = doc; + request.reply({ ok: 1 }); + } + }); + + replset.on('all', () => { + replset.insert('test.test', [{ a: 1 }], { retryWrites: true, session: session }, function( + err + ) { + expect(err).to.not.exist; + expect(command).to.have.property('txnNumber'); + expect(command.txnNumber).to.eql(1); + + replset.destroy(); + done(); + }); + }); + + replset.on('error', done); + replset.connect(); + } + }); +});