Skip to content

Commit

Permalink
kvserver, batcheval: pin Engine state during read-only command evalua…
Browse files Browse the repository at this point in the history
…tion

This commit eagerly pins the engine state of the `Reader` created during the
evaluation of read-only requests.

Today, reads will hold latches throughout the course of their evaluation
(particularly, while they do their `MVCCScan`). This commit paves the
way for us to move to a world where we avoid holding latches during the
MVCCScan. It also lets us make MVCC garbage collection latchless,
as described in #55293.

There are a few notable changes in this patch:

1. Pinning the engine state eagerly runs into #70974. To resolve this, the
closed timestamp of the `Replica` is now captured at the time the `EvalContext`
is created, and not during the command evaluation of
`QueryResolvedTimestampRequest`.

2. `EvalContext` now has an `ImmutableEvalContext` embedded into it. The
`ImmutableEvalContext` is supposed to encapsulate state that must not change
after the `EvalContext` is created. The closed timestamp of the replica is part
of the `ImmutableEvalContext`.

3. `Replica` no longer fully implements the `EvalContext` interface. Instead,
it implements everything but `GetClosedTimestamp()` (which is implemented by
`ImmutableEvalContext` instead).

Relates to #55293
Resolves #55461
Resolves #70974

Release note: None
  • Loading branch information
aayushshah15 committed Mar 23, 2022
1 parent e02793d commit 099c920
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ func TestAddSSTableSSTTimestampToRequestTimestampRespectsClosedTS(t *testing.T)
require.NoError(t, err)
r, store, err := s.GetStores().(*kvserver.Stores).GetReplicaForRangeID(ctx, rd.RangeID)
require.NoError(t, err)
closedTS := r.GetClosedTimestamp(ctx)
closedTS := r.GetCurrentClosedTimestamp(ctx)
require.NotZero(t, closedTS)

