Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(transactions): tack recovery token for sharded transactions
Browse files Browse the repository at this point in the history
NODE-1764
  • Loading branch information
mbroadst committed Feb 27, 2019
1 parent 03b64e9 commit e12ae70
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 11 deletions.
24 changes: 13 additions & 11 deletions lib/connection/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -428,23 +428,25 @@ function messageHandler(self) {
return handleOperationCallback(self, workItem.cb, new MongoError(err));
}

// Look for clusterTime, and operationTime and update them if necessary
// Look for clusterTime, operationTime, and recoveryToken and update them if necessary
if (message.documents[0]) {
if (message.documents[0].$clusterTime) {
const $clusterTime = message.documents[0].$clusterTime;
const session = workItem.session;
const document = message.documents[0];
if (document.$clusterTime) {
const $clusterTime = document.$clusterTime;
self.topology.clusterTime = $clusterTime;

if (workItem.session != null) {
resolveClusterTime(workItem.session, $clusterTime);
if (session != null) {
resolveClusterTime(session, $clusterTime);
}
}

if (
message.documents[0].operationTime &&
workItem.session &&
workItem.session.supports.causalConsistency
) {
workItem.session.advanceOperationTime(message.documents[0].operationTime);
if (document.operationTime && session && session.supports.causalConsistency) {
session.advanceOperationTime(message.documents[0].operationTime);
}

if (document.recoveryToken && session && session.inTransaction()) {
session.transaction._recoveryToken = document.recoveryToken;
}
}

Expand Down
4 changes: 4 additions & 0 deletions lib/sessions.js
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ function endTransaction(session, commandName, callback) {
return commandName === 'commitTransaction' ? err : null;
}

if (session.transaction.recoveryToken) {
command.recoveryToken = session.transaction.recoveryToken;
}

// send the command
session.topology.command('admin.$cmd', command, { session }, (err, reply) => {
if (err && isRetryableError(err)) {
Expand Down
9 changes: 9 additions & 0 deletions lib/transactions.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,21 @@ class Transaction {

// TODO: This isn't technically necessary
this._pinnedServer = undefined;
this._recoveryToken = undefined;
}

get isPinned() {
return this._pinnedServer != null;
}

get server() {
return this._pinnedServer;
}

get recoveryToken() {
return this._recoveryToken;
}

/**
* @ignore
* @return Whether this session is presently in a transaction
Expand Down

0 comments on commit e12ae70

Please sign in to comment.