Skip to content

Commit

Permalink
kv: ship high-resolution tscache summaries during lease transfers and…
Browse files Browse the repository at this point in the history
… range merges

Fixes cockroachdb#61986.
Fixes cockroachdb#117486.
Unblocks cockroachdb#118000.

This commit uses the new `tscache.Cache.Serialize` method introduced in cockroachdb#118299 to
ship high-resolution summaries of the timestamp cache during lease transfers and
ranges merges. In doing so, it eliminates the loss of precision that occurs in an
incoming leaseholder's timestamp cache when it receives a lease transfer or range
merge.

This loss of precision was a source of transaction retries for three reasons:
1. txn tombstone marker keys would have their timestamp advanced, leading to
   TransactionAbortedError with the `ABORT_REASON_NEW_LEASE_PREVENTS_TXN`
   reason.
2. txn push marker keys would have their timestamp advanced, leading to
   transactions having their commit timestamp pushed, which could lead to
   TransactionRetryError with the `RETRY_SERIALIZABLE` reason.
3. global keys would have their timestamp advanced as if they had been read,
   also leading to transactions having their commit timestamp pushed if they
   wrote to those keys, which could also lead to TransactionRetryError with the
   `RETRY_SERIALIZABLE` reason.

The first issue here is the most severe, both because it can not be refreshed
away and because it affects transactions of all isolation levels.

This commit introduces two new cluster settings to control the maximum size of
these timestamp cache read summaries:
- `kv.lease_transfer_read_summary.local_budget`
- `kv.lease_transfer_read_summary.global_budget`

It configures the local keyspace budget to 4MB and the global keyspace budget to
0B. This default configuration should be sufficient to eliminate the first two
sources of retries described above. The third has not been observed as a serious
issue in practice, so we configure the global budget to 0 so that we can hit a
serialization fast-path.

Release note (ops change): Transaction replay protection state is now
passed between the outgoing and incoming leaseholder for a range during
a lease transfer. This avoids cases where lease transfers can cause
transactions to throw TransactionAbortedError(ABORT_REASON_NEW_LEASE_PREVENTS_TXN) errors.
  • Loading branch information
nvanbenschoten committed Jan 31, 2024
1 parent d3c3d4e commit 6700e7e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 34 deletions.
36 changes: 24 additions & 12 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,12 +654,13 @@ func TestStoreLeaseTransferTimestampCacheRead(t *testing.T) {
})
}

// TestStoreLeaseTransferTimestampCacheTxnRecord checks the error returned by
// attempts to create a txn record after a lease transfer.
// TestStoreLeaseTransferTimestampCacheTxnRecord checks that no error is
// returned by an attempt to create a txn record after a lease transfer. An
// error is avoided by passing a read summary from the old leaseholder to the
// new leaseholder.
func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 117486)
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
Expand All @@ -668,21 +669,32 @@ func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) {
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)

// Transfer the lease to Servers[0] so we start in a known state. Otherwise,
// Transfer the lease to server 0 so we start in a known state. Otherwise,
// there might be already a lease owned by a random node.
require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(0)))

// Start a txn and perform a write, so that a txn record has to be created by
// the EndTxn.
txn := tc.Servers[0].DB().NewTxn(ctx, "test")
require.NoError(t, txn.Put(ctx, "a", "val"))
// After starting the transaction, transfer the lease. This will wipe the
// timestamp cache, which means that the txn record will not be able to be
// created (because someone might have already aborted the txn).
// the EndTxn when it eventually commits. Don't commit yet.
txn1 := tc.Servers[0].DB().NewTxn(ctx, "test")
require.NoError(t, txn1.Put(ctx, "a", "val"))

// Start another txn and commit. This writes a txn tombstone marker into
// server 0's timestamp cache at a higher timestamp than the first txn's
// tombstone marker will eventually be. Doing so prevents the test from
// fooling itself and passing if only the high water mark of the timestamp
// cache is passed in a read summary during the lease transfer.
txn2 := tc.Servers[0].DB().NewTxn(ctx, "test 2")
require.NoError(t, txn2.Put(ctx, "b", "val"))
require.NoError(t, txn2.Commit(ctx))

// After starting the transaction, transfer the lease. The lease transfer will
// carry over a sufficiently high resolution summary of the old leaseholder's
// timestamp cache so that the new leaseholder can still create a txn record
// for the first txn with certainty that it is not permitting a replay.
require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(1)))

err = txn.Commit(ctx)
require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err)
// Commit the first txn without error.
require.NoError(t, txn1.Commit(ctx))
}

// This test verifies that when a lease is moved to a node that does not match the
Expand Down
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,8 +875,12 @@ func mergeCheckingTimestampCaches(
ba.RangeID = lhsDesc.RangeID
ba.Add(hb)
var expReason kvpb.TransactionAbortedReason
if disjointLeaseholders || throughSnapshot {
expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED
if throughSnapshot {
// TODO(nvanbenschoten): this will result in the wrong reason until we
// start compressing the persisted read summary on lease transfers and
// range merges.
// expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED
expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND
} else {
expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND
}
Expand Down
82 changes: 63 additions & 19 deletions pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,32 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

var readSummaryLocalBudget = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.lease_transfer_read_summary.local_budget",
"controls the maximum number of bytes that will be used to summarize the local "+
"segment of the timestamp cache during lease transfers and range merges. A smaller "+
"budget will result in loss of precision.",
4<<20, /* 4 MB */
)

var readSummaryGlobalBudget = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.lease_transfer_read_summary.global_budget",
"controls the maximum number of bytes that will be used to summarize the global "+
"segment of the timestamp cache during lease transfers and range merges. A smaller "+
"budget will result in loss of precision.",
0,
)

