From 42aa206796831e0bd7fe06344193dbe52525684d Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Wed, 24 Jan 2024 23:34:13 -0500 Subject: [PATCH] kv: ship high-resolution tscache summaries during lease transfers and range merges Fixes #61986. Fixes #117486. Unblocks #118000. This commit uses the new `tscache.Cache.Serialize` method introduced in #118299 to ship high-resolution summaries of the timestamp cache during lease transfers and range merges. In doing so, it eliminates the loss of precision that occurs in an incoming leaseholder's timestamp cache when it receives a lease transfer or range merge. This loss of precision was a source of transaction retries for three reasons: 1. txn tombstone marker keys would have their timestamp advanced, leading to TransactionAbortedError with the `ABORT_REASON_NEW_LEASE_PREVENTS_TXN` reason. 2. txn push marker keys would have their timestamp advanced, leading to transactions having their commit timestamp pushed, which could lead to TransactionRetryError with the `RETRY_SERIALIZABLE` reason. 3. global keys would have their timestamp advanced as if they had been read, also leading to transactions having their commit timestamp pushed if they wrote to those keys, which could also lead to TransactionRetryError with the `RETRY_SERIALIZABLE` reason. The first issue here is the most severe, both because it cannot be refreshed away and because it affects transactions of all isolation levels. This commit introduces two new cluster settings to control the maximum size of these timestamp cache read summaries: - `kv.lease_transfer_read_summary.local_budget` - `kv.lease_transfer_read_summary.global_budget` It configures the local keyspace budget to 4MB and the global keyspace budget to 0B. This default configuration should be sufficient to eliminate the first two sources of retries described above. 
The third has not been observed as a serious issue in practice, so we configure the global budget to 0 so that we can hit a serialization fast-path. Release note (ops change): Transaction replay protection state is now passed between the outgoing and incoming leaseholder for a range during a lease transfer. This avoids cases where lease transfers can cause transactions to throw TransactionAbortedError(ABORT_REASON_NEW_LEASE_PREVENTS_TXN) errors. --- docs/generated/settings/settings.html | 2 + pkg/kv/kvserver/client_lease_test.go | 86 ++++++++++++++++------- pkg/kv/kvserver/client_merge_test.go | 8 ++- pkg/kv/kvserver/replica_tscache.go | 93 ++++++++++++++++++++----- pkg/kv/kvserver/replica_tscache_test.go | 2 +- 5 files changed, 145 insertions(+), 46 deletions(-) diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 9fdd803ef77a..67ea4fa0db3b 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -68,6 +68,8 @@
kv.closed_timestamp.side_transport_interval
duration200msthe interval at which the closed timestamp side-transport attempts to advance each range's closed timestamp; set to 0 to disable the side-transportServerless/Dedicated/Self-Hosted (read-only)
kv.closed_timestamp.target_duration
duration3sif nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this durationServerless/Dedicated/Self-Hosted (read-only)
kv.consistency_queue.testing_fast_efos_acquisition.enabled
booleanfalseset to true to speed up EventuallyFileOnlySnapshot acquisition/transition for tests at the expense of excessive flushesDedicated/Self-Hosted +
kv.lease_transfer_read_summary.global_budget
byte size0 Bcontrols the maximum number of bytes that will be used to summarize the global segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.Dedicated/Self-Hosted +
kv.lease_transfer_read_summary.local_budget
byte size4.0 MiBcontrols the maximum number of bytes that will be used to summarize the local segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.Dedicated/Self-Hosted
kv.log_range_and_node_events.enabled
booleantrueset to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventologDedicated/Self-Hosted
kv.protectedts.reconciliation.interval
duration5m0sthe frequency for reconciling jobs with protected timestamp recordsServerless/Dedicated/Self-Hosted (read-only)
kv.range_split.by_load.enabled
booleantrueallow automatic splits of ranges based on where load is concentratedDedicated/Self-Hosted diff --git a/pkg/kv/kvserver/client_lease_test.go b/pkg/kv/kvserver/client_lease_test.go index 4aa87980dc61..25a6cc5fac46 100644 --- a/pkg/kv/kvserver/client_lease_test.go +++ b/pkg/kv/kvserver/client_lease_test.go @@ -654,35 +654,73 @@ func TestStoreLeaseTransferTimestampCacheRead(t *testing.T) { }) } -// TestStoreLeaseTransferTimestampCacheTxnRecord checks the error returned by -// attempts to create a txn record after a lease transfer. +// TestStoreLeaseTransferTimestampCacheTxnRecord checks whether an error is +// returned by an attempt to create a txn record after a lease transfer. +// +// If the local read summary is given a sufficient size budget then information +// about individual transaction tombstone markers can be passed from the old +// leaseholder to the new leaseholder. This allows the new leaseholder to +// conclusively determine that a transaction tombstone marker did not exist for +// the transaction, avoiding any error when the transaction creates its record. +// +// However, if the local read summary is compressed due to an insufficient size +// budget then the new leaseholder must assume that a transaction tombstone +// marker may have existed for the transaction. As a result, an error is thrown +// when the transaction creates its record. 
func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 117486) - ctx := context.Background() - tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{}) - defer tc.Stopper().Stop(ctx) - key := []byte("a") - rangeDesc, err := tc.LookupRange(key) - require.NoError(t, err) + testutils.RunTrueAndFalse(t, "sufficient-budget", func(t *testing.T, budget bool) { + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + if !budget { + kvserver.ReadSummaryLocalBudget.Override(ctx, &st.SV, 0) + } + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: st, + }, + }) + defer tc.Stopper().Stop(ctx) - // Transfer the lease to Servers[0] so we start in a known state. Otherwise, - // there might be already a lease owned by a random node. - require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(0))) - - // Start a txn and perform a write, so that a txn record has to be created by - // the EndTxn. - txn := tc.Servers[0].DB().NewTxn(ctx, "test") - require.NoError(t, txn.Put(ctx, "a", "val")) - // After starting the transaction, transfer the lease. This will wipe the - // timestamp cache, which means that the txn record will not be able to be - // created (because someone might have already aborted the txn). - require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(1))) - - err = txn.Commit(ctx) - require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err) + key := []byte("a") + rangeDesc, err := tc.LookupRange(key) + require.NoError(t, err) + + // Transfer the lease to server 0, so we start in a known state. Otherwise, + // there might be already a lease owned by a random node. + require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(0))) + + // Start a txn and perform a write, so that a txn record has to be created by + // the EndTxn when it eventually commits. 
Don't commit yet. + txn1 := tc.Servers[0].DB().NewTxn(ctx, "test") + require.NoError(t, txn1.Put(ctx, "a", "val")) + + // Start another txn and commit. This writes a txn tombstone marker into + // server 0's timestamp cache at a higher timestamp than the first txn's + // tombstone marker will eventually be. Doing so prevents the test from + // fooling itself and passing if only the high water mark of the timestamp + // cache is passed in a read summary during the lease transfer. + txn2 := tc.Servers[0].DB().NewTxn(ctx, "test 2") + require.NoError(t, txn2.Put(ctx, "b", "val")) + require.NoError(t, txn2.Commit(ctx)) + + // After starting the transaction, transfer the lease. The lease transfer will + // carry over a sufficiently high resolution summary of the old leaseholder's + // timestamp cache so that the new leaseholder can still create a txn record + // for the first txn with certainty that it is not permitting a replay. + require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(1))) + + // Try to commit the first txn. + err = txn1.Commit(ctx) + if budget { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err) + } + }) } // This test verifies that when a lease is moved to a node that does not match the diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 578f9a3e61ab..dca81fdfbb70 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -875,8 +875,12 @@ func mergeCheckingTimestampCaches( ba.RangeID = lhsDesc.RangeID ba.Add(hb) var expReason kvpb.TransactionAbortedReason - if disjointLeaseholders || throughSnapshot { - expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED + if throughSnapshot { + // TODO(nvanbenschoten): this will result in the wrong reason until we + // start compressing the persisted read summary on lease transfers and + // range merges. 
+ // expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED + expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND } else { expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND } diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 8a12aaeac153..46543b482a20 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -28,6 +29,32 @@ import ( "github.com/cockroachdb/redact" ) +// ReadSummaryLocalBudget controls the maximum number of bytes that will be used +// to summarize the local segment of the timestamp cache during lease transfers +// and range merges. +var ReadSummaryLocalBudget = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kv.lease_transfer_read_summary.local_budget", + "controls the maximum number of bytes that will be used to summarize the local "+ + "segment of the timestamp cache during lease transfers and range merges. A smaller "+ + "budget will result in loss of precision.", + 4<<20, /* 4 MB */ + settings.WithPublic, +) + +// ReadSummaryGlobalBudget controls the maximum number of bytes that will be +// used to summarize the global segment of the timestamp cache during lease +// transfers and range merges. +var ReadSummaryGlobalBudget = settings.RegisterByteSizeSetting( + settings.SystemOnly, + "kv.lease_transfer_read_summary.global_budget", + "controls the maximum number of bytes that will be used to summarize the global "+ + "segment of the timestamp cache during lease transfers and range merges. 
A smaller "+ + "budget will result in loss of precision.", + 0, + settings.WithPublic, +) + // addToTSCacheChecked adds the specified timestamp to the timestamp cache // covering the range of keys from start to end. Before doing so, the function // performs a few assertions to check for proper use of the timestamp cache. @@ -583,8 +610,11 @@ func (r *Replica) CanCreateTxnRecord( return false, kvpb.ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY case uuid.Nil: lease, _ /* nextLease */ := r.GetLease() - // Recognize the case where a lease started recently. Lease transfers bump - // the ts cache low water mark. + // Recognize the case where a lease started recently. Lease transfers can + // bump the ts cache low water mark. However, in versions of the code >= + // 24.1, the lease transfer will also pass a read summary to the new + // leaseholder, allowing it to be more selective about what parts of the + // ts cache that it bumps and avoiding this case most of the time. if tombstoneTimestamp == lease.Start.ToTimestamp() { return false, kvpb.ABORT_REASON_NEW_LEASE_PREVENTS_TXN } @@ -660,7 +690,9 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key { // GetCurrentReadSummary returns a new ReadSummary reflecting all reads served // by the range to this point. func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary { - sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc()) + localBudget := ReadSummaryLocalBudget.Get(&r.store.ClusterSettings().SV) + globalBudget := ReadSummaryGlobalBudget.Get(&r.store.ClusterSettings().SV) + sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc(), localBudget, globalBudget) // Forward the read summary by the range's closed timestamp, because any // replica could have served reads below this time. We also return the // closed timestamp separately, in case callers want it split out. 
@@ -669,24 +701,45 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary { return sum } -// collectReadSummaryFromTimestampCache constucts a read summary for the range -// with the specified descriptor using the timestamp cache. +// collectReadSummaryFromTimestampCache constructs a read summary for the range +// with the specified descriptor using the timestamp cache. The function accepts +// two size budgets, which are used to limit the size of the local and global +// segments of the read summary, respectively. func collectReadSummaryFromTimestampCache( - ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor, + ctx context.Context, + tc tscache.Cache, + desc *roachpb.RangeDescriptor, + localBudget, globalBudget int64, ) rspb.ReadSummary { + serializeSegment := func(start, end roachpb.Key, budget int64) rspb.Segment { + var seg rspb.Segment + if budget > 0 { + // Serialize the key range and then compress to the budget. + seg = tc.Serialize(ctx, start, end) + // TODO(nvanbenschoten): return a boolean from this function indicating + // whether the segment lost precision when being compressed. Then use that + // to increment a metric to provide observability. + seg.Compress(budget) + } else { + // If the budget is 0, just return the maximum timestamp in the key range. + // This is equivalent to serializing the key range and then compressing + // the resulting segment to 0 bytes, but much cheaper. 
+ seg.LowWater, _ = tc.GetMax(ctx, start, end) + } + return seg + } var s rspb.ReadSummary - s.Local.LowWater, _ = tc.GetMax( - ctx, + s.Local = serializeSegment( keys.MakeRangeKeyPrefix(desc.StartKey), keys.MakeRangeKeyPrefix(desc.EndKey), + localBudget, ) userKeys := desc.KeySpan() - s.Global.LowWater, _ = tc.GetMax( - ctx, + s.Global = serializeSegment( userKeys.Key.AsRawKey(), userKeys.EndKey.AsRawKey(), + globalBudget, ) - return s } @@ -697,19 +750,21 @@ func collectReadSummaryFromTimestampCache( func applyReadSummaryToTimestampCache( ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor, s rspb.ReadSummary, ) { - tc.Add( - ctx, + applySegment := func(start, end roachpb.Key, seg rspb.Segment) { + tc.Add(ctx, start, end, seg.LowWater, uuid.Nil /* txnID */) + for _, rs := range seg.ReadSpans { + tc.Add(ctx, rs.Key, rs.EndKey, rs.Timestamp, rs.TxnID) + } + } + applySegment( keys.MakeRangeKeyPrefix(desc.StartKey), keys.MakeRangeKeyPrefix(desc.EndKey), - s.Local.LowWater, - uuid.Nil, /* txnID */ + s.Local, ) userKeys := desc.KeySpan() - tc.Add( - ctx, + applySegment( userKeys.Key.AsRawKey(), userKeys.EndKey.AsRawKey(), - s.Global.LowWater, - uuid.Nil, /* txnID */ + s.Global, ) } diff --git a/pkg/kv/kvserver/replica_tscache_test.go b/pkg/kv/kvserver/replica_tscache_test.go index 5cb1fb276e62..040d471fde61 100644 --- a/pkg/kv/kvserver/replica_tscache_test.go +++ b/pkg/kv/kvserver/replica_tscache_test.go @@ -95,7 +95,7 @@ func TestReadSummaryCollectForR1(t *testing.T) { // Assert that r1's summary was not influenced by the r2 range-local key we // set above. - summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc) + summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc, 0, 0) require.Equal(t, baseTS, summary.Global.LowWater) require.Equal(t, baseTS, summary.Local.LowWater) }