// Add an SST writing below the closed timestamp. It should get pushed above it.
Expand Down
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Limiters struct {
// underlying state.
type EvalContext interface {
fmt.Stringer
ImmutableEvalContext

ClusterSettings() *cluster.Settings
EvalKnobs() kvserverbase.BatchEvalTestingKnobs

Expand Down Expand Up @@ -110,12 +112,6 @@ type EvalContext interface {
// requests on the range.
GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary

// GetClosedTimestamp returns the current closed timestamp on the range.
// It is expected that a caller will have performed some action (either
// calling RevokeLease or WatchForMerge) to freeze further progression of
// the closed timestamp before calling this method.
GetClosedTimestamp(ctx context.Context) hlc.Timestamp

GetExternalStorage(ctx context.Context, dest roachpb.ExternalStorage) (cloud.ExternalStorage, error)
GetExternalStorageFromURI(ctx context.Context, uri string, user security.SQLUsername) (cloud.ExternalStorage,
error)
Expand All @@ -136,6 +132,16 @@ type EvalContext interface {
GetResponseMemoryAccount() *mon.BoundAccount
}

// ImmutableEvalContext is like EvalContext, but it encapsulates state that
// needs to be immutable during the course of command evaluation.
type ImmutableEvalContext interface {
// GetClosedTimestamp returns the closed timestamp on the range. Because
// this method is part of ImmutableEvalContext, implementations are expected
// to return the same value for the lifetime of the context (for example, a
// value captured when the context was created) rather than re-reading the
// range's live closed timestamp on every call.
//
// It is expected that a caller will have performed some action (either
// calling RevokeLease or WatchForMerge) to freeze further progression of
// the closed timestamp before calling this method.
GetClosedTimestamp(ctx context.Context) hlc.Timestamp
}

// MockEvalCtx is a dummy implementation of EvalContext for testing purposes.
// For technical reasons, the interface is implemented by a wrapper .EvalContext().
type MockEvalCtx struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2149,7 +2149,7 @@ func TestStoreRangeMergeLHSLeaseTransfersAfterFreezeTime(t *testing.T) {
}
return nil
})
lhsClosedTS := lhsLeaseholder.GetClosedTimestamp(ctx)
lhsClosedTS := lhsLeaseholder.GetCurrentClosedTimestamp(ctx)
require.NotEmpty(t, lhsClosedTS)

// Finally, allow the merge to complete. It should complete successfully.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ func (cbt *circuitBreakerTest) FollowerRead(idx int) error {
repl := cbt.repls[idx]
get := roachpb.NewGet(repl.Desc().StartKey.AsRawKey(), false /* forUpdate */)
ctx := context.Background()
ts := repl.GetClosedTimestamp(ctx)
ts := repl.GetCurrentClosedTimestamp(ctx)
return cbt.SendCtxTS(ctx, idx, get, ts)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/closed_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
require.NoError(t, err)
r, err := store.GetReplica(rightDesc.RangeID)
require.NoError(t, err)
maxClosed := r.GetClosedTimestamp(ctx)
maxClosed := r.GetCurrentClosedTimestamp(ctx)
// Note that maxClosed would not necessarily be below the freeze start if
// this was a LEAD_FOR_GLOBAL_READS range.
assert.True(t, maxClosed.LessEq(freezeStartTimestamp),
Expand Down Expand Up @@ -807,7 +807,7 @@ SET CLUSTER SETTING kv.closed_timestamp.follower_reads_enabled = true;
mergedLeaseholder, err := leftLeaseholderStore.GetReplica(leftDesc.RangeID)
require.NoError(t, err)
writeTime := rhsLeaseStart.Prev()
require.True(t, mergedLeaseholder.GetClosedTimestamp(ctx).Less(writeTime))
require.True(t, mergedLeaseholder.GetCurrentClosedTimestamp(ctx).Less(writeTime))
var baWrite roachpb.BatchRequest
baWrite.Header.RangeID = leftDesc.RangeID
baWrite.Header.Timestamp = writeTime
Expand Down
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,8 +703,6 @@ type Replica struct {
}
}

var _ batcheval.EvalContext = &Replica{}

// String returns the string representation of the replica using an
// inconsistent copy of the range descriptor. Therefore, String does not
// require a lock and its output may not be atomic with other ongoing work in
Expand Down Expand Up @@ -1232,7 +1230,7 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo {
// NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do
// this first before RLocking below. Performance of this extra lock
// acquisition is not a concern.
ri.ActiveClosedTimestamp = r.GetClosedTimestamp(ctx)
ri.ActiveClosedTimestamp = r.GetCurrentClosedTimestamp(ctx)

// NB: numRangefeedRegistrations doesn't require Replica.mu to be locked.
// However, it does require coordination between multiple goroutines, so
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_closedts_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ func TestReplicaClosedTimestamp(t *testing.T) {
tc.repl.mu.state.RaftClosedTimestamp = test.raftClosed
tc.repl.mu.state.LeaseAppliedIndex = uint64(test.applied)
tc.repl.mu.Unlock()
require.Equal(t, test.expClosed, tc.repl.GetClosedTimestamp(ctx))
require.Equal(t, test.expClosed, tc.repl.GetCurrentClosedTimestamp(ctx))
})
}
}
Expand Down
27 changes: 25 additions & 2 deletions pkg/kv/kvserver/replica_eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand All @@ -28,6 +29,26 @@ import (
// Do not introduce new uses of this.
var todoSpanSet = &spanset.SpanSet{}

// evalContextImpl implements the batcheval.EvalContext interface. It embeds
// *Replica, and adds the GetClosedTimestamp method on top of the methods
// promoted from the embedded Replica.
type evalContextImpl struct {
*Replica
// closedTS is the closed timestamp captured from the Replica when this
// evalContextImpl was constructed; it is what GetClosedTimestamp returns.
closedTS hlc.Timestamp
}

// newEvalContextImpl returns an evalContextImpl wrapping r. The replica's
// current closed timestamp is read once, here, and stored in the returned
// value, so later calls to GetClosedTimestamp on it return this snapshot.
func newEvalContextImpl(r *Replica) *evalContextImpl {
return &evalContextImpl{
Replica: r,
// No context is passed into this constructor, so context.TODO() is used
// for the closed timestamp lookup.
closedTS: r.GetCurrentClosedTimestamp(context.TODO()),
}
}

// GetClosedTimestamp implements the EvalContext interface. It returns the
// closed timestamp stored in ec.closedTS; the context argument is unused.
func (ec *evalContextImpl) GetClosedTimestamp(_ context.Context) hlc.Timestamp {
return ec.closedTS
}

var _ batcheval.EvalContext = &evalContextImpl{}

// NewReplicaEvalContext returns a batcheval.EvalContext to use for command
// evaluation. The supplied SpanSet will be ignored except for race builds, in
// which case state access is asserted against it. A SpanSet must always be
Expand All @@ -36,11 +57,13 @@ func NewReplicaEvalContext(r *Replica, ss *spanset.SpanSet) batcheval.EvalContex
if ss == nil {
log.Fatalf(r.AnnotateCtx(context.Background()), "can't create a ReplicaEvalContext with assertions but no SpanSet")
}

ec := newEvalContextImpl(r)
if util.RaceEnabled {
return &SpanSetReplicaEvalContext{
i: r,
i: ec,
ss: *ss,
}
}
return r
return ec
}
25 changes: 12 additions & 13 deletions pkg/kv/kvserver/replica_follower_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B
}

