Skip to content

Commit

Permalink
kv/kvclient: support generalized server-side refreshes
Browse files Browse the repository at this point in the history
This commit generalizes the concept of a server-side transaction refresh
for any request issued by a transaction before it has accumulated
refresh spans. Server-side refreshes were previously only possible for
batches that contained an EndTxn, but this was limiting for no
particularly strong reason.

The commit supports this new functionality by adding a new flag to the
BatchRequest header called `CanForwardReadTimestamp`. This flag
indicates that the batch can be evaluated at a higher timestamp than the
transaction's read timestamp. It is set by the client if the transaction
has not performed any reads that must be refreshed prior to sending this
current batch. When set, it allows the server to handle pushes and write
too old conditions locally.

In the future, we'd like to remove the EndTxn.CanCommitAtHigherTimestamp
flag entirely in favor of this more general mechanism.

While adding support for this new form of server-side refresh, the commit
also adds a retry loop on the read-only request evaluation path. This is
almost never exercised because most read-only requests acquire MVCC read
latches and also don't throw WriteTooOld errors, but it can be exercised
for locking scans, which acquire MVCC write latches and do occasionally
throw WriteTooOld errors.

Release justification: necessary to avoid a performance regression due
to implicit SELECT FOR UPDATE in certain cases where UPDATEs are heavily
contended. Before implicit SFU, a heavily contended UPDATE statement
would always eventually commit after 3 retries:
epoch 1: {Scan} succeeds, {Put, EndTxn} hits WriteTooOld
epoch 2: {Scan} succeeds, {Put} defers WriteTooOld and writes intent,
         {EndTxn} fails
epoch 3: {Scan} succeeds, {Put} rewrites intent, {EndTxn} succeeds

After SFU but before this change we could get cases where the {Scan}
continues to hit WriteTooOld errors and has no bound on the number
of attempts it will make.

After SFU and this change the UPDATE will always commit after exactly
1 retry:
epoch 1: {Scan} hits WriteTooOld and server-side refreshes, {Put, EndTxn}
         succeeds.
  • Loading branch information
nvanbenschoten committed Mar 12, 2020
1 parent 11bffb2 commit 972915d
Show file tree
Hide file tree
Showing 14 changed files with 1,314 additions and 737 deletions.
45 changes: 37 additions & 8 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 26 additions & 5 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 28 additions & 21 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,22 +618,23 @@ func (ds *DistSender) initAndVerifyBatch(
var errNo1PCTxn = roachpb.NewErrorf("cannot send 1PC txn to multiple ranges")

// splitBatchAndCheckForRefreshSpans splits the batch according to the
// canSplitET parameter and checks whether the final request is an
// EndTxn. If so, the EndTxnRequest.CanCommitAtHigherTimestamp
// flag is reset to indicate whether earlier parts of the split may
// result in refresh spans.
// canSplitET parameter and checks whether the batch can forward its
// read timestamp. If the batch has its CanForwardReadTimestamp flag
// set but is being split across multiple sub-batches then the flag in
// the batch header is unset.
func splitBatchAndCheckForRefreshSpans(
ba roachpb.BatchRequest, canSplitET bool,
ba *roachpb.BatchRequest, canSplitET bool,
) [][]roachpb.RequestUnion {
parts := ba.Split(canSplitET)
// If the final part contains an EndTxn, we need to check
// whether earlier split parts contain any refresh spans and properly
// set the CanCommitAtHigherTimestamp flag on the end transaction.
lastPart := parts[len(parts)-1]
lastReq := lastPart[len(lastPart)-1].GetInner()
if et, ok := lastReq.(*roachpb.EndTxnRequest); ok && et.CanCommitAtHigherTimestamp {

// If the batch is split and the header has its CanForwardReadTimestamp flag
// set then we much check whether any request would need to be refreshed in
// the event that the one of the partial batches was to forward its read
// timestamp during a server-side refresh. If any such request exists then
// we unset the CanForwardReadTimestamp flag.
if len(parts) > 1 && ba.CanForwardReadTimestamp {
hasRefreshSpans := func() bool {
for _, part := range parts[:len(parts)-1] {
for _, part := range parts {
for _, req := range part {
if roachpb.NeedsRefresh(req.GetInner()) {
return true
Expand All @@ -643,11 +644,18 @@ func splitBatchAndCheckForRefreshSpans(
return false
}()
if hasRefreshSpans {
etCopy := *et
etCopy.CanCommitAtHigherTimestamp = false
lastPart = append([]roachpb.RequestUnion(nil), lastPart...)
lastPart[len(lastPart)-1].MustSetInner(&etCopy)
parts[len(parts)-1] = lastPart
ba.CanForwardReadTimestamp = false

// If the final part contains an EndTxn request, unset its
// CanCommitAtHigherTimestamp flag as well.
lastPart := parts[len(parts)-1]
if et := lastPart[len(lastPart)-1].GetEndTxn(); et != nil {
etCopy := *et
etCopy.CanCommitAtHigherTimestamp = false
lastPart = append([]roachpb.RequestUnion(nil), lastPart...)
lastPart[len(lastPart)-1].MustSetInner(&etCopy)
parts[len(parts)-1] = lastPart
}
}
}
return parts
Expand Down Expand Up @@ -691,7 +699,7 @@ func (ds *DistSender) Send(
if ba.Txn != nil && ba.Txn.Epoch > 0 && !require1PC {
splitET = true
}
parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
parts := splitBatchAndCheckForRefreshSpans(&ba, splitET)
if len(parts) > 1 && (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) {
// We already verified above that the batch contains only scan requests of the same type.
// Such a batch should never need splitting.
Expand Down Expand Up @@ -737,7 +745,7 @@ func (ds *DistSender) Send(
} else if require1PC {
log.Fatalf(ctx, "required 1PC transaction cannot be split: %s", ba)
}
parts = splitBatchAndCheckForRefreshSpans(ba, true /* split ET */)
parts = splitBatchAndCheckForRefreshSpans(&ba, true /* split ET */)
// Restart transaction of the last chunk as multiple parts with
// EndTxn in the last part.
continue
Expand Down Expand Up @@ -1056,8 +1064,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
// Clone the BatchRequest's transaction so that future mutations to the
// proto don't affect the proto in this batch.
if ba.Txn != nil {
txnCopy := *ba.Txn
ba.Txn = &txnCopy
ba.Txn = ba.Txn.Clone()
}
// Get initial seek key depending on direction of iteration.
var scanDir ScanDirection
Expand Down
Loading

0 comments on commit 972915d

Please sign in to comment.