Skip to content

Commit

Permalink
storage: Add byte pagination to synchronous intent resolution / EndTxn
Browse files Browse the repository at this point in the history
Informs: cockroachdb#77228

Intent resolution batches are sequenced on raft and each batch can
consist of 100-200 intents. If an intent key or even value in some cases
are large, it is possible that resolving all intents in the batch would
result in a raft command size exceeding the max raft command size
kv.raft.command.max_size.

To address this, we add support for TargetBytes in resolve intent and
resolve intent range commands, allowing us to stop resolving intents in
the batch as soon as we exceed the TargetBytes max bytes limit.

This PR adds byte pagination for synchronous intent resolution (i.e.
the EndTxn / End Transaction command).

Release note: None
  • Loading branch information
KaiSun314 committed Feb 24, 2023
1 parent f426a48 commit f9992b6
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 17 deletions.
45 changes: 28 additions & 17 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ func IsEndTxnTriggeringRetryError(
}

const lockResolutionBatchSize = 500
const lockResolutionBatchByteSize = 4 << 20 // 4 MB.

// resolveLocalLocks synchronously resolves any locks that are local to this
// range in the same batch and returns those lock spans. The remainder are
Expand All @@ -541,17 +542,19 @@ func resolveLocalLocks(
evalCtx EvalContext,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
var resolveAllowance int64 = lockResolutionBatchSize
var targetBytes int64 = lockResolutionBatchByteSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't
// enforce the resolve allowance. These transactions rely on having
// their locks resolved synchronously.
resolveAllowance = 0
targetBytes = 0
}
return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance)
return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance, targetBytes)
}

// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key
// limit.
// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key and
// target bytes limit.
func resolveLocalLocksWithPagination(
ctx context.Context,
desc *roachpb.RangeDescriptor,
Expand All @@ -561,6 +564,7 @@ func resolveLocalLocksWithPagination(
txn *roachpb.Transaction,
evalCtx EvalContext,
maxKeys int64,
targetBytes int64,
) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) {
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
Expand Down Expand Up @@ -593,24 +597,31 @@ func resolveLocalLocksWithPagination(
//
// Note that the underlying pebbleIterator will still be reused
// since readWriter is a pebbleBatch in the typical case.
ok, _, _, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{})
ok, numBytes, resumeSpan, err := storage.MVCCResolveWriteIntent(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentOptions{TargetBytes: targetBytes})
if err != nil {
return 0, 0, 0, err
}
if ok {
numKeys = 1
}
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return 0, 0, 0, err
if resumeSpan != nil {
externalLocks = append(externalLocks, *resumeSpan)
resumeReason = kvpb.RESUME_BYTE_LIMIT
} else {
// !ok && resumeSpan == nil is a valid condition that means
// that no intent was found.
resolvedLocks = append(resolvedLocks, update)
// If requested, replace point tombstones with range tombstones.
if ok && evalCtx.EvalKnobs().UseRangeTombstonesForPointDeletes {
if err := storage.ReplacePointTombstonesWithRangeTombstones(
ctx, spanset.DisableReadWriterAssertions(readWriter),
ms, update.Key, update.EndKey); err != nil {
return 0, 0, 0, err
}
}
}
return numKeys, 0, 0, nil
return numKeys, numBytes, resumeReason, nil
}
// For update ranges, cut into parts inside and outside our key
// range. Resolve locally inside, delegate the rest. In particular,
Expand All @@ -619,8 +630,8 @@ func resolveLocalLocksWithPagination(
externalLocks = append(externalLocks, outSpans...)
if inSpan != nil {
update.Span = *inSpan
numKeys, _, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys})
numKeys, numBytes, resumeSpan, resumeReason, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update,
storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys, TargetBytes: targetBytes})
if err != nil {
return 0, 0, 0, err
}
Expand All @@ -640,12 +651,12 @@ func resolveLocalLocksWithPagination(
return 0, 0, 0, err
}
}
return numKeys, 0, resumeReason, nil
return numKeys, numBytes, resumeReason, nil
}
return 0, 0, 0, nil
}

numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, 0, false, f)
numKeys, _, _, err := storage.MVCCPaginate(ctx, maxKeys, targetBytes, false, f)
if err != nil {
return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status)
}
Expand Down
206 changes: 206 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package batcheval

import (
"context"
"fmt"
"regexp"
"testing"

Expand Down Expand Up @@ -1390,3 +1391,208 @@ func TestComputeSplitRangeKeyStatsDelta(t *testing.T) {
})
}
}

