diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go index 18a84383ceb9..fa0dac2b40d3 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable.go @@ -222,7 +222,7 @@ func EvalAddSSTable( } var statsDelta enginepb.MVCCStats - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) checkConflicts := args.DisallowConflicts || args.DisallowShadowing || !args.DisallowShadowingBelow.IsEmpty() if checkConflicts { @@ -258,7 +258,7 @@ func EvalAddSSTable( log.VEventf(ctx, 2, "checking conflicts for SSTable [%s,%s)", start.Key, end.Key) statsDelta, err = storage.CheckSSTConflicts(ctx, sst, readWriter, start, end, leftPeekBound, rightPeekBound, - args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxIntents, usePrefixSeek) + args.DisallowShadowing, args.DisallowShadowingBelow, sstTimestamp, maxLockConflicts, usePrefixSeek) statsDelta.Add(sstReqStatsDelta) if err != nil { return result.Result{}, errors.Wrap(err, "checking for key collisions") @@ -269,7 +269,7 @@ func EvalAddSSTable( // caller is expected to make sure there are no writers across the span, // and thus no or few locks, so this is cheap in the common case. log.VEventf(ctx, 2, "checking conflicting locks for SSTable [%s,%s)", start.Key, end.Key) - locks, err := storage.ScanLocks(ctx, readWriter, start.Key, end.Key, maxIntents, 0) + locks, err := storage.ScanLocks(ctx, readWriter, start.Key, end.Key, maxLockConflicts, 0) if err != nil { return result.Result{}, errors.Wrap(err, "scanning locks") } else if len(locks) > 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_clear_range.go b/pkg/kv/kvserver/batcheval/cmd_clear_range.go index b725058e9484..f3c3d0c742c2 100644 --- a/pkg/kv/kvserver/batcheval/cmd_clear_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_clear_range.go @@ -112,8 +112,8 @@ func ClearRange( // txns. Otherwise, txn recovery would fail to find these intents and // consider the txn incomplete, uncommitting it and its writes (even those // outside of the cleared range). - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) - locks, err := storage.ScanLocks(ctx, readWriter, from, to, maxIntents, 0) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + locks, err := storage.ScanLocks(ctx, readWriter, from, to, maxLockConflicts, 0) if err != nil { return result.Result{}, err } else if len(locks) > 0 { diff --git a/pkg/kv/kvserver/batcheval/cmd_delete_range.go b/pkg/kv/kvserver/batcheval/cmd_delete_range.go index 8158507d0c3f..cb805b803375 100644 --- a/pkg/kv/kvserver/batcheval/cmd_delete_range.go +++ b/pkg/kv/kvserver/batcheval/cmd_delete_range.go @@ -152,7 +152,7 @@ func DeleteRange( leftPeekBound, rightPeekBound := rangeTombstonePeekBounds( args.Key, args.EndKey, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey()) - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV) // If no predicate parameters are passed, use the fast path. If we're // deleting the entire Raft range, use an even faster path that avoids a @@ -167,7 +167,7 @@ func DeleteRange( } if err := storage.MVCCDeleteRangeUsingTombstone(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, - args.IdempotentTombstone, maxIntents, statsCovered); err != nil { + args.IdempotentTombstone, maxLockConflicts, statsCovered); err != nil { return result.Result{}, err } var res result.Result @@ -197,7 +197,7 @@ func DeleteRange( resumeSpan, err := storage.MVCCPredicateDeleteRange(ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, h.Timestamp, cArgs.Now, leftPeekBound, rightPeekBound, args.Predicates, h.MaxSpanRequestKeys, maxDeleteRangeBatchBytes, - defaultRangeTombstoneThreshold, maxIntents) + defaultRangeTombstoneThreshold, maxLockConflicts) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 259c03756926..1ed47e7c3c2d 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -154,9 +154,9 @@ func evalExport( maxSize = targetSize + uint64(allowedOverage) } - var maxIntents uint64 - if m := storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV); m > 0 { - maxIntents = uint64(m) + var maxLockConflicts uint64 + if m := storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV); m > 0 { + maxLockConflicts = uint64(m) } // Only use resume timestamp if splitting mid key is enabled. @@ -184,7 +184,7 @@ func evalExport( ExportAllRevisions: exportAllRevisions, TargetSize: targetSize, MaxSize: maxSize, - MaxIntents: maxIntents, + MaxLockConflicts: maxLockConflicts, StopMidKey: args.SplitMidKey, } var summary kvpb.BulkOpSummary diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 18115de51ba9..6deb362cf52c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -46,7 +46,7 @@ func ReverseScan( ScanStats: cArgs.ScanStats, Uncertainty: cArgs.Uncertainty, MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetBytes: h.TargetBytes, AllowEmpty: h.AllowEmpty, WholeRowsOfSize: h.WholeRowsOfSize, diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 0307058698ed..11902ec2129b 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -46,7 +46,7 @@ func Scan( ScanStats: cArgs.ScanStats, Uncertainty: cArgs.Uncertainty, MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + MaxLockConflicts: storage.MaxConflictsPerLockConflictError.Get(&cArgs.EvalCtx.ClusterSettings().SV), TargetBytes: h.TargetBytes, AllowEmpty: h.AllowEmpty, WholeRowsOfSize: h.WholeRowsOfSize, diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 46f63da881ed..954f6c14e6de 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -303,7 +303,7 @@ func (r *Replica) canDropLatchesBeforeEval( ctx, 3, "can drop latches early for batch (%v); scanning lock table first to detect conflicts", ba, ) - maxIntents := storage.MaxIntentsPerLockConflictError.Get(&r.store.cfg.Settings.SV) + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&r.store.cfg.Settings.SV) var intents []roachpb.Intent // Check if any of the requests within the batch need to resolve any intents // or if any of them need to use an intent interleaving iterator. @@ -315,7 +315,7 @@ func (r *Replica) canDropLatchesBeforeEval( txnID = ba.Txn.ID } needsIntentInterleavingForThisRequest, err := storage.ScanConflictingIntentsForDroppingLatchesEarly( - ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxIntents, + ctx, rw, txnID, ba.Header.Timestamp, start, end, &intents, maxLockConflicts, ) if err != nil { return false /* ok */, true /* stillNeedsIntentInterleaving */, kvpb.NewError( @@ -323,7 +323,7 @@ func (r *Replica) canDropLatchesBeforeEval( ) } stillNeedsIntentInterleaving = stillNeedsIntentInterleaving || needsIntentInterleavingForThisRequest - if maxIntents != 0 && int64(len(intents)) >= maxIntents { + if maxLockConflicts != 0 && int64(len(intents)) >= maxLockConflicts { break } } diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 9d3db54f95ff..3783475b0e7c 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -2306,9 +2306,9 @@ func TestStoreScanIntents(t *testing.T) { // limits. // // The test proceeds as follows: a writer lays down more than -// `MaxIntentsPerLockConflictError` intents, and a reader is expected to +// `MaxConflictsPerLockConflictError` intents, and a reader is expected to // encounter these intents and raise a `LockConflictError` with exactly -// `MaxIntentsPerLockConflictError` intents in the error. +// `MaxConflictsPerLockConflictError` intents in the error. func TestStoreScanIntentsRespectsLimit(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -2331,10 +2331,10 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) { ctx context.Context, ba *kvpb.BatchRequest, pErr *kvpb.Error, ) { if errors.HasType(pErr.GoError(), (*kvpb.LockConflictError)(nil)) { - // Assert that the LockConflictError has MaxIntentsPerLockConflictError intents. + // Assert that the LockConflictError has MaxConflictsPerLockConflictError intents. if trap := interceptLockConflictErrors.Load(); trap != nil && trap.(bool) { require.Equal( - t, storage.MaxIntentsPerLockConflictErrorDefault, + t, storage.MaxConflictsPerLockConflictErrorDefault, len(pErr.GetDetail().(*kvpb.LockConflictError).Locks), ) interceptLockConflictErrors.Store(false) @@ -2356,13 +2356,13 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) { var wg sync.WaitGroup wg.Add(2) - // Lay down more than `MaxIntentsPerLockConflictErrorDefault` intents. + // Lay down more than `MaxConflictsPerLockConflictErrorDefault` intents. go func() { defer wg.Done() txn := newTransaction( "test", roachpb.Key("test-key"), roachpb.NormalUserPriority, tc.Server(0).Clock(), ) - for j := 0; j < storage.MaxIntentsPerLockConflictErrorDefault+10; j++ { + for j := 0; j < storage.MaxConflictsPerLockConflictErrorDefault+10; j++ { var key roachpb.Key key = append(key, keys.ScratchRangeMin...) key = append(key, []byte(fmt.Sprintf("%d", j))...) @@ -2385,10 +2385,10 @@ func TestStoreScanIntentsRespectsLimit(t *testing.T) { } // Now, expect a conflicting reader to encounter the intents and raise a - // LockConflictError with exactly `MaxIntentsPerLockConflictErrorDefault` + // LockConflictError with exactly `MaxConflictsPerLockConflictErrorDefault` // intents. See the TestingConcurrencyRetryFilter above. var ba kv.Batch - for i := 0; i < storage.MaxIntentsPerLockConflictErrorDefault+10; i += 10 { + for i := 0; i < storage.MaxConflictsPerLockConflictErrorDefault+10; i += 10 { for _, key := range intentKeys[i : i+10] { args := getArgs(key) ba.AddRawRequest(&args) diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index be2106cb3629..bd5e4512e76f 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -214,6 +214,10 @@ type FlowBase struct { // goroutines. startedGoroutines bool + // headProcStarted tracks whether Start was called on the "head" processor + // in Run. + headProcStarted bool + // inboundStreams are streams that receive data from other hosts; this map // is to be passed to FlowRegistry.RegisterFlow. This map is populated in // Flow.Setup(), so it is safe to lookup into concurrently later. @@ -571,6 +575,7 @@ func (f *FlowBase) Run(ctx context.Context, noWait bool) { } f.resumeCtx = ctx log.VEventf(ctx, 1, "running %T in the flow's goroutine", headProc) + f.headProcStarted = true headProc.Run(ctx, headOutput) } @@ -661,17 +666,26 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { // ConsumerClosed on the source (i.e. the "head" processor). // // The method is only called if: +// - the flow is local (pausable portals currently don't support DistSQL) // - there is exactly 1 processor in the flow that runs in its own goroutine // (which is always the case for pausable portal model at this time) +// - Start was called on that processor (ConsumerClosed is only valid to be +// called after Start) // - that single processor implements execinfra.RowSource interface (those // processors that don't implement it shouldn't be running through pausable // portal model). // // Otherwise, this method is a noop. func (f *FlowBase) ConsumerClosedOnHeadProc() { + if !f.IsLocal() { + return + } if len(f.processors) != 1 { return } + if !f.headProcStarted { + return + } rs, ok := f.processors[0].(execinfra.RowSource) if !ok { return diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 64931cba2d14..9697d654bc2e 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1945,17 +1945,21 @@ func assertMVCCIteratorInvariants(iter MVCCIterator) error { return errors.AssertionFailedf("unknown type for engine key %s", engineKey) } - // Value must equal UnsafeValue. - u, err := iter.UnsafeValue() - if err != nil { - return err - } - v, err := iter.Value() - if err != nil { - return err - } - if !bytes.Equal(v, u) { - return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key) + // If the iterator position has a point key, Value must equal UnsafeValue. + // NB: It's only valid to read an iterator's Value if the iterator is + // positioned at a point key. + if hasPoint, _ := iter.HasPointAndRange(); hasPoint { + u, err := iter.UnsafeValue() + if err != nil { + return err + } + v, err := iter.Value() + if err != nil { + return err + } + if !bytes.Equal(v, u) { + return errors.AssertionFailedf("Value %x does not match UnsafeValue %x at %s", v, u, key) + } } // For prefix iterators, any range keys must be point-sized. We've already @@ -1994,7 +1998,7 @@ func ScanConflictingIntentsForDroppingLatchesEarly( ts hlc.Timestamp, start, end roachpb.Key, intents *[]roachpb.Intent, - maxIntents int64, + maxLockConflicts int64, ) (needIntentHistory bool, err error) { if err := ctx.Err(); err != nil { return false, err @@ -2031,7 +2035,7 @@ func ScanConflictingIntentsForDroppingLatchesEarly( var meta enginepb.MVCCMetadata var ok bool for ok, err = iter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ok; ok, err = iter.NextEngineKey() { - if maxIntents != 0 && int64(len(*intents)) >= maxIntents { + if maxLockConflicts != 0 && int64(len(*intents)) >= maxLockConflicts { // Return early if we're done accumulating intents; make no claims about // not needing intent history. return true /* needsIntentHistory */, nil diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 2748166a7d4e..c0a3c7e21289 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -2264,7 +2264,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarly(t *testing.T) { tc.start, tc.end, &intents, - 0, /* maxIntents */ + 0, /* maxLockConflicts */ ) if tc.expErr != "" { require.Error(t, err) @@ -2486,7 +2486,7 @@ func TestScanConflictingIntentsForDroppingLatchesEarlyReadYourOwnWrites(t *testi keyA, nil, &intents, - 0, /* maxIntents */ + 0, /* maxLockConflicts */ ) require.NoError(t, err) if alwaysFallbackToIntentInterleavingIteratorForReadYourOwnWrites { diff --git a/pkg/storage/metamorphic/operations.go b/pkg/storage/metamorphic/operations.go index 23775952d99c..b65ad888de00 100644 --- a/pkg/storage/metamorphic/operations.go +++ b/pkg/storage/metamorphic/operations.go @@ -335,7 +335,7 @@ func (m mvccDeleteRangeUsingRangeTombstoneOp) run(ctx context.Context) string { } err := storage.MVCCDeleteRangeUsingTombstone(ctx, writer, nil, m.key, m.endKey, m.ts, - hlc.ClockTimestamp{}, m.key, m.endKey, false /* idempotent */, math.MaxInt64, /* maxIntents */ + hlc.ClockTimestamp{}, m.key, m.endKey, false /* idempotent */, math.MaxInt64, /* maxLockConflicts */ nil /* msCovered */) if err != nil { return fmt.Sprintf("error: %s", err) diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 2afb6fd18e1c..597d1b71fdf2 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -61,12 +61,11 @@ const ( // minimum total for a single store node must be under 2048 for Windows // compatibility. MinimumMaxOpenFiles = 1700 - // MaxIntentsPerLockConflictErrorDefault is the default value for maximum - // number of intents reported by ExportToSST and Scan operations in - // LockConflictError is set to half of the maximum lock table size. This - // value is subject to tuning in real environment as we have more data - // available. - MaxIntentsPerLockConflictErrorDefault = 5000 + // MaxConflictsPerLockConflictErrorDefault is the default value for maximum + // number of locks reported by ExportToSST and Scan operations in + // LockConflictError is set to half of the maximum lock table size. This value + // is subject to tuning in real environment as we have more data available. + MaxConflictsPerLockConflictErrorDefault = 5000 ) var minWALSyncInterval = settings.RegisterDurationSetting( @@ -108,15 +107,15 @@ func CanUseMVCCRangeTombstones(ctx context.Context, st *cluster.Settings) bool { MVCCRangeTombstonesEnabledInMixedClusters.Get(&st.SV) } -// MaxIntentsPerLockConflictError sets maximum number of intents returned in -// LockConflictError in operations that return multiple intents per error. +// MaxConflictsPerLockConflictError sets maximum number of locks returned in +// LockConflictError in operations that return multiple locks per error. // Currently it is used in Scan, ReverseScan, and ExportToSST. -// TODO(nvanbenschoten): rename to MaxLocksPerLockConflictError. -var MaxIntentsPerLockConflictError = settings.RegisterIntSetting( +var MaxConflictsPerLockConflictError = settings.RegisterIntSetting( settings.TenantWritable, "storage.mvcc.max_intents_per_error", - "maximum number of intents returned in error during export of scan requests", - MaxIntentsPerLockConflictErrorDefault, + "maximum number of locks returned in error during export or scan requests", + MaxConflictsPerLockConflictErrorDefault, + settings.WithName("storage.mvcc.max_conflicts_per_lock_conflict_error"), ) var rocksdbConcurrency = envutil.EnvOrDefaultInt( @@ -3274,7 +3273,7 @@ func MVCCDeleteRange( // // This operation is non-transactional, but will check for existing intents in // the target key span, regardless of timestamp, and return a LockConflictError -// containing up to maxIntents intents. +// containing up to maxLockConflicts locks. // // MVCCPredicateDeleteRange will return with a resumeSpan if the number of tombstones // written exceeds maxBatchSize or the size of the written tombstones exceeds maxByteSize. @@ -3307,7 +3306,7 @@ func MVCCPredicateDeleteRange( predicates kvpb.DeleteRangePredicates, maxBatchSize, maxBatchByteSize int64, rangeTombstoneThreshold int64, - maxIntents int64, + maxLockConflicts int64, ) (*roachpb.Span, error) { if maxBatchSize == 0 { @@ -3337,7 +3336,7 @@ func MVCCPredicateDeleteRange( } // Check for any overlapping locks, and return them to be resolved. - if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxIntents, 0); err != nil { + if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxLockConflicts, 0); err != nil { return nil, err } else if len(locks) > 0 { return nil, &kvpb.LockConflictError{Locks: locks} @@ -3435,7 +3434,7 @@ func MVCCPredicateDeleteRange( batchByteSize+runByteSize >= maxBatchByteSize { if err := MVCCDeleteRangeUsingTombstone(ctx, rw, ms, runStart, runEnd.Next(), endTime, localTimestamp, leftPeekBound, rightPeekBound, - false /* idempotent */, maxIntents, nil); err != nil { + false /* idempotent */, maxLockConflicts, nil); err != nil { return err } batchByteSize += int64(MVCCRangeKey{StartKey: runStart, EndKey: runEnd, Timestamp: endTime}.EncodedSize()) @@ -3576,8 +3575,8 @@ func MVCCPredicateDeleteRange( // MVCCDeleteRangeUsingTombstone deletes the given MVCC keyspan at the given // timestamp using an MVCC range tombstone (rather than MVCC point tombstones). // This operation is non-transactional, but will check for existing intents and -// return a LockConflictError containing up to maxIntents intents. Can't be used -// across local keyspace. +// return a LockConflictError containing up to maxLockConflicts locks. Can't be +// used across local keyspace. // // The leftPeekBound and rightPeekBound parameters are used when looking for // range tombstones that we'll merge or overlap with. These are provided to @@ -3609,7 +3608,7 @@ func MVCCDeleteRangeUsingTombstone( localTimestamp hlc.ClockTimestamp, leftPeekBound, rightPeekBound roachpb.Key, idempotent bool, - maxIntents int64, + maxLockConflicts int64, msCovered *enginepb.MVCCStats, ) error { // Validate the range key. We must do this first, to catch e.g. any bound violations. @@ -3642,7 +3641,7 @@ func MVCCDeleteRangeUsingTombstone( } // Check for any overlapping locks, and return them to be resolved. - if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxIntents, 0); err != nil { + if locks, err := ScanLocks(ctx, rw, startKey, endKey, maxLockConflicts, 0); err != nil { return err } else if len(locks) > 0 { return &kvpb.LockConflictError{Locks: locks} @@ -3955,7 +3954,7 @@ func mvccScanInit( targetBytes: opts.TargetBytes, allowEmpty: opts.AllowEmpty, wholeRows: opts.WholeRowsOfSize > 1, // single-KV rows don't need processing - maxIntents: opts.MaxIntents, + maxLockConflicts: opts.MaxLockConflicts, inconsistent: opts.Inconsistent, skipLocked: opts.SkipLocked, tombstones: opts.Tombstones, @@ -4154,12 +4153,12 @@ type MVCCScanOptions struct { // and AllowEmpty is false, in which case the remaining KV pairs of the row // will be fetched and returned too. WholeRowsOfSize int32 - // MaxIntents is a maximum number of intents collected by scanner in - // consistent mode before returning LockConflictError. + // MaxLockConflicts is a maximum number of locks (intents) collected by + // scanner in consistent mode before returning LockConflictError. // // Not used in inconsistent scans. // The zero value indicates no limit. - MaxIntents int64 + MaxLockConflicts int64 // MemoryAccount is used for tracking memory allocations. MemoryAccount *mon.BoundAccount // LockTable is used to determine whether keys are locked in the in-memory @@ -5415,12 +5414,12 @@ func MVCCCheckForAcquireLock( txn *roachpb.Transaction, str lock.Strength, key roachpb.Key, - maxConflicts int64, + maxLockConflicts int64, ) error { if err := validateLockAcquisition(txn, str); err != nil { return err } - ltScanner, err := newLockTableKeyScanner(reader, txn, str, maxConflicts) + ltScanner, err := newLockTableKeyScanner(reader, txn, str, maxLockConflicts) if err != nil { return err } @@ -5440,12 +5439,12 @@ func MVCCAcquireLock( str lock.Strength, key roachpb.Key, ms *enginepb.MVCCStats, - maxConflicts int64, + maxLockConflicts int64, ) error { if err := validateLockAcquisition(txn, str); err != nil { return err } - ltScanner, err := newLockTableKeyScanner(rw, txn, str, maxConflicts) + ltScanner, err := newLockTableKeyScanner(rw, txn, str, maxLockConflicts) if err != nil { return err } @@ -7254,7 +7253,7 @@ func mvccExportToWriter( // If we do it means this export can't complete and is aborted. We need to loop over remaining data // to collect all matching intents before returning them in an error to the caller. if iter.NumCollectedIntents() > 0 { - for uint64(iter.NumCollectedIntents()) < opts.MaxIntents { + for uint64(iter.NumCollectedIntents()) < opts.MaxLockConflicts { iter.NextKey() // If we encounter other errors during intent collection, we return our original write intent failure. // We would find this new error again upon retry. @@ -7328,12 +7327,12 @@ type MVCCExportOptions struct { // to an SST that exceeds maxSize, an error will be returned. This parameter // exists to prevent creating SSTs which are too large to be used. MaxSize uint64 - // MaxIntents specifies the number of intents to collect and return in a - // LockConflictError. The caller will likely resolve the returned intents and - // retry the call, which would be quadratic, so this significantly reduces the - // overall number of scans. 0 disables batching and returns the first intent, - // pass math.MaxUint64 to collect all. - MaxIntents uint64 + // MaxLockConflicts specifies the number of locks (intents) to collect and + // return in a LockConflictError. The caller will likely resolve the returned + // intents and retry the call, which would be quadratic, so this significantly + // reduces the overall number of scans. 0 disables batching and returns the + // first intent, pass math.MaxUint64 to collect all. + MaxLockConflicts uint64 // If StopMidKey is false, once function reaches targetSize it would continue // adding all versions until it reaches next key or end of range. If true, it // would stop immediately when targetSize is reached and return the next versions diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 0a2189ddec48..106daa83146c 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -101,7 +101,7 @@ var ( // put_blind_inline k= v= [prev=] // get [t=] [ts=[,]] [resolve [status=]] k= [inconsistent] [skipLocked] [tombstones] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [maxKeys=] [targetBytes=] [allowEmpty] // scan [t=] [ts=[,]] [resolve [status=]] k= [end=] [inconsistent] [skipLocked] [tombstones] [reverse] [failOnMoreRecent] [localUncertaintyLimit=[,]] [globalUncertaintyLimit=[,]] [max=] [targetbytes=] [wholeRows[=]] [allowEmpty] -// export [k=] [end=] [ts=[,]] [kTs=[,]] [startTs=[,]] [maxIntents=] [allRevisions] [targetSize=] [maxSize=] [stopMidKey] [fingerprint] +// export [k=] [end=] [ts=[,]] [kTs=[,]] [startTs=[,]] [maxLockConflicts=] [allRevisions] [targetSize=] [maxSize=] [stopMidKey] [fingerprint] // // iter_new [k=] [end=] [prefix] [kind=key|keyAndIntents] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=[,]] // iter_new_incremental [k=] [end=] [startTs=[,]] [endTs=[,]] [types=pointsOnly|pointsWithRanges|pointsAndRanges|rangesOnly] [maskBelow=[,]] [intents=error|aggregate|emit] @@ -1531,8 +1531,8 @@ func cmdExport(e *evalCtx) error { StripIndexPrefixAndTimestamp: e.hasArg("stripped"), }, } - if e.hasArg("maxIntents") { - e.scanArg("maxIntents", &opts.MaxIntents) + if e.hasArg("maxLockConflicts") { + e.scanArg("maxLockConflicts", &opts.MaxLockConflicts) } if e.hasArg("targetSize") { e.scanArg("targetSize", &opts.TargetSize) diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 5f81cee77624..5b563b2a3ff8 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -201,7 +201,7 @@ func assertExportedErrs( ExportAllRevisions: revisions, TargetSize: big, MaxSize: big, - MaxIntents: uint64(MaxIntentsPerLockConflictError.Default()), + MaxLockConflicts: uint64(MaxConflictsPerLockConflictError.Default()), StopMidKey: false, }, &bytes.Buffer{}) require.Error(t, err) diff --git a/pkg/storage/mvcc_stats_test.go b/pkg/storage/mvcc_stats_test.go index e5cee4f4ac1f..fd9b1c975319 100644 --- a/pkg/storage/mvcc_stats_test.go +++ b/pkg/storage/mvcc_stats_test.go @@ -1724,20 +1724,20 @@ func TestMVCCStatsRandomized(t *testing.T) { desc = fmt.Sprintf("mvccDeleteRangeUsingTombstone=%s", roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}) const idempotent = false - const maxIntents = 0 // unlimited + const maxLockConflicts = 0 // unlimited msCovered := (*enginepb.MVCCStats)(nil) err = MVCCDeleteRangeUsingTombstone( ctx, s.batch, s.MSDelta, mvccRangeDelKey, mvccRangeDelEndKey, s.TS, hlc.ClockTimestamp{}, nil, /* leftPeekBound */ - nil /* rightPeekBound */, idempotent, maxIntents, msCovered, + nil /* rightPeekBound */, idempotent, maxLockConflicts, msCovered, ) } else { rangeTombstoneThreshold := s.rng.Int63n(5) desc = fmt.Sprintf("mvccPredicateDeleteRange=%s, predicates=%s, rangeTombstoneThreshold=%d", roachpb.Span{Key: mvccRangeDelKey, EndKey: mvccRangeDelEndKey}, predicates, rangeTombstoneThreshold) - const maxIntents = 0 // unlimited + const maxLockConflicts = 0 // unlimited _, err = MVCCPredicateDeleteRange(ctx, s.batch, s.MSDelta, mvccRangeDelKey, mvccRangeDelEndKey, s.TS, hlc.ClockTimestamp{}, nil /* leftPeekBound */, nil, /* rightPeekBound */ - predicates, 0, 0, rangeTombstoneThreshold, maxIntents) + predicates, 0, 0, rangeTombstoneThreshold, maxLockConflicts) } if err != nil { return false, desc + ": " + err.Error() diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 61f0ee2f9672..a471aac1fd5c 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -597,7 +597,7 @@ func TestMVCCScanLockConflictError(t *testing.T) { for _, scan := range scanCases { t.Run(scan.name, func(t *testing.T) { res, err := MVCCScan(ctx, engine, testKey1, testKey6.Next(), - hlc.Timestamp{WallTime: 1}, MVCCScanOptions{Inconsistent: !scan.consistent, Txn: scan.txn, MaxIntents: 2}) + hlc.Timestamp{WallTime: 1}, MVCCScanOptions{Inconsistent: !scan.consistent, Txn: scan.txn, MaxLockConflicts: 2}) var lcErr *kvpb.LockConflictError _ = errors.As(err, &lcErr) if (err == nil) != (lcErr == nil) { @@ -6635,7 +6635,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { ExportAllRevisions: true, TargetSize: 0, MaxSize: 0, - MaxIntents: uint64(MaxIntentsPerLockConflictError.Default()), + MaxLockConflicts: uint64(MaxConflictsPerLockConflictError.Default()), StopMidKey: false, }, &bytes.Buffer{}) if len(expectedIntentIndices) == 0 { @@ -6655,7 +6655,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { } // Export range is fixed to k:["00010", "10000"), ts:(999, 2000] for all tests. - testDataCount := int(MaxIntentsPerLockConflictError.Default() + 1) + testDataCount := int(MaxConflictsPerLockConflictError.Default() + 1) testData := make([]testValue, testDataCount*2) expectedErrors := make([]int, testDataCount) for i := 0; i < testDataCount; i++ { @@ -6663,7 +6663,7 @@ func TestMVCCExportToSSTFailureIntentBatching(t *testing.T) { testData[i*2+1] = intent(key(i*2+12), "intent", ts(1001)) expectedErrors[i] = i*2 + 1 } - t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:MaxIntentsPerLockConflictError.Default()])) + t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:MaxConflictsPerLockConflictError.Default()])) } // TestMVCCExportToSSTSplitMidKey verifies that split mid key in exports will diff --git a/pkg/storage/pebble_mvcc_scanner.go b/pkg/storage/pebble_mvcc_scanner.go index de09c4bf9fbf..3524c35698ce 100644 --- a/pkg/storage/pebble_mvcc_scanner.go +++ b/pkg/storage/pebble_mvcc_scanner.go @@ -408,12 +408,12 @@ type pebbleMVCCScanner struct { // allowEmpty is false, and the partial row is the first row in the result, // the row will instead be completed by fetching additional KV pairs. wholeRows bool - // Stop adding intents and abort scan once maxIntents threshold is reached. - // This limit is only applicable to consistent scans since they return - // intents as an error. + // Stop adding intents and abort scan once maxLockConflicts threshold is + // reached. This limit is only applicable to consistent scans since they + // return intents as an error. // Not used in inconsistent scans. // Ignored if zero. - maxIntents int64 + maxLockConflicts int64 // Resume fields describe the resume span to return. resumeReason must be set // to a non-zero value to return a resume span, the others are optional. resumeReason kvpb.ResumeReason @@ -982,7 +982,7 @@ func (p *pebbleMVCCScanner) getOne(ctx context.Context) (ok, added bool) { // may want to resolve it. Unlike below, this intent will not result in // a LockConflictError because MVCC{Scan,Get}Options.errOnIntents returns // false when skipLocked in enabled. - if p.maxIntents == 0 || int64(p.intents.Count()) < p.maxIntents { + if p.maxLockConflicts == 0 || int64(p.intents.Count()) < p.maxLockConflicts { if !p.addCurIntent(ctx) { return false, false } @@ -1005,7 +1005,7 @@ func (p *pebbleMVCCScanner) getOne(ctx context.Context) (ok, added bool) { return false, false } // Limit number of intents returned in lock conflict error. - if p.maxIntents > 0 && int64(p.intents.Count()) >= p.maxIntents { + if p.maxLockConflicts > 0 && int64(p.intents.Count()) >= p.maxLockConflicts { p.resumeReason = kvpb.RESUME_INTENT_LIMIT return false, false } @@ -1816,7 +1816,7 @@ func (p *pebbleMVCCScanner) isKeyLockedByConflictingTxn( } if ok { // The key is locked or reserved, so ignore it. - if txn != nil && (p.maxIntents == 0 || int64(p.intents.Count()) < p.maxIntents) { + if txn != nil && (p.maxLockConflicts == 0 || int64(p.intents.Count()) < p.maxLockConflicts) { // However, if the key is locked, we return the lock holder separately // (if we have room); the caller may want to resolve it. if !p.addKeyAndMetaAsIntent(ctx, key, txn) { diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go index 4b156f197cc9..a8108efafdca 100644 --- a/pkg/storage/sst.go +++ b/pkg/storage/sst.go @@ -112,7 +112,7 @@ func CheckSSTConflicts( disallowShadowing bool, disallowShadowingBelow hlc.Timestamp, sstTimestamp hlc.Timestamp, - maxIntents int64, + maxLockConflicts int64, usePrefixSeek bool, ) (enginepb.MVCCStats, error) { @@ -281,7 +281,7 @@ func CheckSSTConflicts( // would be quadratic, so this significantly reduces the overall number // of scans. intents = append(intents, roachpb.MakeIntent(mvccMeta.Txn, extIter.UnsafeKey().Key.Clone())) - if int64(len(intents)) >= maxIntents { + if int64(len(intents)) >= maxLockConflicts { return &kvpb.LockConflictError{Locks: roachpb.AsLocks(intents)} } return nil @@ -531,7 +531,7 @@ func CheckSSTConflicts( return enginepb.MVCCStats{}, err } intents = append(intents, roachpb.MakeIntent(mvccMeta.Txn, extIter.UnsafeKey().Key.Clone())) - if int64(len(intents)) >= maxIntents { + if int64(len(intents)) >= maxLockConflicts { return statsDiff, &kvpb.LockConflictError{Locks: roachpb.AsLocks(intents)} } extIter.Next() diff --git a/pkg/storage/sst_test.go b/pkg/storage/sst_test.go index a5b123f5f261..67e8003f87f8 100644 --- a/pkg/storage/sst_test.go +++ b/pkg/storage/sst_test.go @@ -29,7 +29,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestCheckSSTConflictsMaxIntents(t *testing.T) { +func TestCheckSSTConflictsMaxLockConflicts(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -38,15 +38,15 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { start, end := "a", "z" testcases := []struct { - maxIntents int64 - expectIntents []string + maxLockConflicts int64 + expectIntents []string }{ - {maxIntents: -1, expectIntents: []string{"a"}}, - {maxIntents: 0, expectIntents: []string{"a"}}, - {maxIntents: 1, expectIntents: []string{"a"}}, - {maxIntents: 2, expectIntents: []string{"a", "b"}}, - {maxIntents: 3, expectIntents: []string{"a", "b", "c"}}, - {maxIntents: 4, expectIntents: []string{"a", "b", "c"}}, + {maxLockConflicts: -1, expectIntents: []string{"a"}}, + {maxLockConflicts: 0, expectIntents: []string{"a"}}, + {maxLockConflicts: 1, expectIntents: []string{"a"}}, + {maxLockConflicts: 2, expectIntents: []string{"a", "b"}}, + {maxLockConflicts: 3, expectIntents: []string{"a", "b", "c"}}, + {maxLockConflicts: 4, expectIntents: []string{"a", "b", "c"}}, } // Create SST with keys equal to intents at txn2TS. @@ -86,13 +86,13 @@ func TestCheckSSTConflictsMaxIntents(t *testing.T) { require.NoError(t, engine.Flush()) for _, tc := range testcases { - t.Run(fmt.Sprintf("maxIntents=%d", tc.maxIntents), func(t *testing.T) { + t.Run(fmt.Sprintf("maxLockConflicts=%d", tc.maxLockConflicts), func(t *testing.T) { for _, usePrefixSeek := range []bool{false, true} { t.Run(fmt.Sprintf("usePrefixSeek=%v", usePrefixSeek), func(t *testing.T) { // Provoke and check LockConflictError. startKey, endKey := MVCCKey{Key: roachpb.Key(start)}, MVCCKey{Key: roachpb.Key(end)} _, err := CheckSSTConflicts(ctx, sstFile.Bytes(), engine, startKey, endKey, startKey.Key, endKey.Key.Next(), - false /*disallowShadowing*/, hlc.Timestamp{} /*disallowShadowingBelow*/, hlc.Timestamp{} /* sstReqTS */, tc.maxIntents, usePrefixSeek) + false /*disallowShadowing*/, hlc.Timestamp{} /*disallowShadowingBelow*/, hlc.Timestamp{} /* sstReqTS */, tc.maxLockConflicts, usePrefixSeek) require.Error(t, err) lcErr := &kvpb.LockConflictError{} require.ErrorAs(t, err, &lcErr) diff --git a/pkg/storage/testdata/mvcc_histories/export b/pkg/storage/testdata/mvcc_histories/export index 6fe60279c9b0..9fe8b0299143 100644 --- a/pkg/storage/testdata/mvcc_histories/export +++ b/pkg/storage/testdata/mvcc_histories/export @@ -84,12 +84,12 @@ export k=a end=z error: (*kvpb.LockConflictError:) conflicting locks on "a" run error -export k=a end=z maxIntents=100 +export k=a end=z maxLockConflicts=100 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j", "l", "o" run error -export k=a end=z maxIntents=3 +export k=a end=z maxLockConflicts=3 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j" diff --git a/pkg/storage/testdata/mvcc_histories/export_fingerprint b/pkg/storage/testdata/mvcc_histories/export_fingerprint index dd1941c8ba45..5b2f45a63735 100644 --- a/pkg/storage/testdata/mvcc_histories/export_fingerprint +++ b/pkg/storage/testdata/mvcc_histories/export_fingerprint @@ -84,12 +84,12 @@ export fingerprint k=a end=z error: (*kvpb.LockConflictError:) conflicting locks on "a" run error -export fingerprint k=a end=z maxIntents=100 +export fingerprint k=a end=z maxLockConflicts=100 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j", "l", "o" run error -export fingerprint k=a end=z maxIntents=3 +export fingerprint k=a end=z maxLockConflicts=3 ---- error: (*kvpb.LockConflictError:) conflicting locks on "a", "d", "j"