From 099c92028966c637998b556152d95b0cff43a162 Mon Sep 17 00:00:00 2001 From: Aayush Shah Date: Wed, 9 Feb 2022 10:57:21 -0500 Subject: [PATCH] kvserver, batcheval: pin Engine state during read-only command evaluation This commit eagerly pins the engine state of the `Reader` created during the evaluation of read-only requests. Today, reads hold latches throughout their evaluation (in particular, while they perform their `MVCCScan`). This commit paves the way for avoiding those latches during the `MVCCScan`, and it also lets us make MVCC garbage collection latchless, as described in #55293. There are a few notable changes in this patch: 1. Pinning the engine state eagerly runs into #70974. To resolve this, the closed timestamp of the `Replica` is now captured at the time the `EvalContext` is created, and not during the command evaluation of `QueryResolvedTimestampRequest`. 2. `EvalContext` now has an `ImmutableEvalContext` embedded in it. The `ImmutableEvalContext` encapsulates state that must not change after the `EvalContext` is created. The closed timestamp of the replica is part of the `ImmutableEvalContext`. 3. `Replica` no longer fully implements the `EvalContext` interface. Instead, it implements everything except `GetClosedTimestamp()`, which now lives on `ImmutableEvalContext`. (Minimal sketches of the pinning and closed-timestamp capture patterns follow the diff.) Relates to https://github.com/cockroachdb/cockroach/issues/55293 Resolves https://github.com/cockroachdb/cockroach/issues/55461 Resolves https://github.com/cockroachdb/cockroach/issues/70974 Release note: None --- .../batcheval/cmd_add_sstable_test.go | 2 +- pkg/kv/kvserver/batcheval/eval_context.go | 18 ++++++++---- pkg/kv/kvserver/client_merge_test.go | 2 +- .../client_replica_circuit_breaker_test.go | 2 +- pkg/kv/kvserver/closed_timestamp_test.go | 4 +-- pkg/kv/kvserver/replica.go | 4 +-- .../replica_closedts_internal_test.go | 2 +- pkg/kv/kvserver/replica_eval_context.go | 27 ++++++++++++++++-- pkg/kv/kvserver/replica_follower_read.go | 25 ++++++++--------- pkg/kv/kvserver/replica_rangefeed.go | 2 +- pkg/kv/kvserver/replica_read.go | 6 ++++ pkg/kv/kvserver/replica_test.go | 4 +-- pkg/kv/kvserver/replica_tscache.go | 2 +- pkg/kv/kvserver/store.go | 4 +-- pkg/kv/kvserver/store_split.go | 2 +- pkg/storage/pebble.go | 25 +++++++++++++---- pkg/storage/pebble_batch.go | 28 ++++++++++++++----- pkg/storage/pebble_iterator.go | 15 ++++++++-- 18 files changed, 122 insertions(+), 52 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index 0a7f8c8f3ba8..0abd59c0fac7 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -1422,7 +1422,7 @@ func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T) require.NoError(t, err) r, store, err := s.GetStores().(*kvserver.Stores).GetReplicaForRangeID(ctx, rd.RangeID) require.NoError(t, err) - closedTS := r.GetClosedTimestamp(ctx) + closedTS := r.GetCurrentClosedTimestamp(ctx) require.NotZero(t, closedTS) // Add an SST writing below the closed timestamp. It should get pushed above it. diff --git a/pkg/kv/kvserver/batcheval/eval_context.go b/pkg/kv/kvserver/batcheval/eval_context.go index 212345adb615..b21365eec966 100644 --- a/pkg/kv/kvserver/batcheval/eval_context.go +++ b/pkg/kv/kvserver/batcheval/eval_context.go @@ -48,6 +48,8 @@ type Limiters struct { // underlying state.
type EvalContext interface { fmt.Stringer + ImmutableEvalContext + ClusterSettings() *cluster.Settings EvalKnobs() kvserverbase.BatchEvalTestingKnobs @@ -110,12 +112,6 @@ type EvalContext interface { // requests on the range. GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary - // GetClosedTimestamp returns the current closed timestamp on the range. - // It is expected that a caller will have performed some action (either - // calling RevokeLease or WatchForMerge) to freeze further progression of - // the closed timestamp before calling this method. - GetClosedTimestamp(ctx context.Context) hlc.Timestamp - GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error) GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage, error) @@ -136,6 +132,16 @@ type EvalContext interface { GetResponseMemoryAccount() *mon.BoundAccount } +// ImmutableEvalContext is like EvalContext, but it encapsulates state that +// needs to be immutable during the course of command evaluation. +type ImmutableEvalContext interface { + // GetClosedTimestamp returns the current closed timestamp on the range. + // It is expected that a caller will have performed some action (either + // calling RevokeLease or WatchForMerge) to freeze further progression of + // the closed timestamp before calling this method. + GetClosedTimestamp(ctx context.Context) hlc.Timestamp +} + // MockEvalCtx is a dummy implementation of EvalContext for testing purposes. // For technical reasons, the interface is implemented by a wrapper .EvalContext(). type MockEvalCtx struct { diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index c2e662ceaeca..d4a6b0d0e10a 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -2149,7 +2149,7 @@ func TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime(t *testing.T) { } return nil }) - lhsClosedTS := lhsLeaseholder.GetClosedTimestamp(ctx) + lhsClosedTS := lhsLeaseholder.GetCurrentClosedTimestamp(ctx) require.NotEmpty(t, lhsClosedTS) // Finally, allow the merge to complete. It should complete successfully. diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index de6396073672..ab76c2bae643 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -976,7 +976,7 @@ func (cbt *circuitBreakerTest) FollowerRead(idx int) error { repl := cbt.repls[idx] get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */) ctx := context.Background() - ts := repl.GetClosedTimestamp(ctx) + ts := repl.GetCurrentClosedTimestamp(ctx) return cbt.SendCtxTS(ctx, idx, get, ts) } diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go index 862d12b24879..ac846b12e8c3 100644 --- a/pkg/kv/kvserver/closed_timestamp_test.go +++ b/pkg/kv/kvserver/closed_timestamp_test.go @@ -772,7 +772,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true; require.NoError(t, err) r, err := store.GetReplica(rightDesc.RangeID) require.NoError(t, err) - maxClosed := r.GetClosedTimestamp(ctx) + maxClosed := r.GetCurrentClosedTimestamp(ctx) // Note that maxClosed would not necessarily be below the freeze start if // this was a LEAD_FOR_GLOBAL_READS range. 
assert.True(t, maxClosed.LessEq(freezeStartTimestamp), @@ -807,7 +807,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true; mergedLeaseholder, err := leftLeaseholderStore.GetReplica(leftDesc.RangeID) require.NoError(t, err) writeTime := rhsLeaseStart.Prev() - require.True(t, mergedLeaseholder.GetClosedTimestamp(ctx).Less(writeTime)) + require.True(t, mergedLeaseholder.GetCurrentClosedTimestamp(ctx).Less(writeTime)) var baWrite roachpb.BatchRequest baWrite.Header.RangeID = leftDesc.RangeID baWrite.Header.Timestamp = writeTime diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index a5327382d5da..2880e095f137 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -703,8 +703,6 @@ type Replica struct { } } -var _ batcheval.EvalContext = &Replica{} - // String returns the string representation of the replica using an // inconsistent copy of the range descriptor. Therefore, String does not // require a lock and its output may not be atomic with other ongoing work in @@ -1232,7 +1230,7 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo { // NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do // this first before RLocking below. Performance of this extra lock // acquisition is not a concern. - ri.ActiveClosedTimestamp = r.GetClosedTimestamp(ctx) + ri.ActiveClosedTimestamp = r.GetCurrentClosedTimestamp(ctx) // NB: numRangefeedRegistrations doesn't require Replica.mu to be locked. // However, it does require coordination between multiple goroutines, so diff --git a/pkg/kv/kvserver/replica_closedts_internal_test.go b/pkg/kv/kvserver/replica_closedts_internal_test.go index 97df78100077..f18c7d450d55 100644 --- a/pkg/kv/kvserver/replica_closedts_internal_test.go +++ b/pkg/kv/kvserver/replica_closedts_internal_test.go @@ -567,7 +567,7 @@ func TestReplicaClosedTimestamp(t *testing.T) { tc.repl.mu.state.RaftClosedTimestamp = test.raftClosed tc.repl.mu.state.LeaseAppliedIndex = uint64(test.applied) tc.repl.mu.Unlock() - require.Equal(t, test.expClosed, tc.repl.GetClosedTimestamp(ctx)) + require.Equal(t, test.expClosed, tc.repl.GetCurrentClosedTimestamp(ctx)) }) } } diff --git a/pkg/kv/kvserver/replica_eval_context.go b/pkg/kv/kvserver/replica_eval_context.go index 47744e471d5f..cf1597645707 100644 --- a/pkg/kv/kvserver/replica_eval_context.go +++ b/pkg/kv/kvserver/replica_eval_context.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -28,6 +29,26 @@ import ( // Do not introduce new uses of this. var todoSpanSet = &spanset.SpanSet{} +// evalContextImpl implements the batcheval.EvalContext interface. +type evalContextImpl struct { + *Replica + closedTS hlc.Timestamp +} + +func newEvalContextImpl(r *Replica) *evalContextImpl { + return &evalContextImpl{ + Replica: r, + closedTS: r.GetCurrentClosedTimestamp(context.TODO()), + } +} + +// GetClosedTimestamp implements the EvalContext interface. +func (ec *evalContextImpl) GetClosedTimestamp(_ context.Context) hlc.Timestamp { + return ec.closedTS +} + +var _ batcheval.EvalContext = &evalContextImpl{} + // NewReplicaEvalContext returns a batcheval.EvalContext to use for command // evaluation. 
The supplied SpanSet will be ignored except for race builds, in // which case state access is asserted against it. A SpanSet must always be @@ -36,11 +57,13 @@ func NewReplicaEvalContext(r *Replica, ss *spanset.SpanSet) batcheval.EvalContex if ss == nil { log.Fatalf(r.AnnotateCtx(context.Background()), "can't create a ReplicaEvalContext with assertions but no SpanSet") } + + ec := newEvalContextImpl(r) if util.RaceEnabled { return &SpanSetReplicaEvalContext{ - i: r, + i: ec, ss: *ss, } } - return r + return ec } diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go index a189a6798280..3edd73aa51b5 100644 --- a/pkg/kv/kvserver/replica_follower_read.go +++ b/pkg/kv/kvserver/replica_follower_read.go @@ -81,7 +81,7 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B } requiredFrontier := ba.RequiredFrontier() - maxClosed := r.getClosedTimestampRLocked(ctx, requiredFrontier /* sufficient */) + maxClosed := r.getCurrentClosedTimestampLocked(ctx, requiredFrontier /* sufficient */) canServeFollowerRead := requiredFrontier.LessEq(maxClosed) tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime()) if !canServeFollowerRead { @@ -106,13 +106,13 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B return true } -// getClosedTimestampRLocked is like maxClosed, except that it requires r.mu to be -// rlocked. It also optionally takes a hint: if sufficient is not -// empty, getClosedTimestampRLocked might return a timestamp that's lower than the -// maximum closed timestamp that we know about, as long as the returned -// timestamp is still >= sufficient. This is a performance optimization because -// we can avoid consulting the ClosedTimestampReceiver. -func (r *Replica) getClosedTimestampRLocked( +// getCurrentClosedTimestampLocked is like GetCurrentClosedTimestamp, except +// that it requires r.mu to be RLocked. It also optionally takes a hint: if +// sufficient is not empty, getCurrentClosedTimestampLocked might return a timestamp +// that's lower than the maximum closed timestamp that we know about, as long as +// the returned timestamp is still >= sufficient. This is a performance +// optimization because we can avoid consulting the ClosedTimestampReceiver. +func (r *Replica) getCurrentClosedTimestampLocked( ctx context.Context, sufficient hlc.Timestamp, ) hlc.Timestamp { appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex) @@ -126,11 +126,10 @@ func (r *Replica) getClosedTimestampRLocked( return maxClosed } -// GetClosedTimestamp returns the maximum closed timestamp for this range. -// -// GetClosedTimestamp is part of the EvalContext interface. -func (r *Replica) GetClosedTimestamp(ctx context.Context) hlc.Timestamp { +// GetCurrentClosedTimestamp returns the current maximum closed timestamp for // this range.
+func (r *Replica) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp { r.mu.RLock() defer r.mu.RUnlock() - return r.getClosedTimestampRLocked(ctx, hlc.Timestamp{} /* sufficient */) + return r.getCurrentClosedTimestampLocked(ctx, hlc.Timestamp{} /* sufficient */) } diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 762fc3eec052..495185544a4a 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -435,7 +435,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked( // Check for an initial closed timestamp update immediately to help // initialize the rangefeed's resolved timestamp as soon as possible. - r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetClosedTimestamp(ctx)) + r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetCurrentClosedTimestamp(ctx)) return p } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index ec19a2b92e00..985a0793e4c1 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -60,6 +60,12 @@ func (r *Replica) executeReadOnlyBatch( // may start relying on this, so we assert here. panic("expected consistent iterators") } + // Pin engine state eagerly so that all iterators created over this Reader are + // based off the state of the engine as of this point and are mutually + // consistent. + if err := rw.PinEngineStateForIterators(); err != nil { + return nil, g, roachpb.NewError(err) + } if util.RaceEnabled { rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index a9981dadaffa..fd735f7cbd1a 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -5359,7 +5359,7 @@ func TestAbortSpanError(t *testing.T) { t.Fatal(err) } - rec := &SpanSetReplicaEvalContext{tc.repl, *allSpans()} + rec := &SpanSetReplicaEvalContext{newEvalContextImpl(tc.repl), *allSpans()} pErr := checkIfTxnAborted(ctx, rec, tc.engine, txn) if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok { expected := txn.Clone() @@ -5769,7 +5769,7 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) { // return args.PusherTxn. h = roachpb.Header{Timestamp: tc.Clock().Now()} var reply roachpb.PushTxnResponse - if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: tc.repl, Stats: &ms, Header: h, Args: &pa}, &reply); err != nil { + if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: newEvalContextImpl(tc.repl), Stats: &ms, Header: h, Args: &pa}, &reply); err != nil { t.Fatal(err) } else if reply.Txn != nil { t.Fatalf("expected nil response txn, but got %s", reply.Txn) diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 84888ccaab8e..f5f91509779d 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -578,7 +578,7 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary { // Forward the read summary by the range's closed timestamp, because any // replica could have served reads below this time. We also return the // closed timestamp separately, in case callers want it split out. 
- closedTS := r.GetClosedTimestamp(ctx) + closedTS := r.GetCurrentClosedTimestamp(ctx) sum.Merge(rspb.FromTimestamp(closedTS)) return sum } diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 85a3739cedf7..bbcb293900f2 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2294,7 +2294,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) { if r == nil { continue } - r.handleClosedTimestampUpdate(ctx, r.GetClosedTimestamp(ctx)) + r.handleClosedTimestampUpdate(ctx, r.GetCurrentClosedTimestamp(ctx)) } case <-confCh: // Loop around to use the updated timer. @@ -3156,7 +3156,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { if w := metrics.LockTableMetrics.TopKLocksByWaitDuration[0].MaxWaitDurationNanos; w > maxLockWaitDurationNanos { maxLockWaitDurationNanos = w } - mc := rep.GetClosedTimestamp(ctx) + mc := rep.GetCurrentClosedTimestamp(ctx) if minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS) { minMaxClosedTS = mc } diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go index 45327e102047..f8b356734290 100644 --- a/pkg/kv/kvserver/store_split.go +++ b/pkg/kv/kvserver/store_split.go @@ -144,7 +144,7 @@ func splitPreApply( if initClosedTS == nil { initClosedTS = &hlc.Timestamp{} } - initClosedTS.Forward(r.GetClosedTimestamp(ctx)) + initClosedTS.Forward(r.GetCurrentClosedTimestamp(ctx)) if err := rsl.SetClosedTimestamp(ctx, readWriter, initClosedTS); err != nil { log.Fatalf(ctx, "%s", err) } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 2457a6798697..b2034a58b802 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1676,9 +1676,11 @@ type pebbleReadOnly struct { normalIter pebbleIterator prefixEngineIter pebbleIterator normalEngineIter pebbleIterator - iter cloneableIter - durability DurabilityRequirement - closed bool + + iter cloneableIter + unused bool + durability DurabilityRequirement + closed bool } var _ ReadWriter = &pebbleReadOnly{} @@ -1720,6 +1722,13 @@ func (p *pebbleReadOnly) Close() { panic("closing an already-closed pebbleReadOnly") } p.closed = true + if p.unused { + err := p.iter.Close() + if err != nil { + panic(err) + } + } + // Setting iter to nil is sufficient since it will be closed by one of the // subsequent destroy calls. p.iter = nil @@ -1838,11 +1847,12 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - iter.init(p.parent.db, p.iter, opts, p.durability) + iter.init(p.parent.db, p.iter, p.unused, opts, p.durability) if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.unused = false iter.reusable = true } @@ -1873,11 +1883,12 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator { if iter.iter != nil { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { - iter.init(p.parent.db, p.iter, opts, p.durability) + iter.init(p.parent.db, p.iter, p.unused, opts, p.durability) if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.unused = false iter.reusable = true } @@ -1910,6 +1921,10 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error { o = &pebble.IterOptions{OnlyReadGuaranteedDurable: true} } p.iter = p.parent.db.NewIter(o) + // Since the iterator is being created just to pin the state of the engine + // for future iterators, we'll avoid cloning it the next time we want an + // iterator and instead just re-use what we created here. 
+ p.unused = true } return nil } diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 371f430dd590..d3b5c6263d83 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -47,9 +47,11 @@ type pebbleBatch struct { normalIter pebbleIterator prefixEngineIter pebbleIterator normalEngineIter pebbleIterator - iter cloneableIter - writeOnly bool - closed bool + + iter cloneableIter + writeOnly bool + unused bool + closed bool wrappedIntentWriter intentDemuxWriter // scratch space for wrappedIntentWriter. @@ -104,6 +106,12 @@ func (p *pebbleBatch) Close() { } p.closed = true + if p.unused { + if err := p.iter.Close(); err != nil { + panic(err) + } + } + // Setting iter to nil is sufficient since it will be closed by one of the // subsequent destroy calls. p.iter = nil @@ -230,14 +238,15 @@ func (p *pebbleBatch) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions) M iter.setBounds(opts.LowerBound, opts.UpperBound) } else { if p.batch.Indexed() { - iter.init(p.batch, p.iter, opts, StandardDurability) + iter.init(p.batch, p.iter, p.unused, opts, StandardDurability) } else { - iter.init(p.db, p.iter, opts, StandardDurability) + iter.init(p.db, p.iter, p.unused, opts, StandardDurability) } if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.unused = false } iter.inuse = true @@ -272,14 +281,15 @@ func (p *pebbleBatch) NewEngineIterator(opts IterOptions) EngineIterator { iter.setBounds(opts.LowerBound, opts.UpperBound) } else { if p.batch.Indexed() { - iter.init(p.batch, p.iter, opts, StandardDurability) + iter.init(p.batch, p.iter, p.unused, opts, StandardDurability) } else { - iter.init(p.db, p.iter, opts, StandardDurability) + iter.init(p.db, p.iter, p.unused, opts, StandardDurability) } if p.iter == nil { // For future cloning. p.iter = iter.iter } + p.unused = false } iter.inuse = true @@ -299,6 +309,10 @@ func (p *pebbleBatch) PinEngineStateForIterators() error { } else { p.iter = p.db.NewIter(nil) } + // Since the iterator is being created just to pin the state of the engine + // for future iterators, we'll avoid cloning it the next time we want an + // iterator and instead just re-use what we created here. + p.unused = true } return nil } diff --git a/pkg/storage/pebble_iterator.go b/pkg/storage/pebble_iterator.go index 1797cd4cb1de..8fc315668657 100644 --- a/pkg/storage/pebble_iterator.go +++ b/pkg/storage/pebble_iterator.go @@ -76,6 +76,7 @@ var pebbleIterPool = sync.Pool{ type cloneableIter interface { Clone() (*pebble.Iterator, error) + Close() error } type testingSetBoundsListener interface { @@ -91,7 +92,7 @@ func newPebbleIterator( ) *pebbleIterator { iter := pebbleIterPool.Get().(*pebbleIterator) iter.reusable = false // defensive - iter.init(handle, iterToClone, opts, durability) + iter.init(handle, iterToClone, false /* iterUnused */, opts, durability) return iter } @@ -106,6 +107,7 @@ func newPebbleIterator( func (p *pebbleIterator) init( handle pebble.Reader, iterToClone cloneableIter, + iterUnused bool, opts IterOptions, durability DurabilityRequirement, ) { @@ -183,8 +185,15 @@ func (p *pebbleIterator) init( if doClone { var err error - if p.iter, err = iterToClone.Clone(); err != nil { - panic(err) + if iterUnused { + // NB: If the iterator was never used (at the time of writing, this means + // that the iterator was created by `PinEngineStateForIterators()`), we + // don't need to clone it. 
+ p.iter = iterToClone.(*pebble.Iterator) + } else { + if p.iter, err = iterToClone.Clone(); err != nil { + panic(err) + } } p.iter.SetBounds(p.options.LowerBound, p.options.UpperBound) } else {
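To make the intended usage concrete, here is a minimal, self-contained Go sketch of the reader-pinning pattern, under stated assumptions: PinEngineStateForIterators and the "pin before creating any iterator" flow come from this patch, but the reader type, its NewIterator method, and the string-slice engine below are simplified stand-ins for illustration, not the actual storage package types.

package main

import "fmt"

// reader is a simplified stand-in for a storage Reader: once it pins a
// snapshot of the engine, every iterator it hands out reads that snapshot.
type reader struct {
	engine *[]string // the "engine": an append-only list of keys
	pinned []string  // nil until PinEngineStateForIterators is called
}

// PinEngineStateForIterators captures the engine state once; later iterators
// reuse the snapshot instead of re-reading the live engine (this mirrors the
// p.iter/p.unused bookkeeping the patch adds to pebbleReadOnly and pebbleBatch).
func (r *reader) PinEngineStateForIterators() error {
	if r.pinned == nil {
		r.pinned = append([]string(nil), (*r.engine)...)
	}
	return nil
}

// NewIterator returns the keys visible to a newly created iterator.
func (r *reader) NewIterator() []string {
	if r.pinned != nil {
		return r.pinned
	}
	// An unpinned reader would observe the live engine state.
	return append([]string(nil), (*r.engine)...)
}

func main() {
	engine := []string{"a", "b"}
	r := &reader{engine: &engine}

	// Read-only evaluation pins eagerly, before creating any iterator...
	if err := r.PinEngineStateForIterators(); err != nil {
		panic(err)
	}

	// ...so a write that lands afterwards is invisible to every iterator
	// created over this reader, keeping them mutually consistent.
	engine = append(engine, "c")

	fmt.Println(r.NewIterator()) // [a b]
	fmt.Println(r.NewIterator()) // [a b]
}

This is the property the new call in executeReadOnlyBatch is after: every iterator created over the Reader sees the engine as of the pin point, which is what paves the way for eventually dropping latches during the scan.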
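The closed-timestamp capture described in the commit message can be sketched the same way: read the value once when the EvalContext is built, then serve every later request for it from the immutable copy. Only that capture-at-construction idea comes from the patch; replicaStub, evalCtx, and the simplified signatures (time.Time instead of hlc.Timestamp, no context.Context) are assumptions made for a compact example.

package main

import (
	"fmt"
	"time"
)

// replicaStub stands in for *Replica: its closed timestamp keeps advancing.
type replicaStub struct{ closed time.Time }

// GetCurrentClosedTimestamp simulates ongoing closed-timestamp progress.
func (r *replicaStub) GetCurrentClosedTimestamp() time.Time {
	r.closed = r.closed.Add(time.Second)
	return r.closed
}

// evalCtx mirrors evalContextImpl: it embeds the replica but snapshots the
// closed timestamp at construction, so command evaluation observes a single
// immutable value no matter how long it runs.
type evalCtx struct {
	*replicaStub
	closedTS time.Time
}

func newEvalCtx(r *replicaStub) *evalCtx {
	return &evalCtx{replicaStub: r, closedTS: r.GetCurrentClosedTimestamp()}
}

// GetClosedTimestamp plays the role of the ImmutableEvalContext method.
func (ec *evalCtx) GetClosedTimestamp() time.Time { return ec.closedTS }

func main() {
	r := &replicaStub{closed: time.Unix(0, 0)}
	ec := newEvalCtx(r)

	first := ec.GetClosedTimestamp()
	r.GetCurrentClosedTimestamp() // the replica's closed timestamp moves on...
	second := ec.GetClosedTimestamp()

	fmt.Println(first.Equal(second)) // ...but evaluation still sees one value: true
}

Freezing the value at construction is what addresses the interaction with QueryResolvedTimestampRequest noted in point 1 of the commit message: the request now sees a closed timestamp captured alongside, not after, the engine state it reads.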