diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index a367724e51f8..0930a0bd7a97 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -13,7 +13,6 @@ package batcheval import ( "bytes" "context" - "math" "sync/atomic" "time" @@ -32,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -517,6 +517,28 @@ func resolveLocalLocks( args *roachpb.EndTxnRequest, txn *roachpb.Transaction, evalCtx EvalContext, +) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { + var resolveAllowance int64 = lockResolutionBatchSize + if args.InternalCommitTrigger != nil { + // If this is a system transaction (such as a split or merge), don't + // enforce the resolve allowance. These transactions rely on having + // their locks resolved synchronously. + resolveAllowance = 0 + } + return resolveLocalLocksWithPagination(ctx, desc, readWriter, ms, args, txn, evalCtx, resolveAllowance) +} + +// resolveLocalLocksWithPagination is resolveLocalLocks but with a max key +// limit. +func resolveLocalLocksWithPagination( + ctx context.Context, + desc *roachpb.RangeDescriptor, + readWriter storage.ReadWriter, + ms *enginepb.MVCCStats, + args *roachpb.EndTxnRequest, + txn *roachpb.Transaction, + evalCtx EvalContext, + maxKeys int64, ) (resolvedLocks []roachpb.LockUpdate, externalLocks []roachpb.Span, _ error) { if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil { // If this is a merge, then use the post-merge descriptor to determine @@ -525,19 +547,14 @@ func resolveLocalLocks( desc = &mergeTrigger.LeftDesc } - var resolveAllowance int64 = lockResolutionBatchSize - if args.InternalCommitTrigger != nil { - // If this is a system transaction (such as a split or merge), don't enforce the resolve allowance. - // These transactions rely on having their locks resolved synchronously. - resolveAllowance = math.MaxInt64 - } - - for _, span := range args.LockSpans { + i := 0 + f := func(maxKeys, targetBytes int64) (numKeys int64, numBytes int64, resumeSpan *roachpb.Span, err error) { + if i >= len(args.LockSpans) { + return 0, 0, nil, iterutil.StopIteration() + } + span := args.LockSpans[i] + i++ if err := func() error { - if resolveAllowance == 0 { - externalLocks = append(externalLocks, span) - return nil - } update := roachpb.MakeLockUpdate(txn, span) if len(span.EndKey) == 0 { // For single-key lock updates, do a KeyAddress-aware check of @@ -560,7 +577,7 @@ func resolveLocalLocks( return err } if ok { - resolveAllowance-- + numKeys = 1 } resolvedLocks = append(resolvedLocks, update) // If requested, replace point tombstones with range tombstones. @@ -580,19 +597,15 @@ func resolveLocalLocks( externalLocks = append(externalLocks, outSpans...) if inSpan != nil { update.Span = *inSpan - numKeys, _, resumeSpan, _, err := storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, - storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: resolveAllowance}) + numKeys, _, resumeSpan, _, err = storage.MVCCResolveWriteIntentRange(ctx, readWriter, ms, update, + storage.MVCCResolveWriteIntentRangeOptions{MaxKeys: maxKeys}) if err != nil { return err } if evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution != nil { atomic.AddInt64(evalCtx.EvalKnobs().NumKeysEvaluatedForRangeIntentResolution, numKeys) } - resolveAllowance -= numKeys if resumeSpan != nil { - if resolveAllowance != 0 { - log.Fatalf(ctx, "expected resolve allowance to be exactly 0 resolving %s; got %d", update.Span, resolveAllowance) - } update.EndKey = resumeSpan.Key externalLocks = append(externalLocks, *resumeSpan) } @@ -609,11 +622,19 @@ func resolveLocalLocks( } return nil }(); err != nil { - return nil, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status) + return 0, 0, nil, errors.Wrapf(err, "resolving lock at %s on end transaction [%s]", span, txn.Status) } + return numKeys, numBytes, resumeSpan, nil + } + + numKeys, _, _, err := storage.MVCCPagination(ctx, maxKeys, 0, f) + if err != nil { + return nil, nil, err } - removedAny := resolveAllowance != lockResolutionBatchSize + externalLocks = append(externalLocks, args.LockSpans[i:]...) + + removedAny := numKeys > 0 if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) { if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil { return nil, nil, err diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 41fafd5e5346..3c9178816a9a 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -4086,9 +4086,10 @@ func MVCCIterate( return intents, nil } -// MVCCPagination invokes f() until it returns done (i.e. we have iterated -// through all elements) or an error, or until the number of keys hits the -// maxKeys limit or the number of bytes hits the targetBytes limit. +// MVCCPagination invokes f() until it returns an error (note that the +// iterutil.StopIteration() error means we have iterated through all elements), +// or until the number of keys hits the maxKeys limit or the number of bytes +// hits the targetBytes limit. func MVCCPagination( ctx context.Context, maxKeys, targetBytes int64,