// addToTSCacheChecked adds the specified timestamp to the timestamp cache
// covering the range of keys from start to end. Before doing so, the function
// performs a few assertions to check for proper use of the timestamp cache.
Expand Down Expand Up @@ -583,8 +602,11 @@ func (r *Replica) CanCreateTxnRecord(
return false, kvpb.ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY
case uuid.Nil:
lease, _ /* nextLease */ := r.GetLease()
// Recognize the case where a lease started recently. Lease transfers bump
// the ts cache low water mark.
// Recognize the case where a lease started recently. Lease transfers can
// bump the ts cache low water mark. However, in newer versions of the
// code, the lease transfer will also pass a read summary to the new
// leaseholder, allowing it to be more selective about what parts of the
// ts cache that it bumps and avoiding this case most of the time.
if tombstoneTimestamp == lease.Start.ToTimestamp() {
return false, kvpb.ABORT_REASON_NEW_LEASE_PREVENTS_TXN
}
Expand Down Expand Up @@ -660,7 +682,9 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key {
// GetCurrentReadSummary returns a new ReadSummary reflecting all reads served
// by the range to this point.
func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc())
localBudget := readSummaryLocalBudget.Get(&r.store.ClusterSettings().SV)
globalBudget := readSummaryGlobalBudget.Get(&r.store.ClusterSettings().SV)
sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc(), localBudget, globalBudget)
// Forward the read summary by the range's closed timestamp, because any
// replica could have served reads below this time. We also return the
// closed timestamp separately, in case callers want it split out.
Expand All @@ -669,24 +693,42 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
return sum
}

// collectReadSummaryFromTimestampCache constucts a read summary for the range
// with the specified descriptor using the timestamp cache.
// collectReadSummaryFromTimestampCache constructs a read summary for the range
// with the specified descriptor using the timestamp cache. The function accepts
// two size budgets, which are used to limit the size of the local and global
// segments of the read summary, respectively.
func collectReadSummaryFromTimestampCache(
ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor,
ctx context.Context,
tc tscache.Cache,
desc *roachpb.RangeDescriptor,
localBudget, globalBudget int64,
) rspb.ReadSummary {
serializeSegment := func(start, end roachpb.Key, budget int64) rspb.Segment {
var seg rspb.Segment
if budget > 0 {
// Serialize the key range and then compress to the budget.
seg = tc.Serialize(ctx, start, end)
seg.Compress(budget)
} else {
// If the budget is 0, just return the maximum timestamp in the key range.
// This is equivalent to serializing the key range and then compressing
// the resulting segment to 0 bytes, but much cheaper.
seg.LowWater, _ = tc.GetMax(ctx, start, end)
}
return seg
}
var s rspb.ReadSummary
s.Local.LowWater, _ = tc.GetMax(
ctx,
s.Local = serializeSegment(
keys.MakeRangeKeyPrefix(desc.StartKey),
keys.MakeRangeKeyPrefix(desc.EndKey),
localBudget,
)
userKeys := desc.KeySpan()
s.Global.LowWater, _ = tc.GetMax(
ctx,
s.Global = serializeSegment(
userKeys.Key.AsRawKey(),
userKeys.EndKey.AsRawKey(),
globalBudget,
)

return s
}

Expand All @@ -697,19 +739,21 @@ func collectReadSummaryFromTimestampCache(
func applyReadSummaryToTimestampCache(
ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor, s rspb.ReadSummary,
) {
tc.Add(
ctx,
applySegment := func(start, end roachpb.Key, seg rspb.Segment) {
tc.Add(ctx, start, end, seg.LowWater, uuid.Nil /* txnID */)
for _, rs := range seg.ReadSpans {
tc.Add(ctx, rs.Key, rs.EndKey, rs.Timestamp, rs.TxnID)
}
}
applySegment(
keys.MakeRangeKeyPrefix(desc.StartKey),
keys.MakeRangeKeyPrefix(desc.EndKey),
s.Local.LowWater,
uuid.Nil, /* txnID */
s.Local,
)
userKeys := desc.KeySpan()
tc.Add(
ctx,
applySegment(
userKeys.Key.AsRawKey(),
userKeys.EndKey.AsRawKey(),
s.Global.LowWater,
uuid.Nil, /* txnID */
s.Global,
)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_tscache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestReadSummaryCollectForR1(t *testing.T) {

// Assert that r1's summary was not influenced by the r2 range-local key we
// set above.
summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc)
summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc, 0, 0)
require.Equal(t, baseTS, summary.Global.LowWater)
require.Equal(t, baseTS, summary.Local.LowWater)
}

0 comments on commit 6700e7e

Please sign in to comment.