// TestResolveLocalLocks tests resolveLocalLocks for point and ranged intents
// as well as under a max key or max byte limit, ensuring the returned
// resolvedLocks, externalLocks, and numBytes are as expected.
func TestResolveLocalLocks(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
intToKey := func(i int) roachpb.Key {
return roachpb.Key(fmt.Sprintf("%01000d", i))
}
ceil := func(i int, j int) int {
return (i + j - 1) / j
}

const (
numKeys = 20
keysPerRangedLock = 4
maxKeys = 11 // not divisible by keysPerRangedLock
targetBytes = 11900
keysFromTargetBytes = 12 // divisible by keysPerRangedLock
)

pointLocks := make([]roachpb.Span, numKeys)
for i := range pointLocks {
pointLocks[i].Key = intToKey(i)
}
rangedLocks := make([]roachpb.Span, numKeys/keysPerRangedLock)
for i := range rangedLocks {
rangedLocks[i].Key = intToKey(i * keysPerRangedLock)
rangedLocks[i].EndKey = intToKey((i + 1) * keysPerRangedLock)
}

expectedResolvedLocksPointMaxKeys := make([]roachpb.Span, maxKeys)
for i := range expectedResolvedLocksPointMaxKeys {
expectedResolvedLocksPointMaxKeys[i].Key = intToKey(i)
}
expectedExternalLocksPointMaxKeys := make([]roachpb.Span, numKeys-maxKeys)
for i := range expectedExternalLocksPointMaxKeys {
expectedExternalLocksPointMaxKeys[i].Key = intToKey(i + maxKeys)
}

expectedResolvedLocksRangedMaxKeys := make([]roachpb.Span, ceil(maxKeys, keysPerRangedLock))
for i := range expectedResolvedLocksRangedMaxKeys {
expectedResolvedLocksRangedMaxKeys[i].Key = intToKey(i * keysPerRangedLock)
if i == len(expectedResolvedLocksRangedMaxKeys)-1 {
expectedResolvedLocksRangedMaxKeys[i].EndKey = intToKey(maxKeys - 1).Next()
} else {
expectedResolvedLocksRangedMaxKeys[i].EndKey = intToKey((i + 1) * keysPerRangedLock)
}
}
expectedExternalLocksRangedMaxKeys := make([]roachpb.Span, ceil(numKeys, keysPerRangedLock)-ceil(maxKeys, keysPerRangedLock)+1)
for i := range expectedExternalLocksRangedMaxKeys {
offset := maxKeys / keysPerRangedLock
if i == 0 {
expectedExternalLocksRangedMaxKeys[i].Key = intToKey(maxKeys - 1).Next()
} else {
expectedExternalLocksRangedMaxKeys[i].Key = intToKey((i + offset) * keysPerRangedLock)
}
expectedExternalLocksRangedMaxKeys[i].EndKey = intToKey((i + offset + 1) * keysPerRangedLock)
}

expectedResolvedLocksPointTargetBytes := make([]roachpb.Span, keysFromTargetBytes)
for i := range expectedResolvedLocksPointTargetBytes {
expectedResolvedLocksPointTargetBytes[i].Key = intToKey(i)
}
expectedExternalLocksPointTargetBytes := make([]roachpb.Span, numKeys-keysFromTargetBytes)
for i := range expectedExternalLocksPointTargetBytes {
expectedExternalLocksPointTargetBytes[i].Key = intToKey(i + keysFromTargetBytes)
}

expectedResolvedLocksRangedTargetBytes := make([]roachpb.Span, keysFromTargetBytes/keysPerRangedLock)
for i := range expectedResolvedLocksRangedTargetBytes {
expectedResolvedLocksRangedTargetBytes[i].Key = intToKey(i * keysPerRangedLock)
expectedResolvedLocksRangedTargetBytes[i].EndKey = intToKey((i + 1) * keysPerRangedLock)
}
expectedExternalLocksRangedTargetBytes := make([]roachpb.Span, ceil(numKeys, keysPerRangedLock)-keysFromTargetBytes/keysPerRangedLock)
for i := range expectedExternalLocksRangedTargetBytes {
offset := keysFromTargetBytes / keysPerRangedLock
expectedExternalLocksRangedTargetBytes[i].Key = intToKey((i + offset) * keysPerRangedLock)
expectedExternalLocksRangedTargetBytes[i].EndKey = intToKey((i + offset + 1) * keysPerRangedLock)
}

expectedResolvedLocksNoLimit := make([]roachpb.Span, numKeys)
for i := range expectedResolvedLocksNoLimit {
expectedResolvedLocksNoLimit[i].Key = intToKey(i)
}
expectedExternalLocksNoLimit := make([]roachpb.Span, 0)

testCases := []struct {
desc string
lockSpans []roachpb.Span
resolveAllowance int64
targetBytes int64
expectedResolvedLocks []roachpb.Span
expectedExternalLocks []roachpb.Span
}{
// Point intent resolution with a max keys limit. 20 point intents, 11
// become resolved locks and 9 become external locks.
{
desc: "Point locks with max keys",
lockSpans: pointLocks,
resolveAllowance: maxKeys,
targetBytes: 0,
expectedResolvedLocks: expectedResolvedLocksPointMaxKeys,
expectedExternalLocks: expectedExternalLocksPointMaxKeys,
},
// Ranged intent resolution with a max keys limit. 5 ranged locks (each
// containing 4 keys), 3 become resolved locks (containing the first 2
// locks and part of the 3rd lock) and 3 become external locks (containing
// the remaining part of the 3rd lock and the last 2 locks). Note that the
// max key limit splits in between the 3rd lock, so the resolved locks will
// contain the first part of the 3rd lock span and the external locks will
// contain the remaining part of the 3rd lock span.
{
desc: "Ranged locks with max keys",
lockSpans: rangedLocks,
resolveAllowance: maxKeys,
targetBytes: 0,
expectedResolvedLocks: expectedResolvedLocksRangedMaxKeys,
expectedExternalLocks: expectedExternalLocksRangedMaxKeys,
},
// Point intent resolution with a target bytes limit. 20 point intents, 12
// become resolved locks and 8 become external locks.
{
desc: "Point span with target bytes",
lockSpans: pointLocks,
resolveAllowance: 0,
targetBytes: targetBytes,
expectedResolvedLocks: expectedResolvedLocksPointTargetBytes,
expectedExternalLocks: expectedExternalLocksPointTargetBytes,
},
// Ranged intent resolution with a target bytes limit. 5 ranged locks (each
// containing 4 keys), 3 become resolved locks (containing the first 3
// locks) and 2 become external locks (containing the last 2 locks). Note
// that the target byte limit does not split in between any locks, so the
// resolved and external locks do not contain part of a lock span for any
// lock.
{
desc: "Ranged span with target bytes",
lockSpans: rangedLocks,
resolveAllowance: 0,
targetBytes: targetBytes,
expectedResolvedLocks: expectedResolvedLocksRangedTargetBytes,
expectedExternalLocks: expectedExternalLocksRangedTargetBytes,
},
// Point intent resolution without any limit. 20 point intents, 20 become
// resolved locks and 0 become external locks.
{
desc: "No key or byte limit",
lockSpans: pointLocks,
resolveAllowance: 0,
targetBytes: 0,
expectedResolvedLocks: expectedResolvedLocksNoLimit,
expectedExternalLocks: expectedExternalLocksNoLimit,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
db := storage.NewDefaultInMemForTesting()
defer db.Close()
batch := db.NewBatch()
defer batch.Close()

ts := hlc.Timestamp{WallTime: 1}
txn := roachpb.MakeTransaction("test", roachpb.Key("a"), 0, ts, 0, 1)
txn.Status = roachpb.COMMITTED

for i := 0; i < numKeys; i++ {
err := storage.MVCCPut(ctx, batch, nil, intToKey(i), ts, hlc.ClockTimestamp{}, roachpb.MakeValueFromString("a"), &txn)
require.NoError(t, err)
}
resolvedLocks, externalLocks, err := resolveLocalLocksWithPagination(
ctx,
&roachpb.RangeDescriptor{
StartKey: roachpb.RKeyMin,
EndKey: roachpb.RKeyMax,
},
batch,
nil,
&kvpb.EndTxnRequest{
LockSpans: tc.lockSpans,
InternalCommitTrigger: &roachpb.InternalCommitTrigger{},
},
&txn,
(&MockEvalCtx{}).EvalContext(),
tc.resolveAllowance,
tc.targetBytes,
)
require.NoError(t, err)
require.Equal(t, len(tc.expectedResolvedLocks), len(resolvedLocks))
for i, lock := range resolvedLocks {
require.Equal(t, tc.expectedResolvedLocks[i].Key, lock.Key)
require.Equal(t, tc.expectedResolvedLocks[i].EndKey, lock.EndKey)
}
require.Equal(t, len(tc.expectedExternalLocks), len(externalLocks))
for i, lock := range externalLocks {
require.Equal(t, tc.expectedExternalLocks[i].Key, lock.Key)
require.Equal(t, tc.expectedExternalLocks[i].EndKey, lock.EndKey)
}
})
}
}
Loading

0 comments on commit f9992b6

Please sign in to comment.