Skip to content

Commit

Permalink
kv: remove EvalContext.Engine
Browse files Browse the repository at this point in the history
This change removes the Engine method from EvalContext. Removing this is
important, as its existence appears to undermine #55461 and make #66485
difficult.

The first place where this was used was in EndTxn's evaluation function.
I don't see any reason for this. In fact, we had a TODO to fix this, which
we could have addressed years ago.

The second place where this was used was in RecomputeStats's evaluation
function. There, it was used for two reasons. First, it was used because
`storage.Batch` used to not provide a consistent view of data. They now
do. It was also used to evade spanset assertions, which this commit
addresses in a better way.
  • Loading branch information
nvanbenschoten committed Jul 1, 2021
1 parent c4b372e commit d8ff91e
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 35 deletions.
9 changes: 3 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -949,11 +949,8 @@ func splitTriggerHelper(
// - node two becomes the lease holder for [c,e). Its timestamp cache does
// not know about the read at 'd' which happened at the beginning.
// - node two can illegally propose a write to 'd' at a lower timestamp.
//
// TODO(tschottdorf): why would this use r.store.Engine() and not the
// batch? We do the same thing for other usages of the state loader.
sl := MakeStateLoader(rec)
leftLease, err := sl.LoadLease(ctx, rec.Engine())
leftLease, err := sl.LoadLease(ctx, batch)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load lease")
}
Expand All @@ -970,7 +967,7 @@ func splitTriggerHelper(
}
rightLease := leftLease
rightLease.Replica = replica
gcThreshold, err := sl.LoadGCThreshold(ctx, rec.Engine())
gcThreshold, err := sl.LoadGCThreshold(ctx, batch)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}
Expand Down Expand Up @@ -1001,7 +998,7 @@ func splitTriggerHelper(
truncStateType = stateloader.TruncatedStateLegacyReplicated
}

replicaVersion, err := sl.LoadVersion(ctx, rec.Engine())
replicaVersion, err := sl.LoadVersion(ctx, batch)
if err != nil {
return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to load GCThreshold")
}
Expand Down
25 changes: 7 additions & 18 deletions pkg/kv/kvserver/batcheval/cmd_recompute_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func init() {
}

func declareKeysRecomputeStats(
rs ImmutableRangeState, _ roachpb.Header, req roachpb.Request, latchSpans, _ *spanset.SpanSet,
rs ImmutableRangeState, _ roachpb.Header, _ roachpb.Request, latchSpans, _ *spanset.SpanSet,
) {
// We don't declare any user key in the range. This is OK since all we're doing is computing a
// stats delta, and applying this delta commutes with other operations on the same key space.
Expand All @@ -53,7 +53,7 @@ func declareKeysRecomputeStats(
// RecomputeStats recomputes the MVCCStats stored for this range and adjust them accordingly,
// returning the MVCCStats delta obtained in the process.
func RecomputeStats(
ctx context.Context, _ storage.Reader, cArgs CommandArgs, resp roachpb.Response,
ctx context.Context, reader storage.Reader, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {
desc := cArgs.EvalCtx.Desc()
args := cArgs.Args.(*roachpb.RecomputeStatsRequest)
Expand All @@ -64,27 +64,16 @@ func RecomputeStats(

args = nil // avoid accidental use below

// Open a snapshot from which we will read everything (including the
// MVCCStats). This is necessary because a batch does not provide us
// with a consistent view of the data -- reading from the batch, we
// could see skew between the stats recomputation and the MVCCStats
// we read from the range state if concurrent writes are inflight[1].
//
// Note that in doing so, we also circumvent the assertions (present in both
// the EvalContext and the batch in some builds) which check that all reads
// were previously declared. See the comment in `declareKeysRecomputeStats`
// for details on this.
//
// [1]: see engine.TestBatchReadLaterWrite.
snap := cArgs.EvalCtx.Engine().NewSnapshot()
defer snap.Close()
// Disable the assertions which check that all reads were previously declared.
// See the comment in `declareKeysRecomputeStats` for details on this.
reader = spanset.DisableReaderAssertions(reader)

actualMS, err := rditer.ComputeStatsForRange(desc, snap, cArgs.Header.Timestamp.WallTime)
actualMS, err := rditer.ComputeStatsForRange(desc, reader, cArgs.Header.Timestamp.WallTime)
if err != nil {
return result.Result{}, err
}

currentStats, err := MakeStateLoader(cArgs.EvalCtx).LoadMVCCStats(ctx, snap)
currentStats, err := MakeStateLoader(cArgs.EvalCtx).LoadMVCCStats(ctx, reader)
if err != nil {
return result.Result{}, err
}
Expand Down
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/batcheval/eval_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -51,7 +50,6 @@ type EvalContext interface {
ClusterSettings() *cluster.Settings
EvalKnobs() kvserverbase.BatchEvalTestingKnobs

Engine() storage.Engine
Clock() *hlc.Clock
DB() *kv.DB
AbortSpan() *abortspan.AbortSpan
Expand Down Expand Up @@ -172,9 +170,6 @@ func (m *mockEvalCtxImpl) ClusterSettings() *cluster.Settings {
func (m *mockEvalCtxImpl) EvalKnobs() kvserverbase.BatchEvalTestingKnobs {
return kvserverbase.BatchEvalTestingKnobs{}
}
func (m *mockEvalCtxImpl) Engine() storage.Engine {
panic("unimplemented")
}
func (m *mockEvalCtxImpl) Clock() *hlc.Clock {
return m.MockEvalCtx.Clock
}
Expand Down
6 changes: 0 additions & 6 deletions pkg/kv/kvserver/replica_eval_context_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -92,11 +91,6 @@ func (rec *SpanSetReplicaEvalContext) GetNodeLocality() roachpb.Locality {
return rec.i.GetNodeLocality()
}

// Engine returns the engine.
func (rec *SpanSetReplicaEvalContext) Engine() storage.Engine {
return rec.i.Engine()
}

// GetFirstIndex returns the first index.
func (rec *SpanSetReplicaEvalContext) GetFirstIndex() (uint64, error) {
return rec.i.GetFirstIndex()
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,16 @@ func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch
ts: ts,
}
}

// DisableReaderAssertions unwraps any storage.Reader implementations that may
// assert access against a given SpanSet.
func DisableReaderAssertions(reader storage.Reader) storage.Reader {
switch v := reader.(type) {
case ReadWriter:
return DisableReaderAssertions(v.r)
case *spanSetBatch:
return DisableReaderAssertions(v.r)
default:
return reader
}
}

0 comments on commit d8ff91e

Please sign in to comment.