requiredFrontier := ba.RequiredFrontier()
maxClosed := r.getClosedTimestampRLocked(ctx, requiredFrontier /* sufficient */)
maxClosed := r.getCurrentClosedTimestampLocked(ctx, requiredFrontier /* sufficient */)
canServeFollowerRead := requiredFrontier.LessEq(maxClosed)
tsDiff := requiredFrontier.GoTime().Sub(maxClosed.GoTime())
if !canServeFollowerRead {
Expand All @@ -106,13 +106,13 @@ func (r *Replica) canServeFollowerReadRLocked(ctx context.Context, ba *roachpb.B
return true
}

// getClosedTimestampRLocked is like maxClosed, except that it requires r.mu to be
// rlocked. It also optionally takes a hint: if sufficient is not
// empty, getClosedTimestampRLocked might return a timestamp that's lower than the
// maximum closed timestamp that we know about, as long as the returned
// timestamp is still >= sufficient. This is a performance optimization because
// we can avoid consulting the ClosedTimestampReceiver.
func (r *Replica) getClosedTimestampRLocked(
// getCurrentClosedTimestampLocked is like GetCurrentClosedTimestamp, except
// that it requires r.mu to be RLocked. It also optionally takes a hint: if
// sufficient is not empty, getCurrentClosedTimestampLocked might return a
// timestamp that's lower than the maximum closed timestamp that we know about,
// as long as the returned timestamp is still >= sufficient. This is a
// performance optimization because we can avoid consulting the
// ClosedTimestampReceiver.
func (r *Replica) getCurrentClosedTimestampLocked(
ctx context.Context, sufficient hlc.Timestamp,
) hlc.Timestamp {
appliedLAI := ctpb.LAI(r.mu.state.LeaseAppliedIndex)
Expand All @@ -126,11 +126,10 @@ func (r *Replica) getClosedTimestampRLocked(
return maxClosed
}

// GetClosedTimestamp returns the maximum closed timestamp for this range.
//
// GetClosedTimestamp is part of the EvalContext interface.
func (r *Replica) GetClosedTimestamp(ctx context.Context) hlc.Timestamp {
// GetCurrentClosedTimestamp returns the current maximum closed timestamp for
// this range.
func (r *Replica) GetCurrentClosedTimestamp(ctx context.Context) hlc.Timestamp {
r.mu.RLock()
defer r.mu.RUnlock()
return r.getClosedTimestampRLocked(ctx, hlc.Timestamp{} /* sufficient */)
return r.getCurrentClosedTimestampLocked(ctx, hlc.Timestamp{} /* sufficient */)
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(

// Check for an initial closed timestamp update immediately to help
// initialize the rangefeed's resolved timestamp as soon as possible.
r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetClosedTimestamp(ctx))
r.handleClosedTimestampUpdateRaftMuLocked(ctx, r.GetCurrentClosedTimestamp(ctx))

return p
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func (r *Replica) executeReadOnlyBatch(
// may start relying on this, so we assert here.
panic("expected consistent iterators")
}
// Pin engine state eagerly so that all iterators created over this Reader are
// based off the state of the engine as of this point and are mutually
// consistent.
if err := rw.PinEngineStateForIterators(); err != nil {
return nil, g, roachpb.NewError(err)
}
if util.RaceEnabled {
rw = spanset.NewReadWriterAt(rw, g.LatchSpans(), ba.Timestamp)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5359,7 +5359,7 @@ func TestAbortSpanError(t *testing.T) {
t.Fatal(err)
}

rec := &SpanSetReplicaEvalContext{tc.repl, *allSpans()}
rec := &SpanSetReplicaEvalContext{newEvalContextImpl(tc.repl), *allSpans()}
pErr := checkIfTxnAborted(ctx, rec, tc.engine, txn)
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
expected := txn.Clone()
Expand Down Expand Up @@ -5769,7 +5769,7 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) {
// return args.PusherTxn.
h = roachpb.Header{Timestamp: tc.Clock().Now()}
var reply roachpb.PushTxnResponse
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: tc.repl, Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: newEvalContextImpl(tc.repl), Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
t.Fatal(err)
} else if reply.Txn != nil {
t.Fatalf("expected nil response txn, but got %s", reply.Txn)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (r *Replica) GetCurrentReadSummary(ctx context.Context) rspb.ReadSummary {
// Forward the read summary by the range's closed timestamp, because any
// replica could have served reads below this time. We also return the
// closed timestamp separately, in case callers want it split out.
closedTS := r.GetClosedTimestamp(ctx)
closedTS := r.GetCurrentClosedTimestamp(ctx)
sum.Merge(rspb.FromTimestamp(closedTS))
return sum
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2294,7 +2294,7 @@ func (s *Store) startRangefeedUpdater(ctx context.Context) {
if r == nil {
continue
}
r.handleClosedTimestampUpdate(ctx, r.GetClosedTimestamp(ctx))
r.handleClosedTimestampUpdate(ctx, r.GetCurrentClosedTimestamp(ctx))
}
case <-confCh:
// Loop around to use the updated timer.
Expand Down Expand Up @@ -3156,7 +3156,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
if w := metrics.LockTableMetrics.TopKLocksByWaitDuration[0].MaxWaitDurationNanos; w > maxLockWaitDurationNanos {
maxLockWaitDurationNanos = w
}
mc := rep.GetClosedTimestamp(ctx)
mc := rep.GetCurrentClosedTimestamp(ctx)
if minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS) {
minMaxClosedTS = mc
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func splitPreApply(
if initClosedTS == nil {
initClosedTS = &hlc.Timestamp{}
}
initClosedTS.Forward(r.GetClosedTimestamp(ctx))
initClosedTS.Forward(r.GetCurrentClosedTimestamp(ctx))
if err := rsl.SetClosedTimestamp(ctx, readWriter, initClosedTS); err != nil {
log.Fatalf(ctx, "%s", err)
}
Expand Down
25 changes: 20 additions & 5 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -1676,9 +1676,11 @@ type pebbleReadOnly struct {
normalIter pebbleIterator
prefixEngineIter pebbleIterator
normalEngineIter pebbleIterator
iter cloneableIter
durability DurabilityRequirement
closed bool

iter cloneableIter
unused bool
durability DurabilityRequirement
closed bool
}

var _ ReadWriter = &pebbleReadOnly{}
Expand Down Expand Up @@ -1720,6 +1722,13 @@ func (p *pebbleReadOnly) Close() {
panic("closing an already-closed pebbleReadOnly")
}
p.closed = true
if p.unused {
err := p.iter.Close()
if err != nil {
panic(err)
}
}

// Setting iter to nil is sufficient since it will be closed by one of the
// subsequent destroy calls.
p.iter = nil
Expand Down Expand Up @@ -1838,11 +1847,12 @@ func (p *pebbleReadOnly) NewMVCCIterator(iterKind MVCCIterKind, opts IterOptions
if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
} else {
iter.init(p.parent.db, p.iter, opts, p.durability)
iter.init(p.parent.db, p.iter, p.unused, opts, p.durability)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
}
p.unused = false
iter.reusable = true
}

Expand Down Expand Up @@ -1873,11 +1883,12 @@ func (p *pebbleReadOnly) NewEngineIterator(opts IterOptions) EngineIterator {
if iter.iter != nil {
iter.setBounds(opts.LowerBound, opts.UpperBound)
} else {
iter.init(p.parent.db, p.iter, opts, p.durability)
iter.init(p.parent.db, p.iter, p.unused, opts, p.durability)
if p.iter == nil {
// For future cloning.
p.iter = iter.iter
}
p.unused = false
iter.reusable = true
}

Expand Down Expand Up @@ -1910,6 +1921,10 @@ func (p *pebbleReadOnly) PinEngineStateForIterators() error {
o = &pebble.IterOptions{OnlyReadGuaranteedDurable: true}
}
p.iter = p.parent.db.NewIter(o)
// Since the iterator is being created just to pin the state of the engine
// for future iterators, we'll avoid cloning it the next time we want an
// iterator and instead just re-use what we created here.
p.unused = true
}
return nil
}
Expand Down
Loading

0 comments on commit 099c920

Please sign in to comment.