diff --git a/pkg/kv/kvserver/replica_eval_context.go b/pkg/kv/kvserver/replica_eval_context.go
index cf1597645707..207b54966cc0 100644
--- a/pkg/kv/kvserver/replica_eval_context.go
+++ b/pkg/kv/kvserver/replica_eval_context.go
@@ -32,18 +32,34 @@ var todoSpanSet = &spanset.SpanSet{}
 
 // evalContextImpl implements the batcheval.EvalContext interface.
 type evalContextImpl struct {
 	*Replica
-	closedTS hlc.Timestamp
+	// NB: We cannot use the emptiness of `closedTS` to determine whether the
+	// closed timestamp was elided during the creation of this eval context, so we
+	// track it separately.
+	closedTSElided bool
+	closedTS       hlc.Timestamp
 }
 
-func newEvalContextImpl(r *Replica) *evalContextImpl {
+func newEvalContextImpl(r *Replica, requireClosedTS bool) *evalContextImpl {
+	var closedTS hlc.Timestamp
+	if requireClosedTS {
+		// We elide this call to get the replica's current closed timestamp unless
+		// the request requires it, in order to avoid redundant mutex contention.
+		closedTS = r.GetCurrentClosedTimestamp(context.Background())
+	}
+
 	return &evalContextImpl{
-		Replica:  r,
-		closedTS: r.GetCurrentClosedTimestamp(context.TODO()),
+		Replica:        r,
+		closedTSElided: !requireClosedTS,
+		closedTS:       closedTS,
 	}
 }
 
 // GetClosedTimestamp implements the EvalContext interface.
-func (ec *evalContextImpl) GetClosedTimestamp(_ context.Context) hlc.Timestamp {
+func (ec *evalContextImpl) GetClosedTimestamp() hlc.Timestamp {
+	if ec.closedTSElided {
+		panic("closed timestamp was elided during eval context creation; does the" +
+			" request set the requiresClosedTimestamp flag?")
+	}
 	return ec.closedTS
 }
 
@@ -53,12 +69,14 @@ var _ batcheval.EvalContext = &evalContextImpl{}
 // evaluation. The supplied SpanSet will be ignored except for race builds, in
 // which case state access is asserted against it. A SpanSet must always be
 // passed.
-func NewReplicaEvalContext(r *Replica, ss *spanset.SpanSet) batcheval.EvalContext {
+func NewReplicaEvalContext(
+	r *Replica, ss *spanset.SpanSet, requireClosedTS bool,
+) batcheval.EvalContext {
 	if ss == nil {
 		log.Fatalf(r.AnnotateCtx(context.Background()),
 			"can't create a ReplicaEvalContext with assertions but no SpanSet")
 	}
-	ec := newEvalContextImpl(r)
+	ec := newEvalContextImpl(r, requireClosedTS)
 	if util.RaceEnabled {
 		return &SpanSetReplicaEvalContext{
 			i: ec,
diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go
index f22a76ced01f..1c21f9111cd4 100644
--- a/pkg/kv/kvserver/replica_gossip.go
+++ b/pkg/kv/kvserver/replica_gossip.go
@@ -174,7 +174,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
 	ba.Timestamp = r.store.Clock().Now()
 	ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)})
 	// Call evaluateBatch instead of Send to avoid reacquiring latches.
-	rec := NewReplicaEvalContext(r, todoSpanSet)
+	rec := NewReplicaEvalContext(r, todoSpanSet, false /* requireClosedTS */)
 	rw := r.Engine().NewReadOnly(storage.StandardDurability)
 	defer rw.Close()
 
@@ -217,7 +217,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
 	ba.Timestamp = r.store.Clock().Now()
 	ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)})
 	// Call evaluateBatch instead of Send to avoid reacquiring latches.
-	rec := NewReplicaEvalContext(r, todoSpanSet)
+	rec := NewReplicaEvalContext(r, todoSpanSet, false /* requireClosedTS */)
 	rw := r.Engine().NewReadOnly(storage.StandardDurability)
 	defer rw.Close()
 
diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go
index 985a0793e4c1..592962c7be7c 100644
--- a/pkg/kv/kvserver/replica_read.go
+++ b/pkg/kv/kvserver/replica_read.go
@@ -49,7 +49,7 @@ func (r *Replica) executeReadOnlyBatch(
 	ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())
 
 	// Evaluate read-only batch command.
-	rec := NewReplicaEvalContext(r, g.LatchSpans())
+	rec := NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())
 
 	// TODO(irfansharif): It's unfortunate that in this read-only code path,
 	// we're stuck with a ReadWriter because of the way evaluateBatch is
diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go
index fd735f7cbd1a..2f075e55ecc9 100644
--- a/pkg/kv/kvserver/replica_test.go
+++ b/pkg/kv/kvserver/replica_test.go
@@ -945,7 +945,7 @@ func TestReplicaLease(t *testing.T) {
 	} {
 		if _, err := batcheval.RequestLease(ctx, tc.store.Engine(),
 			batcheval.CommandArgs{
-				EvalCtx: NewReplicaEvalContext(tc.repl, allSpans()),
+				EvalCtx: NewReplicaEvalContext(tc.repl, allSpans(), false /* requireClosedTS */),
 				Args: &roachpb.RequestLeaseRequest{
 					Lease: lease,
 				},
@@ -4976,7 +4976,7 @@ func TestEndTxnDirectGC(t *testing.T) {
 		var gr roachpb.GetResponse
 		if _, err := batcheval.Get(
 			ctx, tc.engine, batcheval.CommandArgs{
-				EvalCtx: NewReplicaEvalContext(tc.repl, allSpans()),
+				EvalCtx: NewReplicaEvalContext(tc.repl, allSpans(), false /* requireClosedTS */),
 				Args: &roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{
 					Key: keys.TransactionKey(txn.Key, txn.ID),
 				}},
@@ -5359,7 +5359,7 @@ func TestAbortSpanError(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	rec := &SpanSetReplicaEvalContext{newEvalContextImpl(tc.repl), *allSpans()}
+	rec := &SpanSetReplicaEvalContext{newEvalContextImpl(tc.repl, false /* requireClosedTS */), *allSpans()}
 	pErr := checkIfTxnAborted(ctx, rec, tc.engine, txn)
 	if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
 		expected := txn.Clone()
@@ -5769,7 +5769,10 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) {
 	// return args.PusherTxn.
 	h = roachpb.Header{Timestamp: tc.Clock().Now()}
 	var reply roachpb.PushTxnResponse
-	if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: newEvalContextImpl(tc.repl), Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
+	if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: newEvalContextImpl(
+		tc.repl,
+		false, /* requireClosedTS */
+	), Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
 		t.Fatal(err)
 	} else if reply.Txn != nil {
 		t.Fatalf("expected nil response txn, but got %s", reply.Txn)
@@ -8457,7 +8460,7 @@ func TestGCWithoutThreshold(t *testing.T) {
 
 			if _, err := batcheval.GC(ctx, rw, batcheval.CommandArgs{
 				Args:    &gc,
-				EvalCtx: NewReplicaEvalContext(tc.repl, &spans),
+				EvalCtx: NewReplicaEvalContext(tc.repl, &spans, false /* requireClosedTS */),
 			}, &resp); err != nil {
 				t.Fatal(err)
 			}
diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go
index 8b9ffc849c38..d2b070ee5060 100644
--- a/pkg/kv/kvserver/replica_write.go
+++ b/pkg/kv/kvserver/replica_write.go
@@ -424,7 +424,7 @@ func (r *Replica) evaluateWriteBatch(
 	}
 
 	ms := new(enginepb.MVCCStats)
-	rec := NewReplicaEvalContext(r, g.LatchSpans())
+	rec := NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())
 	batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
 		ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */)
 	return batch, *ms, br, res, pErr
@@ -489,7 +489,7 @@ func (r *Replica) evaluate1PC(
 	// Is this relying on the batch being write-only?
 	ui := uncertainty.Interval{}
 
-	rec := NewReplicaEvalContext(r, g.LatchSpans())
+	rec := NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())
 	var br *roachpb.BatchResponse
 	var res result.Result
 	var pErr *roachpb.Error
diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go
index 4d3c9cddfa42..7208776bee80 100644
--- a/pkg/roachpb/api.go
+++ b/pkg/roachpb/api.go
@@ -95,6 +95,7 @@ const (
 	needsRefresh                  // commands which require refreshes to avoid serializable retries
 	canBackpressure               // commands which deserve backpressure when a Range grows too large
 	bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast
+	requiresClosedTimestamp       // commands which read a replica's closed timestamp
 )
 
 // flagDependencies specifies flag dependencies, asserted by TestFlagCombinations.
@@ -1404,9 +1405,11 @@ func (r *RefreshRangeRequest) flags() flag {
 	return isRead | isTxn | isRange | updatesTSCache
 }
 
-func (*SubsumeRequest) flags() flag { return isRead | isAlone | updatesTSCache }
+func (*SubsumeRequest) flags() flag {
+	return isRead | isAlone | updatesTSCache | requiresClosedTimestamp
+}
 func (*RangeStatsRequest) flags() flag { return isRead }
-func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange }
+func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTimestamp }
 func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
 func (*BarrierRequest) flags() flag { return isWrite | isRange }
 
diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go
index 20d65e9d4049..837e682091a8 100644
--- a/pkg/roachpb/batch.go
+++ b/pkg/roachpb/batch.go
@@ -275,6 +275,12 @@ func (ba *BatchRequest) Require1PC() bool {
 	return etArg.Require1PC
 }
 
+// RequiresClosedTS returns true if the batch contains a request that needs to
+// read a replica's closed timestamp.
+func (ba *BatchRequest) RequiresClosedTS() bool {
+	return ba.hasFlag(requiresClosedTimestamp)
+}
+
 // IsSingleAbortTxnRequest returns true iff the batch contains a single request,
 // and that request is an EndTxnRequest(commit=false).
 func (ba *BatchRequest) IsSingleAbortTxnRequest() bool {
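
Note: the following is a minimal standalone sketch of the pattern the diff introduces, not CockroachDB code; the names evalContext, newEvalContext, and currentClosedTimestamp are invented stand-ins for evalContextImpl, newEvalContextImpl, and Replica.GetCurrentClosedTimestamp. It shows the closed timestamp being fetched at eval-context construction only when the batch carries the requiresClosedTimestamp flag, with an unflagged read panicking.

package main

import "fmt"

// flag mirrors the bitmask style used by the roachpb request flags.
type flag int

const (
	isRead flag = 1 << iota
	requiresClosedTimestamp
)

// evalContext is a simplified stand-in for evalContextImpl: it records whether
// the closed timestamp was elided at construction time.
type evalContext struct {
	closedTSElided bool
	closedTS       int64 // stand-in for hlc.Timestamp
}

// currentClosedTimestamp stands in for Replica.GetCurrentClosedTimestamp, the
// mutex-protected lookup the change avoids when it is not needed.
func currentClosedTimestamp() int64 { return 42 }

// newEvalContext fetches the closed timestamp only when the request asked for it.
func newEvalContext(requireClosedTS bool) *evalContext {
	var closedTS int64
	if requireClosedTS {
		closedTS = currentClosedTimestamp()
	}
	return &evalContext{closedTSElided: !requireClosedTS, closedTS: closedTS}
}

// GetClosedTimestamp panics if the value was elided at construction time.
func (ec *evalContext) GetClosedTimestamp() int64 {
	if ec.closedTSElided {
		panic("closed timestamp was elided; request did not set requiresClosedTimestamp")
	}
	return ec.closedTS
}

func main() {
	// A Subsume-like request declares the flag, so the timestamp is populated.
	batchFlags := isRead | requiresClosedTimestamp
	ec := newEvalContext(batchFlags&requiresClosedTimestamp != 0)
	fmt.Println(ec.GetClosedTimestamp()) // prints 42

	// A plain read does not declare the flag; the lookup is skipped entirely,
	// and calling GetClosedTimestamp on this context would panic.
	ec2 := newEvalContext(false)
	fmt.Println(ec2.closedTSElided) // prints true
}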