kvserver: make most requests bypass closed timestamp computation
In #76312, we're making it such that a replica's closed timestamp is computed
during the creation of the batch request's evaluation context. This is to
ensure that a replica's closed timestamp is captured before we pin the state of
its storage engine (since otherwise, the closed timestamp could be _newer_ than
the state of the engine the command is evaluating over). See: #70974.

However, computing the replica's current closed timestamp on every command
evaluation can be expensive due to the mutual exclusion involved (it requires
that we hold `Replica.mu` as well as a few locks inside the closed timestamp
side transport). Since only `QueryResolvedTimestampRequest` and `SubsumeRequest`
need to read a replica's closed timestamp, this commit elides that work for
every other request.

Release note: None
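
To make the pattern concrete, here is a minimal, self-contained Go sketch of the gating described above. The types and names (`replica`, `evalContext`, `hlcTimestamp`) are simplified stand-ins for illustration, not the actual kvserver code:

package main

import (
	"fmt"
	"sync"
)

// hlcTimestamp stands in for hlc.Timestamp.
type hlcTimestamp struct{ wallTime int64 }

// replica stands in for kvserver.Replica; the mutex models the contention the
// commit is trying to avoid on hot paths.
type replica struct {
	mu       sync.Mutex
	closedTS hlcTimestamp
}

func (r *replica) getCurrentClosedTimestamp() hlcTimestamp {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.closedTS
}

// evalContext captures the closed timestamp only when the request needs it.
type evalContext struct {
	closedTSElided bool
	closedTS       hlcTimestamp
}

func newEvalContext(r *replica, requireClosedTS bool) *evalContext {
	if !requireClosedTS {
		// Most requests take this branch and never touch the mutex.
		return &evalContext{closedTSElided: true}
	}
	return &evalContext{closedTS: r.getCurrentClosedTimestamp()}
}

func (ec *evalContext) getClosedTimestamp() hlcTimestamp {
	if ec.closedTSElided {
		panic("closed timestamp was elided during eval context creation")
	}
	return ec.closedTS
}

func main() {
	r := &replica{closedTS: hlcTimestamp{wallTime: 42}}
	fmt.Println(newEvalContext(r, true).getClosedTimestamp()) // {42}
	_ = newEvalContext(r, false)                              // cheap path: no lock acquired
}

The emptiness of `closedTS` cannot distinguish "elided" from "genuinely empty", which is why the real change tracks `closedTSElided` as a separate field and panics if a request reads the closed timestamp without declaring that it needs it.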
aayushshah15 committed Mar 26, 2022
1 parent 254142f commit 5795da6
Showing 7 changed files with 49 additions and 19 deletions.
32 changes: 25 additions & 7 deletions pkg/kv/kvserver/replica_eval_context.go
@@ -32,18 +32,34 @@ var todoSpanSet = &spanset.SpanSet{}
// evalContextImpl implements the batcheval.EvalContext interface.
type evalContextImpl struct {
*Replica
closedTS hlc.Timestamp
// NB: We cannot use the emptiness of `closedTS` to determine whether the
// closed timestamp was elided during the creation of this eval context, so we
// track it separately.
closedTSElided bool
closedTS hlc.Timestamp
}

func newEvalContextImpl(r *Replica) *evalContextImpl {
func newEvalContextImpl(r *Replica, requireClosedTS bool) *evalContextImpl {
var closedTS hlc.Timestamp
if requireClosedTS {
// We elide this call to get the replica's current closed timestamp unless
// the request requires it, in order to avoid redundant mutex contention.
closedTS = r.GetCurrentClosedTimestamp(context.Background())
}

return &evalContextImpl{
Replica: r,
closedTS: r.GetCurrentClosedTimestamp(context.TODO()),
Replica: r,
closedTSElided: !requireClosedTS,
closedTS: closedTS,
}
}

// GetClosedTimestamp implements the EvalContext interface.
func (ec *evalContextImpl) GetClosedTimestamp(_ context.Context) hlc.Timestamp {
func (ec *evalContextImpl) GetClosedTimestamp() hlc.Timestamp {
if ec.closedTSElided {
panic("closed timestamp was elided during eval context creation; does the" +
" request set the requiresClosedTimestamp flag?")
}
return ec.closedTS
}

@@ -53,12 +69,14 @@ var _ batcheval.EvalContext = &evalContextImpl{}
// evaluation. The supplied SpanSet will be ignored except for race builds, in
// which case state access is asserted against it. A SpanSet must always be
// passed.
func NewReplicaEvalContext(r *Replica, ss *spanset.SpanSet) batcheval.EvalContext {
func NewReplicaEvalContext(
r *Replica, ss *spanset.SpanSet, requireClosedTS bool,
) batcheval.EvalContext {
if ss == nil {
log.Fatalf(r.AnnotateCtx(context.Background()), "can't create a ReplicaEvalContext with assertions but no SpanSet")
}

ec := newEvalContextImpl(r)
ec := newEvalContextImpl(r, requireClosedTS)
if util.RaceEnabled {
return &SpanSetReplicaEvalContext{
i: ec,
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_gossip.go
@@ -174,7 +174,7 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rec := NewReplicaEvalContext(r, todoSpanSet, false /* requireClosedTS */)
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

@@ -217,7 +217,7 @@ func (r *Replica) loadSystemConfig(ctx context.Context) (*config.SystemConfigEnt
ba.Timestamp = r.store.Clock().Now()
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(keys.SystemConfigSpan)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(r, todoSpanSet)
rec := NewReplicaEvalContext(r, todoSpanSet, false /* requireClosedTS */)
rw := r.Engine().NewReadOnly(storage.StandardDurability)
defer rw.Close()

2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_read.go
@@ -49,7 +49,7 @@ func (r *Replica) executeReadOnlyBatch(
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())

// Evaluate read-only batch command.
rec := NewReplicaEvalContext(r, g.LatchSpans())
rec := NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())

// TODO(irfansharif): It's unfortunate that in this read-only code path,
// we're stuck with a ReadWriter because of the way evaluateBatch is
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/replica_test.go
@@ -945,7 +945,7 @@ func TestReplicaLease(t *testing.T) {
} {
if _, err := batcheval.RequestLease(ctx, tc.store.Engine(),
batcheval.CommandArgs{
EvalCtx: NewReplicaEvalContext(tc.repl, allSpans()),
EvalCtx: NewReplicaEvalContext(tc.repl, allSpans(), false /* requireClosedTS */),
Args: &roachpb.RequestLeaseRequest{
Lease: lease,
},
@@ -4976,7 +4976,7 @@ func TestEndTxnDirectGC(t *testing.T) {
var gr roachpb.GetResponse
if _, err := batcheval.Get(
ctx, tc.engine, batcheval.CommandArgs{
EvalCtx: NewReplicaEvalContext(tc.repl, allSpans()),
EvalCtx: NewReplicaEvalContext(tc.repl, allSpans(), false /* requireClosedTS */),
Args: &roachpb.GetRequest{RequestHeader: roachpb.RequestHeader{
Key: keys.TransactionKey(txn.Key, txn.ID),
}},
@@ -5359,7 +5359,7 @@ func TestAbortSpanError(t *testing.T) {
t.Fatal(err)
}

rec := &SpanSetReplicaEvalContext{newEvalContextImpl(tc.repl), *allSpans()}
rec := &SpanSetReplicaEvalContext{newEvalContextImpl(tc.repl, false /* requireClosedTS */), *allSpans()}
pErr := checkIfTxnAborted(ctx, rec, tc.engine, txn)
if _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {
expected := txn.Clone()
@@ -5769,7 +5769,10 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) {
// return args.PusherTxn.
h = roachpb.Header{Timestamp: tc.Clock().Now()}
var reply roachpb.PushTxnResponse
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: newEvalContextImpl(tc.repl), Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
if _, err := batcheval.PushTxn(ctx, b, batcheval.CommandArgs{EvalCtx: newEvalContextImpl(
tc.repl,
false, /* requireClosedTS */
), Stats: &ms, Header: h, Args: &pa}, &reply); err != nil {
t.Fatal(err)
} else if reply.Txn != nil {
t.Fatalf("expected nil response txn, but got %s", reply.Txn)
@@ -8457,7 +8460,7 @@ func TestGCWithoutThreshold(t *testing.T) {

if _, err := batcheval.GC(ctx, rw, batcheval.CommandArgs{
Args: &gc,
EvalCtx: NewReplicaEvalContext(tc.repl, &spans),
EvalCtx: NewReplicaEvalContext(tc.repl, &spans, false /* requireClosedTS */),
}, &resp); err != nil {
t.Fatal(err)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_write.go
@@ -424,7 +424,7 @@ func (r *Replica) evaluateWriteBatch(
}

ms := new(enginepb.MVCCStats)
rec := NewReplicaEvalContext(r, g.LatchSpans())
rec := NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())
batch, br, res, pErr := r.evaluateWriteBatchWithServersideRefreshes(
ctx, idKey, rec, ms, ba, ui, g, nil /* deadline */)
return batch, *ms, br, res, pErr
@@ -489,7 +489,7 @@ func (r *Replica) evaluate1PC(
// Is this relying on the batch being write-only?
ui := uncertainty.Interval{}

rec := NewReplicaEvalContext(r, g.LatchSpans())
rec := NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())
var br *roachpb.BatchResponse
var res result.Result
var pErr *roachpb.Error
7 changes: 5 additions & 2 deletions pkg/roachpb/api.go
@@ -95,6 +95,7 @@ const (
needsRefresh // commands which require refreshes to avoid serializable retries
canBackpressure // commands which deserve backpressure when a Range grows too large
bypassesReplicaCircuitBreaker // commands which bypass the replica circuit breaker, i.e. opt out of fail-fast
requiresClosedTimestamp // commands which read a replica's closed timestamp
)

// flagDependencies specifies flag dependencies, asserted by TestFlagCombinations.
@@ -1404,9 +1405,11 @@ func (r *RefreshRangeRequest) flags() flag {
return isRead | isTxn | isRange | updatesTSCache
}

func (*SubsumeRequest) flags() flag { return isRead | isAlone | updatesTSCache }
func (*SubsumeRequest) flags() flag {
return isRead | isAlone | updatesTSCache | requiresClosedTimestamp
}
func (*RangeStatsRequest) flags() flag { return isRead }
func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange }
func (*QueryResolvedTimestampRequest) flags() flag { return isRead | isRange | requiresClosedTimestamp }
func (*ScanInterleavedIntentsRequest) flags() flag { return isRead | isRange }
func (*BarrierRequest) flags() flag { return isWrite | isRange }

6 changes: 6 additions & 0 deletions pkg/roachpb/batch.go
@@ -275,6 +275,12 @@ func (ba *BatchRequest) Require1PC() bool {
return etArg.Require1PC
}

// RequiresClosedTS returns true if the batch contains a request that needs to
// read a replica's closed timestamp.
func (ba *BatchRequest) RequiresClosedTS() bool {
return ba.hasFlag(requiresClosedTimestamp)
}

// IsSingleAbortTxnRequest returns true iff the batch contains a single request,
// and that request is an EndTxnRequest(commit=false).
func (ba *BatchRequest) IsSingleAbortTxnRequest() bool {
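
Tying the last two files together: the new `requiresClosedTimestamp` flag is a per-request bit, and the batch-level `RequiresClosedTS` check asks whether any request in the batch carries it. Below is a rough, self-contained sketch of that pattern; the `1 << iota` encoding and the `hasFlag` loop are assumptions made for illustration, not the real `roachpb` implementation:

package main

import "fmt"

type flag int

// Assumed encoding: one bit per flag, so a request's flags can be OR-ed together.
const (
	isRead flag = 1 << iota
	isRange
	requiresClosedTimestamp
)

// request stands in for a roachpb request; each kind reports its flags.
type request interface{ flags() flag }

type scanRequest struct{}
type queryResolvedTimestampRequest struct{}

func (scanRequest) flags() flag { return isRead | isRange }

func (queryResolvedTimestampRequest) flags() flag {
	return isRead | isRange | requiresClosedTimestamp
}

// batchRequest mirrors the idea behind BatchRequest.hasFlag: the batch has a
// flag if any request in it does.
type batchRequest struct{ reqs []request }

func (ba *batchRequest) hasFlag(f flag) bool {
	for _, r := range ba.reqs {
		if r.flags()&f != 0 {
			return true
		}
	}
	return false
}

func (ba *batchRequest) requiresClosedTS() bool { return ba.hasFlag(requiresClosedTimestamp) }

func main() {
	plainScan := &batchRequest{reqs: []request{scanRequest{}}}
	withResolved := &batchRequest{reqs: []request{scanRequest{}, queryResolvedTimestampRequest{}}}
	fmt.Println(plainScan.requiresClosedTS())    // false: the closed timestamp read is elided
	fmt.Println(withResolved.requiresClosedTS()) // true: the eval context must capture it
}

This is what lets call sites such as `NewReplicaEvalContext(r, g.LatchSpans(), ba.RequiresClosedTS())` decide at eval-context creation whether the closed timestamp needs to be captured.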
