kv: ship high-resolution tscache summaries during lease transfers and range merges

Fixes cockroachdb#61986.
Fixes cockroachdb#117486.
Unblocks cockroachdb#118000.

This commit uses the new `tscache.Cache.Serialize` method introduced in cockroachdb#118299 to
ship high-resolution summaries of the timestamp cache during lease transfers and
range merges. In doing so, it eliminates the loss of precision that occurs in an
incoming leaseholder's timestamp cache when it receives a lease transfer or range
merge.

This loss of precision was a source of transaction retries for three reasons:
1. txn tombstone marker keys would have their timestamp advanced, leading to
   TransactionAbortedError with the `ABORT_REASON_NEW_LEASE_PREVENTS_TXN`
   reason.
2. txn push marker keys would have their timestamp advanced, leading to
   transactions having their commit timestamp pushed, which could lead to
   TransactionRetryError with the `RETRY_SERIALIZABLE` reason.
3. global keys would have their timestamp advanced as if they had been read,
   also leading to transactions having their commit timestamp pushed if they
   wrote to those keys, which could also lead to TransactionRetryError with the
   `RETRY_SERIALIZABLE` reason.

The first issue here is the most severe, both because it cannot be refreshed
away and because it affects transactions of all isolation levels.
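
To illustrate the first issue, here is a minimal Go sketch of a toy model, not the actual `tscache` or `rspb` code. The `summary` type, the `tombstone/...` key names, and the simplified `canCreateTxnRecord` check are all assumptions made for illustration. It compares a summary that carries only a low-water mark with one that also carries per-key timestamps.

```go
package main

import "fmt"

// summary is a hypothetical, simplified read summary: a low-water timestamp
// that applies to every key, plus optional per-key timestamps above it.
type summary struct {
	lowWater int64
	perKey   map[string]int64
}

// maxTS returns the timestamp the new leaseholder must assume for key.
func (s summary) maxTS(key string) int64 {
	if ts, ok := s.perKey[key]; ok && ts > s.lowWater {
		return ts
	}
	return s.lowWater
}

// canCreateTxnRecord is a simplified stand-in for the replay check: a record
// may be created only if the timestamp found at the txn's tombstone marker
// key is below the txn's minimum timestamp.
func canCreateTxnRecord(s summary, markerKey string, txnMinTS int64) bool {
	return s.maxTS(markerKey) < txnMinTS
}

func main() {
	// txn2 committed at ts=20 and left a tombstone marker. txn1 began at
	// ts=10 and has not yet created its record.
	coarse := summary{lowWater: 20}
	precise := summary{
		lowWater: 0,
		perKey:   map[string]int64{"tombstone/txn2": 20},
	}

	fmt.Println(canCreateTxnRecord(coarse, "tombstone/txn1", 10))  // false
	fmt.Println(canCreateTxnRecord(precise, "tombstone/txn1", 10)) // true
}
```

In this model the coarse summary forces the new leaseholder to assume txn1's marker may exist, while the precise summary lets it rule that out.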

This commit introduces two new cluster settings to control the maximum size of
these timestamp cache read summaries:
- `kv.lease_transfer_read_summary.local_budget`
- `kv.lease_transfer_read_summary.global_budget`

It configures the local keyspace budget to 4 MiB and the global keyspace budget to
0B. This default configuration should be sufficient to eliminate the first two
sources of retries described above. The third has not been observed as a serious
issue in practice, so we configure the global budget to 0 so that we can hit a
serialization fast-path.
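
As a rough sketch of how a size budget trades precision for space, the following Go program models a segment with a low-water mark and a list of read spans. The `segment` and `readSpan` types, the byte cost model in `size`, and the drop-lowest-timestamp policy in `compress` are assumptions for illustration and may differ from what `rspb.Segment.Compress` actually does.

```go
package main

import (
	"fmt"
	"sort"
)

// readSpan is a hypothetical, simplified span: one key and its timestamp.
type readSpan struct {
	key string
	ts  int64
}

// segment is a toy model of a read summary segment.
type segment struct {
	lowWater int64
	spans    []readSpan
}

// size uses an assumed cost model: each span costs len(key) + 8 bytes.
func (s segment) size() int64 {
	var n int64
	for _, sp := range s.spans {
		n += int64(len(sp.key)) + 8
	}
	return n
}

// compress drops spans, lowest timestamp first, until the segment fits in
// budget. The low-water mark is raised to cover every dropped span, so the
// summary stays conservative but loses precision.
func (s *segment) compress(budget int64) {
	sort.Slice(s.spans, func(i, j int) bool { return s.spans[i].ts < s.spans[j].ts })
	for len(s.spans) > 0 && s.size() > budget {
		if s.spans[0].ts > s.lowWater {
			s.lowWater = s.spans[0].ts
		}
		s.spans = s.spans[1:]
	}
}

func main() {
	partial := segment{spans: []readSpan{{"a", 5}, {"b", 20}, {"c", 10}}}
	partial.compress(18)
	fmt.Println(partial.lowWater, len(partial.spans)) // 5 2

	// A budget of 0 collapses the segment to its maximum timestamp, which is
	// the case the commit's fast path computes directly.
	zero := segment{spans: []readSpan{{"a", 5}, {"b", 20}, {"c", 10}}}
	zero.compress(0)
	fmt.Println(zero.lowWater, len(zero.spans)) // 20 0
}
```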

Release note (ops change): Transaction replay protection state is now
passed between the outgoing and incoming leaseholder for a range during
a lease transfer. This avoids cases where lease transfers can cause
transactions to throw TransactionAbortedError(ABORT_REASON_NEW_LEASE_PREVENTS_TXN) errors.
nvanbenschoten authored and wenyihu6 committed Feb 21, 2024
1 parent abce40d commit 42aa206
Showing 5 changed files with 145 additions and 46 deletions.
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -68,6 +68,8 @@
<tr><td><div id="setting-kv-closed-timestamp-side-transport-interval" class="anchored"><code>kv.closed_timestamp.side_transport_interval</code></div></td><td>duration</td><td><code>200ms</code></td><td>the interval at which the closed timestamp side-transport attempts to advance each range&#39;s closed timestamp; set to 0 to disable the side-transport</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-closed-timestamp-target-duration" class="anchored"><code>kv.closed_timestamp.target_duration</code></div></td><td>duration</td><td><code>3s</code></td><td>if nonzero, attempt to provide closed timestamp notifications for timestamps trailing cluster time by approximately this duration</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-consistency-queue-testing-fast-efos-acquisition-enabled" class="anchored"><code>kv.consistency_queue.testing_fast_efos_acquisition.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to speed up EventuallyFileOnlySnapshot acquisition/transition for tests at the expense of excessive flushes</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-global-budget" class="anchored"><code>kv.lease_transfer_read_summary.global_budget</code></div></td><td>byte size</td><td><code>0 B</code></td><td>controls the maximum number of bytes that will be used to summarize the global segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-lease-transfer-read-summary-local-budget" class="anchored"><code>kv.lease_transfer_read_summary.local_budget</code></div></td><td>byte size</td><td><code>4.0 MiB</code></td><td>controls the maximum number of bytes that will be used to summarize the local segment of the timestamp cache during lease transfers and range merges. A smaller budget will result in loss of precision.</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-log-range-and-node-events-enabled" class="anchored"><code>kv.log_range_and_node_events.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to transactionally log range events (e.g., split, merge, add/remove voter/non-voter) into system.rangelogand node join and restart events into system.eventolog</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-protectedts-reconciliation-interval" class="anchored"><code>kv.protectedts.reconciliation.interval</code></div></td><td>duration</td><td><code>5m0s</code></td><td>the frequency for reconciling jobs with protected timestamp records</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-kv-range-split-by-load-enabled" class="anchored"><code>kv.range_split.by_load.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>allow automatic splits of ranges based on where load is concentrated</td><td>Dedicated/Self-Hosted</td></tr>
86 changes: 62 additions & 24 deletions pkg/kv/kvserver/client_lease_test.go
@@ -654,35 +654,73 @@ func TestStoreLeaseTransferTimestampCacheRead(t *testing.T) {
})
}

// TestStoreLeaseTransferTimestampCacheTxnRecord checks the error returned by
// attempts to create a txn record after a lease transfer.
// TestStoreLeaseTransferTimestampCacheTxnRecord checks whether an error is
// returned by an attempt to create a txn record after a lease transfer.
//
// If the local read summary is given a sufficient size budget then information
// about individual transaction tombstone markers can be passed from the old
// leaseholder to the new leaseholder. This allows the new leaseholder to
// conclusively determine that a transaction tombstone marker did not exist for
// the transaction, avoiding any error when the transaction creates its record.
//
// However, if the local read summary is compressed due to an insufficient size
// budget then the new leaseholder must assume that a transaction tombstone
// marker may have existed for the transaction. As a result, an error is thrown
// when the transaction creates its record.
func TestStoreLeaseTransferTimestampCacheTxnRecord(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 117486)
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

key := []byte("a")
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)
testutils.RunTrueAndFalse(t, "sufficient-budget", func(t *testing.T, budget bool) {
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
if !budget {
kvserver.ReadSummaryLocalBudget.Override(ctx, &st.SV, 0)
}
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: st,
},
})
defer tc.Stopper().Stop(ctx)

// Transfer the lease to Servers[0] so we start in a known state. Otherwise,
// there might be already a lease owned by a random node.
require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(0)))

// Start a txn and perform a write, so that a txn record has to be created by
// the EndTxn.
txn := tc.Servers[0].DB().NewTxn(ctx, "test")
require.NoError(t, txn.Put(ctx, "a", "val"))
// After starting the transaction, transfer the lease. This will wipe the
// timestamp cache, which means that the txn record will not be able to be
// created (because someone might have already aborted the txn).
require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(1)))

err = txn.Commit(ctx)
require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_NEW_LEASE_PREVENTS_TXN\)`, err)
key := []byte("a")
rangeDesc, err := tc.LookupRange(key)
require.NoError(t, err)

// Transfer the lease to server 0, so we start in a known state. Otherwise,
// there might be already a lease owned by a random node.
require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(0)))

// Start a txn and perform a write, so that a txn record has to be created by
// the EndTxn when it eventually commits. Don't commit yet.
txn1 := tc.Servers[0].DB().NewTxn(ctx, "test")
require.NoError(t, txn1.Put(ctx, "a", "val"))

// Start another txn and commit. This writes a txn tombstone marker into
// server 0's timestamp cache at a higher timestamp than the first txn's
// tombstone marker will eventually be. Doing so prevents the test from
// fooling itself and passing if only the high water mark of the timestamp
// cache is passed in a read summary during the lease transfer.
txn2 := tc.Servers[0].DB().NewTxn(ctx, "test 2")
require.NoError(t, txn2.Put(ctx, "b", "val"))
require.NoError(t, txn2.Commit(ctx))

// After starting the transaction, transfer the lease. The lease transfer will
// carry over a sufficiently high resolution summary of the old leaseholder's
// timestamp cache so that the new leaseholder can still create a txn record
// for the first txn with certainty that it is not permitting a replay.
require.NoError(t, tc.TransferRangeLease(rangeDesc, tc.Target(1)))

// Try to commit the first txn.
err = txn1.Commit(ctx)
if budget {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Regexp(t, `TransactionAbortedError\(ABORT_REASON_TIMESTAMP_CACHE_REJECTED\)`, err)
}
})
}

// This test verifies that when a lease is moved to a node that does not match the
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/client_merge_test.go
@@ -875,8 +875,12 @@ func mergeCheckingTimestampCaches(
ba.RangeID = lhsDesc.RangeID
ba.Add(hb)
var expReason kvpb.TransactionAbortedReason
if disjointLeaseholders || throughSnapshot {
expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED
if throughSnapshot {
// TODO(nvanbenschoten): this will result in the wrong reason until we
// start compressing the persisted read summary on lease transfers and
// range merges.
// expReason = kvpb.ABORT_REASON_TIMESTAMP_CACHE_REJECTED
expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND
} else {
expReason = kvpb.ABORT_REASON_ABORTED_RECORD_FOUND
}
93 changes: 74 additions & 19 deletions pkg/kv/kvserver/replica_tscache.go
@@ -21,13 +21,40 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/tscache"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// ReadSummaryLocalBudget controls the maximum number of bytes that will be used
// to summarize the local segment of the timestamp cache during lease transfers
// and range merges.
var ReadSummaryLocalBudget = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.lease_transfer_read_summary.local_budget",
"controls the maximum number of bytes that will be used to summarize the local "+
"segment of the timestamp cache during lease transfers and range merges. A smaller "+
"budget will result in loss of precision.",
4<<20, /* 4 MB */
settings.WithPublic,
)

// ReadSummaryGlobalBudget controls the maximum number of bytes that will be
// used to summarize the global segment of the timestamp cache during lease
// transfers and range merges.
var ReadSummaryGlobalBudget = settings.RegisterByteSizeSetting(
settings.SystemOnly,
"kv.lease_transfer_read_summary.global_budget",
"controls the maximum number of bytes that will be used to summarize the global "+
"segment of the timestamp cache during lease transfers and range merges. A smaller "+
"budget will result in loss of precision.",
0,
settings.WithPublic,
)

// addToTSCacheChecked adds the specified timestamp to the timestamp cache
// covering the range of keys from start to end. Before doing so, the function
// performs a few assertions to check for proper use of the timestamp cache.
@@ -583,8 +610,11 @@ func (r *Replica) CanCreateTxnRecord(
return false, kvpb.ABORT_REASON_RECORD_ALREADY_WRITTEN_POSSIBLE_REPLAY
case uuid.Nil:
lease, _ /* nextLease */ := r.GetLease()
// Recognize the case where a lease started recently. Lease transfers bump
// the ts cache low water mark.
// Recognize the case where a lease started recently. Lease transfers can
// bump the ts cache low water mark. However, in versions of the code >=
// 24.1, the lease transfer will also pass a read summary to the new
// leaseholder, allowing it to be more selective about what parts of the
// ts cache that it bumps and avoiding this case most of the time.
if tombstoneTimestamp == lease.Start.ToTimestamp() {
return false, kvpb.ABORT_REASON_NEW_LEASE_PREVENTS_TXN
}
@@ -660,7 +690,9 @@ func transactionPushMarker(key roachpb.Key, txnID uuid.UUID) roachpb.Key {
// GetCurrentReadSummary returns a new ReadSummary reflecting all reads served
// by the range to this point.
func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc())
localBudget := ReadSummaryLocalBudget.Get(&r.store.ClusterSettings().SV)
globalBudget := ReadSummaryGlobalBudget.Get(&r.store.ClusterSettings().SV)
sum := collectReadSummaryFromTimestampCache(ctx, r.store.tsCache, r.Desc(), localBudget, globalBudget)
// Forward the read summary by the range's closed timestamp, because any
// replica could have served reads below this time. We also return the
// closed timestamp separately, in case callers want it split out.
@@ -669,24 +701,45 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
return sum
}

// collectReadSummaryFromTimestampCache constucts a read summary for the range
// with the specified descriptor using the timestamp cache.
// collectReadSummaryFromTimestampCache constructs a read summary for the range
// with the specified descriptor using the timestamp cache. The function accepts
// two size budgets, which are used to limit the size of the local and global
// segments of the read summary, respectively.
func collectReadSummaryFromTimestampCache(
ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor,
ctx context.Context,
tc tscache.Cache,
desc *roachpb.RangeDescriptor,
localBudget, globalBudget int64,
) rspb.ReadSummary {
serializeSegment := func(start, end roachpb.Key, budget int64) rspb.Segment {
var seg rspb.Segment
if budget > 0 {
// Serialize the key range and then compress to the budget.
seg = tc.Serialize(ctx, start, end)
// TODO(nvanbenschoten): return a boolean from this function indicating
// whether the segment lost precision when being compressed. Then use that
// to increment a metric to provide observability.
seg.Compress(budget)
} else {
// If the budget is 0, just return the maximum timestamp in the key range.
// This is equivalent to serializing the key range and then compressing
// the resulting segment to 0 bytes, but much cheaper.
seg.LowWater, _ = tc.GetMax(ctx, start, end)
}
return seg
}
var s rspb.ReadSummary
s.Local.LowWater, _ = tc.GetMax(
ctx,
s.Local = serializeSegment(
keys.MakeRangeKeyPrefix(desc.StartKey),
keys.MakeRangeKeyPrefix(desc.EndKey),
localBudget,
)
userKeys := desc.KeySpan()
s.Global.LowWater, _ = tc.GetMax(
ctx,
s.Global = serializeSegment(
userKeys.Key.AsRawKey(),
userKeys.EndKey.AsRawKey(),
globalBudget,
)

return s
}

@@ -697,19 +750,21 @@ func collectReadSummaryFromTimestampCache(
func applyReadSummaryToTimestampCache(
ctx context.Context, tc tscache.Cache, desc *roachpb.RangeDescriptor, s rspb.ReadSummary,
) {
tc.Add(
ctx,
applySegment := func(start, end roachpb.Key, seg rspb.Segment) {
tc.Add(ctx, start, end, seg.LowWater, uuid.Nil /* txnID */)
for _, rs := range seg.ReadSpans {
tc.Add(ctx, rs.Key, rs.EndKey, rs.Timestamp, rs.TxnID)
}
}
applySegment(
keys.MakeRangeKeyPrefix(desc.StartKey),
keys.MakeRangeKeyPrefix(desc.EndKey),
s.Local.LowWater,
uuid.Nil, /* txnID */
s.Local,
)
userKeys := desc.KeySpan()
tc.Add(
ctx,
applySegment(
userKeys.Key.AsRawKey(),
userKeys.EndKey.AsRawKey(),
s.Global.LowWater,
uuid.Nil, /* txnID */
s.Global,
)
}
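
The apply step above can be sketched with a toy cache: the segment's low-water mark is applied across the whole key range first, and then each individual span is layered on top. This is a simplified model under stated assumptions. `toyCache`, `span`, `segment`, and the standalone `applySegment` here are hypothetical stand-ins and do not use the real `tscache.Cache` interface or transaction IDs.

```go
package main

import "fmt"

// toyCache is a hypothetical stand-in for the timestamp cache: a floor that
// covers every key, plus higher per-key timestamps.
type toyCache struct {
	floor  int64
	perKey map[string]int64
}

// get returns the effective timestamp for key.
func (c *toyCache) get(key string) int64 {
	if ts, ok := c.perKey[key]; ok && ts > c.floor {
		return ts
	}
	return c.floor
}

// addAll ratchets the floor for the whole range; it never lowers it.
func (c *toyCache) addAll(ts int64) {
	if ts > c.floor {
		c.floor = ts
	}
}

// add ratchets the timestamp for a single key; it never lowers it.
func (c *toyCache) add(key string, ts int64) {
	if ts > c.get(key) {
		c.perKey[key] = ts
	}
}

type span struct {
	key string
	ts  int64
}

type segment struct {
	lowWater int64
	spans    []span
}

// applySegment mirrors the order used in the diff: low-water mark first,
// then each read span.
func applySegment(c *toyCache, seg segment) {
	c.addAll(seg.lowWater)
	for _, sp := range seg.spans {
		c.add(sp.key, sp.ts)
	}
}

func main() {
	c := &toyCache{perKey: map[string]int64{}}
	applySegment(c, segment{lowWater: 5, spans: []span{{"b", 20}}})
	fmt.Println(c.get("a"), c.get("b")) // 5 20
}
```

Because every add only ratchets timestamps upward in this model, applying a summary can make the cache more conservative but never less.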
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_tscache_test.go
@@ -95,7 +95,7 @@ func TestReadSummaryCollectForR1(t *testing.T) {

// Assert that r1's summary was not influenced by the r2 range-local key we
// set above.
summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc)
summary := collectReadSummaryFromTimestampCache(ctx, tc, &r1desc, 0, 0)
require.Equal(t, baseTS, summary.Global.LowWater)
require.Equal(t, baseTS, summary.Local.LowWater)
}
