diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go
index 4aa87980dc61..329bb74c9a47 100644
--- a/pkg/kv/kvserver/client_lease_test.go
+++ b/pkg/kv/kvserver/client_lease_test.go
@@ -654,12 +654,13 @@ func TestStoreLeaseTransferTimestampCacheRead(t *testing.T) {
 	})
 }
 
-// TestStoreLeaseTransferTimestampCacheTxnRecord checks the error returned by
-// attempts to create a txn record after a lease transfer.
+// TestStoreLeaseTransferTimestampCacheTxnRecord checks that no error is
+// returned by an attempt to create a txn record after a lease transfer. An
+// error is avoided by passing a read summary from the old leaseholder to the
+// new leaseholder.
 func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	skip.WithIssue(t, 117486)
 	ctx := context.Background()
 	tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
 	defer tc.Stopper().Stop(ctx)
@@ -668,21 +669,32 @@ func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) {
 	rangeDesc, err := tc.LookupRange(key)
 	require.NoError(t, err)
 
-	// Transfer the lease to Servers[0] so we start in a known state. Otherwise,
+	// Transfer the lease to server 0 so we start in a known state. Otherwise,
 	// there might be already a lease owned by a random node.
 	require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(0)))
 
 	// Start a txn and perform a write, so that a txn record has to be created by
-	// the EndTxn.
-	txn := tc.Servers[0].DB().NewTxn(ctx, "test")
-	require.NoError(t, txn.Put(ctx, "a", "val"))
-	// After starting the transaction, transfer the lease. This will wipe the
-	// timestamp cache, which means that the txn record will not be able to be
-	// created (because someone might have already aborted the txn).
+	// the EndTxn when it eventually commits. Don't commit yet.
+	txn1 := tc.Servers[0].DB().NewTxn(ctx, "test")
+	require.NoError(t, txn1.Put(ctx, "a", "val"))
+
+	// Start another txn and commit. This writes a txn tombstone marker into
+	// server 0's timestamp cache at a higher timestamp than the first txn's
+	// tombstone marker will eventually be. Doing so prevents the test from
+	// fooling itself and passing if only the high water mark of the timestamp
+	// cache is passed in a read summary during the lease transfer.
+	txn2 := tc.Servers[0].DB().NewTxn(ctx, "test 2")
+	require.NoError(t, txn2.Put(ctx, "b", "val"))
+	require.NoError(t, txn2.Commit(ctx))
+
+	// After starting the transaction, transfer the lease. The lease transfer
+	// will carry over a sufficiently high-resolution summary of the old
+	// leaseholder's timestamp cache so that the new leaseholder can still
+	// create a txn record for the first txn with certainty that it is not
+	// permitting a replay.
 	require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(1)))
 
-	err = txn.Commit(ctx)
-	require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err)
+	// Commit the first txn without error.
+	require.NoError(t, txn1.Commit(ctx))
 }
 
 // This test verifies that when a lease is moved to a node that does not match the
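For readers outside the kvserver codebase, the following self-contained Go sketch illustrates the scenario this test guards against. All names, keys, and integer timestamps here are invented stand-ins, not CockroachDB's real types: if the lease transfer shipped only the timestamp cache's high water mark, txn2's later tombstone would be indistinguishable from one belonging to txn1, and the new leaseholder would have to refuse txn1's record.

```go
package main

import "fmt"

func main() {
	// Tombstone timestamps observed on the old leaseholder, keyed by a
	// toy stand-in for the per-txn tombstone marker key.
	tombstones := map[string]int64{
		"tombstone/txn2": 20, // txn2 committed at a high timestamp
	}
	txn1WriteTS := int64(10) // txn1 wrote earlier and has no tombstone yet

	// High-water-mark-only transfer: every key inherits the max timestamp,
	// so txn1's record creation at ts=10 looks like a possible replay.
	var highWater int64
	for _, ts := range tombstones {
		if ts > highWater {
			highWater = ts
		}
	}
	fmt.Println("low-res transfer rejects txn1:", highWater >= txn1WriteTS) // true

	// Per-span transfer: txn1's own marker key was never touched, so only
	// the (low) low water mark applies and the record can be created.
	perSpanTS := tombstones["tombstone/txn1"] // no entry: zero value
	fmt.Println("high-res transfer rejects txn1:", perSpanTS >= txn1WriteTS) // false
}
```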
diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go
index 578f9a3e61ab..dca81fdfbb70 100644
--- a/pkg/kv/kvserver/client_merge_test.go
+++ b/pkg/kv/kvserver/client_merge_test.go
@@ -875,8 +875,12 @@ func mergeCheckingTimestampCaches(
 	ba.RangeID = lhsDesc.RangeID
 	ba.Add(hb)
 	var expReason kvpb.TransactionAbortedReason
-	if disjointLeaseholders || throughSnapshot {
-		expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED
+	if throughSnapshot {
+		// TODO(nvanbenschoten): this will result in the wrong reason until we
+		// start compressing the persisted read summary on lease transfers and
+		// range merges.
+		// expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED
+		expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND
 	} else {
 		expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND
 	}
diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go
index 8a12aaeac153..68ed50839879 100644
--- a/pkg/kv/kvserver/replica_tscache.go
+++ b/pkg/kv/kvserver/replica_tscache.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/telemetry"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -28,6 +29,24 @@ import (
 	"github.com/cockroachdb/redact"
 )
 
+var readSummaryLocalBudget = settings.RegisterByteSizeSetting(
+	settings.SystemOnly,
+	"kv.lease_transfer_read_summary.local_budget",
+	"controls the maximum number of bytes that will be used to summarize the local "+
+		"segment of the timestamp cache during lease transfers and range merges. A smaller "+
+		"budget will result in loss of precision.",
+	4<<20, /* 4 MB */
+)
+
+var readSummaryGlobalBudget = settings.RegisterByteSizeSetting(
+	settings.SystemOnly,
+	"kv.lease_transfer_read_summary.global_budget",
+	"controls the maximum number of bytes that will be used to summarize the global "+
+		"segment of the timestamp cache during lease transfers and range merges. A smaller "+
+		"budget will result in loss of precision.",
+	0,
+)
+
 // addToTSCacheChecked adds the specified timestamp to the timestamp cache
 // covering the range of keys from start to end. Before doing so, the function
 // performs a few assertions to check for proper use of the timestamp cache.
@@ -583,8 +602,11 @@ func (r *Replica) CanCreateTxnRecord(
 		return false, kvpb.ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY
 	case uuid.Nil:
 		lease, _ /* nextLease */ := r.GetLease()
-		// Recognize the case where a lease started recently. Lease transfers bump
-		// the ts cache low water mark.
+		// Recognize the case where a lease started recently. Lease transfers can
+		// bump the ts cache low water mark. However, in newer versions of the
+		// code, the lease transfer will also pass a read summary to the new
+		// leaseholder, allowing it to be more selective about which parts of
+		// the ts cache it bumps, avoiding this case most of the time.
 		if tombstoneTimestamp == lease.Start.ToTimestamp() {
 			return false, kvpb.ABORT_REASON_NEW_LEASE_PREVENTS_TXN
 		}
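To make the branch above concrete, here is a loose, self-contained paraphrase in Go. It uses toy integer timestamps and string txn IDs instead of hlc.Timestamp and uuid.UUID, and it is not the real CanCreateTxnRecord signature or full logic; the function name and abort-reason strings are illustrative only. The key observation is that an anonymous tombstone equal to the lease start can only have come from a low water mark bump:

```go
// canCreateTxnRecordSketch loosely paraphrases the uuid.Nil branch shown
// above. All types are simplified stand-ins for illustration.
func canCreateTxnRecordSketch(tombstoneTS, leaseStartTS int64, tombstoneTxnID string) (ok bool, reason string) {
	if tombstoneTxnID == "" { // uuid.Nil: the entry names no specific txn
		if tombstoneTS == leaseStartTS {
			// The tombstone came from a lease transfer bumping the low
			// water mark, so the replica cannot rule out an earlier abort.
			return false, "ABORT_REASON_NEW_LEASE_PREVENTS_TXN"
		}
		// Some other anonymous read covered this key at a high timestamp.
		return false, "ABORT_REASON_TIMESTAMP_CACHE_REJECTED"
	}
	// Tombstones that do name a txn are handled by other branches of the
	// real switch statement, elided here.
	return true, ""
}
```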
@@ -660,7 +682,9 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key {
 // GetCurrentReadSummary returns a new ReadSummary reflecting all reads served
 // by the range to this point.
 func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
-	sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc())
+	localBudget := readSummaryLocalBudget.Get(&r.store.ClusterSettings().SV)
+	globalBudget := readSummaryGlobalBudget.Get(&r.store.ClusterSettings().SV)
+	sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc(), localBudget, globalBudget)
 	// Forward the read summary by the range's closed timestamp, because any
 	// replica could have served reads below this time. We also return the
 	// closed timestamp separately, in case callers want it split out.
@@ -669,24 +693,42 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
 	return sum
 }
 
-// collectReadSummaryFromTimestampCache constucts a read summary for the range
-// with the specified descriptor using the timestamp cache.
+// collectReadSummaryFromTimestampCache constructs a read summary for the range
+// with the specified descriptor using the timestamp cache. The function accepts
+// two size budgets, which are used to limit the size of the local and global
+// segments of the read summary, respectively.
 func collectReadSummaryFromTimestampCache(
-	ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor,
+	ctx context.Context,
+	tc tscache.Cache,
+	desc *roachpb.RangeDescriptor,
+	localBudget, globalBudget int64,
 ) rspb.ReadSummary {
+	serializeSegment := func(start, end roachpb.Key, budget int64) rspb.Segment {
+		var seg rspb.Segment
+		if budget > 0 {
+			// Serialize the key range and then compress to the budget.
+			seg = tc.Serialize(ctx, start, end)
+			seg.Compress(budget)
+		} else {
+			// If the budget is 0, just return the maximum timestamp in the key range.
+			// This is equivalent to serializing the key range and then compressing
+			// the resulting segment to 0 bytes, but much cheaper.
+			seg.LowWater, _ = tc.GetMax(ctx, start, end)
+		}
+		return seg
+	}
 	var s rspb.ReadSummary
-	s.Local.LowWater, _ = tc.GetMax(
-		ctx,
+	s.Local = serializeSegment(
 		keys.MakeRangeKeyPrefix(desc.StartKey),
 		keys.MakeRangeKeyPrefix(desc.EndKey),
+		localBudget,
 	)
 	userKeys := desc.KeySpan()
-	s.Global.LowWater, _ = tc.GetMax(
-		ctx,
+	s.Global = serializeSegment(
 		userKeys.Key.AsRawKey(),
 		userKeys.EndKey.AsRawKey(),
+		globalBudget,
 	)
-
 	return s
 }
 
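The hunk above calls `seg.Compress(budget)`, whose implementation lives outside this diff. Purely as an illustration of the contract the settings describe (a smaller budget loses precision but must stay conservative, never under-reporting a read), here is one possible shape of such a compression over invented toy types; the real algorithm, per-span byte cost, and victim selection may differ:

```go
// toySpan and toySegment are invented stand-ins for rspb.ReadSpan and
// rspb.Segment; bytesPerSpan is a made-up cost estimate.
type toySpan struct {
	key, endKey string
	ts          int64
}

type toySegment struct {
	lowWater int64
	spans    []toySpan
}

// compress drops read spans until the segment fits the byte budget,
// folding each dropped span's timestamp into the low water mark so the
// result remains conservative (it may over-report reads, never under).
func (s *toySegment) compress(budget int64) {
	const bytesPerSpan = 64 // rough per-span cost; illustrative only
	maxSpans := int(budget / bytesPerSpan)
	if len(s.spans) <= maxSpans {
		return
	}
	// A real implementation might merge adjacent spans or pick victims by
	// size or timestamp; this sketch simply folds in the tail.
	for _, sp := range s.spans[maxSpans:] {
		if sp.ts > s.lowWater {
			s.lowWater = sp.ts
		}
	}
	s.spans = s.spans[:maxSpans]
}
```

Under this model, a budget of 0 collapses every span into the low water mark, which is exactly why the zero-budget branch of `serializeSegment` can skip serialization and call `tc.GetMax` directly.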
@@ -697,19 +739,21 @@ func collectReadSummaryFromTimestampCache(
 func applyReadSummaryToTimestampCache(
 	ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor, s rspb.ReadSummary,
 ) {
-	tc.Add(
-		ctx,
+	applySegment := func(start, end roachpb.Key, seg rspb.Segment) {
+		tc.Add(ctx, start, end, seg.LowWater, uuid.Nil /* txnID */)
+		for _, rs := range seg.ReadSpans {
+			tc.Add(ctx, rs.Key, rs.EndKey, rs.Timestamp, rs.TxnID)
+		}
+	}
+	applySegment(
 		keys.MakeRangeKeyPrefix(desc.StartKey),
 		keys.MakeRangeKeyPrefix(desc.EndKey),
-		s.Local.LowWater,
-		uuid.Nil, /* txnID */
+		s.Local,
 	)
 	userKeys := desc.KeySpan()
-	tc.Add(
-		ctx,
+	applySegment(
 		userKeys.Key.AsRawKey(),
 		userKeys.EndKey.AsRawKey(),
-		s.Global.LowWater,
-		uuid.Nil, /* txnID */
+		s.Global,
 	)
 }
diff --git a/pkg/kv/kvserver/replica_tscache_test.go b/pkg/kv/kvserver/replica_tscache_test.go
index 5cb1fb276e62..040d471fde61 100644
--- a/pkg/kv/kvserver/replica_tscache_test.go
+++ b/pkg/kv/kvserver/replica_tscache_test.go
@@ -95,7 +95,7 @@ func TestReadSummaryCollectForR1(t *testing.T) {
 
 	// Assert that r1's summary was not influenced by the r2 range-local key we
 	// set above.
-	summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc)
+	summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc, 0, 0)
 	require.Equal(t, baseTS, summary.Global.LowWater)
 	require.Equal(t, baseTS, summary.Local.LowWater)
 }
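Finally, a self-contained toy model (invented map-based cache and types, not tscache.Cache) of the apply side shown above: the low water mark is applied anonymously across the whole segment, and each retained read span is then layered back at full fidelity. This per-txn precision on the new leaseholder is what lets the lease-transfer test distinguish txn1 from txn2.

```go
// toyRead is an invented stand-in for a timestamp cache entry.
type toyRead struct {
	ts    int64
	txnID string // "" plays the role of uuid.Nil
}

// applySegmentToy mirrors the applySegment closure on a toy cache keyed by
// single keys rather than key spans.
func applySegmentToy(cache map[string]toyRead, spanKeys []string, lowWater int64, spans map[string]toyRead) {
	// Low water mark first: an anonymous floor over the whole key span.
	for _, k := range spanKeys {
		if cache[k].ts < lowWater {
			cache[k] = toyRead{ts: lowWater}
		}
	}
	// Then the preserved read spans, which keep their owner txn IDs so the
	// new leaseholder retains per-txn precision.
	for k, r := range spans {
		if cache[k].ts < r.ts {
			cache[k] = r
		}
	}
}
```

Note that the test at the end of the diff passes budgets of `0, 0`, so both segments degenerate to low water marks and the pre-existing assertions on `summary.Global.LowWater` and `summary.Local.LowWater` continue to hold unchanged.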