Merge #107658
107658: kv: enable replay protection for ambiguous writes on commits r=AlexTalks a=AlexTalks

Previously, RPC failures were assumed to be retriable, because write operations (with the notable exception of `EndTxn`) were assumed to be idempotent. However, as seen in #67765 and documented in #103817, when an RPC failure occurs on a write operation issued in parallel with a commit (i.e. a partial batch where `withCommit==true`), it is not always possible to assume idempotency and retry the "ambiguous" writes. The retried write RPC could result in the transaction's `WriteTimestamp` being bumped, changing the commit timestamp of a transaction that may in fact already be implicitly committed if the initial "ambiguous" write actually succeeded.

This change modifies the protocol of the DistSender to flag in subsequent retries that a batch with a commit has previously experienced ambiguity, as well as the handling of the retried write in the MVCC layer to detect this previous ambiguity and reject retries that change the write timestamp as a non-idempotent replay. The flag allows subsequent retries to "remember" the earlier ambiguous write and evaluate accordingly.

The flag allows us to properly handle RPC failures (i.e. ambiguous writes) that occur on commit. A transaction that is implicitly committed is eligible to be marked as explicitly committed by contending transactions via the `RecoverTxn` operation, which creates a race between retries by the transaction coordinator and recovery by contending transactions. That race could result either in incorrectly reporting a transaction as having failed with a `RETRY_SERIALIZABLE` error (despite the possibility that it already was or could be recovered and successfully committed), or in attempting to explicitly commit an already-recovered and committed transaction, which trips an assertion failure due to `transaction unexpectedly committed`.

The replay protection introduced here allows us to avoid both of these situations by detecting a replay that should be considered non-idempotent and returning an error, causing the original RPC error remembered by the DistSender to be propagated as an `AmbiguousResultError`. As such, this can be handled by application code by validating the success/failure of a transaction when receiving this error.

Depends on #107680, #107323, #108154, #108001

Fixes: #103817

Release note (bug fix): Properly handles RPC failures on writes using the parallel commit protocol that execute in parallel with the commit operation, avoiding incorrect retriable failures and `transaction unexpectedly committed` assertions by detecting when writes cannot be retried idempotently, instead returning an `AmbiguousResultError`.

Co-authored-by: Alex Sarkesian <[email protected]>
craig[bot] and AlexTalks committed Sep 7, 2023
2 parents 30431f6 + af5b0de commit 8bad483
Showing 14 changed files with 625 additions and 166 deletions.
49 changes: 40 additions & 9 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1268,7 +1268,7 @@ func (ds *DistSender) detectIntentMissingDueToIntentResolution(
// We weren't able to determine whether the intent missing error is
// due to intent resolution or not, so it is still ambiguous whether
// the commit succeeded.
-return false, kvpb.NewAmbiguousResultErrorf("error=%s [intent missing]", pErr)
+return false, kvpb.NewAmbiguousResultErrorf("error=%v [intent missing]", pErr)
}
resp := br.Responses[0].GetQueryTxn()
respTxn := &resp.QueriedTxn
@@ -2082,7 +2082,8 @@ func maybeSetResumeSpan(
// the error that the last attempt to execute the request returned.
func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
if ambiguousErr != nil {
-return kvpb.NewAmbiguousResultErrorf("error=%s [exhausted]", ambiguousErr)
+return kvpb.NewAmbiguousResultErrorf("error=%v [exhausted] (last error: %v)",
+ambiguousErr, lastAttemptErr)
}

// TODO(bdarnell): The error from the last attempt is not necessarily the best
@@ -2260,6 +2261,28 @@ func (ds *DistSender) sendToReplicas(
ba = ba.ShallowCopy()
ba.Replica = curReplica
ba.RangeID = desc.RangeID

+// When a sub-batch from a batch containing a commit experiences an
+// ambiguous error, it is critical to ensure subsequent replay attempts
+// do not permit changing the write timestamp, as the transaction may
+// already have been considered implicitly committed.
+ba.AmbiguousReplayProtection = ambiguousError != nil
+
+// In the case that the batch has already seen an ambiguous error, in
+// addition to enabling ambiguous replay protection, we also need to
+// disable the ability for the server to forward the read timestamp, as
+// the transaction may have been implicitly committed. If the intents for
+// the implicitly committed transaction were already resolved, on a replay
+// attempt encountering committed values above the read timestamp the
+// server will attempt to handle what seems to be a write-write conflict by
+// throwing a WriteTooOld, which could be refreshed away on the server if
+// the read timestamp can be moved. Disabling this ability protects against
+// refreshing away the error when retrying the ambiguous operation, instead
+// returning to the DistSender so the ambiguous error can be propagated.
+if ambiguousError != nil && ba.CanForwardReadTimestamp {
+ba.CanForwardReadTimestamp = false
+}

// Communicate to the server the information our cache has about the
// range. If it's stale, the server will return an update.
ba.ClientRangeInfo = roachpb.ClientRangeInfo{
@@ -2294,10 +2317,13 @@ func (ds *DistSender) sendToReplicas(
ds.maybeIncrementErrCounters(br, err)

if err != nil {
+log.VErrEventf(ctx, 2, "RPC error: %s", err)

if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
if ambiguousError != nil {
-return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
+return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)",
+ambiguousError, err)
}
return nil, err
}
@@ -2319,10 +2345,6 @@ func (ds *DistSender) sendToReplicas(
// ambiguity.
// 2) SQL recognizes AmbiguousResultErrors and gives them a special code
// (StatementCompletionUnknown).
-// TODO(andrei): The use of this code is inconsistent because a) the
-// DistSender tries to only return the code for commits, but it'll happily
-// forward along AmbiguousResultErrors coming from the replica and b) we
-// probably should be returning that code for non-commit statements too.
//
// We retry requests in order to avoid returning errors (in particular,
// AmbiguousResultError). Retrying the batch will either:
@@ -2337,6 +2359,12 @@ func (ds *DistSender) sendToReplicas(
// can't claim success (and even if we could claim success, we still
// wouldn't have the complete result of the successful evaluation).
//
+// Note that in case c), a request is not idempotent if the retry finds
+// the request succeeded the first time around, but requires a change to
+// the transaction's write timestamp. This is guarded against by setting
+// the AmbiguousReplayProtection flag, so that the replay is aware the
+// batch has seen an ambiguous error.
+//
// Case a) is great - the retry made the request succeed. Case b) is also
// good; due to idempotency we managed to swallow a communication error.
// Case c) is not great - we'll end up returning an error even though the
@@ -2349,10 +2377,12 @@ func (ds *DistSender) sendToReplicas(
// evaluating twice, overwriting another unrelated write that fell
// in-between.
//
+// NB: If this partial batch does not contain the EndTxn request but the
+// batch contains a commit, the ambiguous error should be caught on
+// retrying the writes, should it need to be propagated.
if withCommit && !grpcutil.RequestDidNotStart(err) {
ambiguousError = err
}
-log.VErrEventf(ctx, 2, "RPC error: %s", err)

// If the error wasn't just a context cancellation and the down replica
// is cached as the lease holder, evict it. The only other eviction
@@ -2508,7 +2538,8 @@ func (ds *DistSender) sendToReplicas(
}
default:
if ambiguousError != nil {
-return nil, kvpb.NewAmbiguousResultErrorf("error=%s [propagate]", ambiguousError)
+return nil, kvpb.NewAmbiguousResultErrorf("error=%v [propagate] (last error: %v)",
+ambiguousError, br.Error.GoError())
}

// The error received is likely not specific to this