kvserver: allow certain read-only requests to drop latches before evaluation

This commit changes the way certain types of read-only requests are evaluated.
Traditionally, read-only requests have held their latches throughout their
execution. This commit allows certain qualifying reads to release their latches
earlier.

At a high level, reads may attempt to resolve all conflicts upfront by
performing a sort of "validation" phase before they perform their MVCC scan.
This validation phase scans the lock table keyspace to find any conflicting
intents that may need to be resolved before the request can actually be
evaluated over the MVCC keyspace. If no conflicting intents are found then,
since cockroachdb#76312, the request is guaranteed to be fully isolated against
all other concurrent requests and can release its latches at this point. This
allows the actual evaluation of the read (over the MVCC part of the keyspace)
to proceed without latches being held, which is the main motivation of this
work. This validation phase can be thought of as an extension of the validation
that the concurrency manager already performs when requests are sequenced
through it, which tries to detect any conflicting intents that have already
been pulled into the in-memory lock table.
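
As a rough illustration of the flow described above, here is a minimal,
self-contained Go sketch. It is not CockroachDB code: the lock table, latch
guard, and timestamp cache types below are hypothetical stand-ins, used only to
show the shape of "validate the lock table, bump the timestamp cache, drop
latches, then scan":

```go
// Hedged sketch of the validate-then-drop-latches flow. These types are toy
// stand-ins, not CockroachDB's concurrency or storage packages.
package main

import "fmt"

type span struct{ start, end string }

// lockTable models the lock-table keyspace: key -> txn holding an intent.
type lockTable struct{ intents map[string]string }

// conflictingIntents returns keys in sp with intents held by other txns.
func (lt *lockTable) conflictingIntents(sp span, txnID string) []string {
	var conflicts []string
	for key, holder := range lt.intents {
		if key >= sp.start && key < sp.end && holder != txnID {
			conflicts = append(conflicts, key)
		}
	}
	return conflicts
}

// latchGuard models the latches a request acquires while it is sequenced.
type latchGuard struct{ released bool }

func (g *latchGuard) release() { g.released = true }

// tsCache models the timestamp cache that protects completed reads from
// writes at lower timestamps.
type tsCache struct{ lastRead map[span]int64 }

func (c *tsCache) bump(sp span, ts int64) {
	if c.lastRead[sp] < ts {
		c.lastRead[sp] = ts
	}
}

// evalReadOnly validates the lock table first; if no conflicting intents are
// found, it bumps the timestamp cache and releases latches *before* the MVCC
// scan, so the scan no longer blocks conflicting writers.
func evalReadOnly(lt *lockTable, g *latchGuard, c *tsCache, sp span, txnID string, ts int64) error {
	if conflicts := lt.conflictingIntents(sp, txnID); len(conflicts) > 0 {
		// Conflicts must be resolved (or waited on) first; keep latches held.
		return fmt.Errorf("conflicting intents at %v", conflicts)
	}
	c.bump(sp, ts) // record the read so later writers get pushed above ts
	g.release()    // drop latches before the potentially slow MVCC scan
	// ... MVCC scan over the pinned engine state would run here ...
	return nil
}

func main() {
	lt := &lockTable{intents: map[string]string{"b": "txn1"}}
	c := &tsCache{lastRead: map[span]int64{}}

	g1 := &latchGuard{}
	fmt.Println(evalReadOnly(lt, g1, c, span{"a", "c"}, "txn2", 100)) // conflict with txn1

	g2 := &latchGuard{}
	fmt.Println(evalReadOnly(lt, g2, c, span{"x", "z"}, "txn2", 100), g2.released) // <nil> true
}
```

In the actual change, the timestamp-cache bump over the request's declared
spans is what makes it safe to release latches before evaluation.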

Additionally, for certain types of requests that can drop their latches early,
and do not need to access the `IntentHistory` for any of their parent txn's
intents, this commit attempts to make their MVCC scan cheaper by eliminating
the need for an `intentInterleavingIterator`. This is enabled by the
observation that once the validation phase is complete, the only remaining
intents in the read's declared span must be intents belonging to the reader's
transaction. So if the reader doesn't need to read an intent that isn't the
latest intent on a key, then it doesn't need access to the key's
`IntentHistory` (which lives in the lock-table keyspace), and doesn't need to
use an `intentInterleavingIterator`.
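
To illustrate that second point, here is a hedged sketch (again with
hypothetical types, not the real `storage` package API) of the iterator
choice: once the lock-table validation has ruled out other transactions'
intents, a reader that does not need `IntentHistory` can scan with a plain
MVCC iterator rather than an intent-interleaving one:

```go
// Hedged sketch of the iterator selection; the iterator types here are toy
// stand-ins for the real (and more involved) storage-layer iterators.
package main

import "fmt"

type iterator interface{ name() string }

type mvccIterator struct{}

func (mvccIterator) name() string { return "plain MVCC iterator" }

// interleavingIterator merges the MVCC keyspace with the lock-table keyspace,
// making intents (and their IntentHistory) visible to the scan.
type interleavingIterator struct{}

func (interleavingIterator) name() string { return "intent-interleaving iterator" }

// newReadIter picks the cheaper iterator when the validation phase has already
// ruled out other txns' intents and the reader only needs the latest intent
// (if any) on each of its own keys.
func newReadIter(validatedNoForeignIntents, needsIntentHistory bool) iterator {
	if validatedNoForeignIntents && !needsIntentHistory {
		return mvccIterator{}
	}
	return interleavingIterator{}
}

func main() {
	fmt.Println(newReadIter(true, false).name())  // plain MVCC iterator
	fmt.Println(newReadIter(true, true).name())   // intent-interleaving iterator
	fmt.Println(newReadIter(false, false).name()) // intent-interleaving iterator
}
```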

Release note (performance improvement): certain types of reads will now have a
far smaller contention footprint with conflicting concurrent writers.

Resolves cockroachdb#66485

Release justification: high benefit change to existing functionality, part of
22.2 roadmap
aayushshah15 committed Oct 4, 2022
1 parent 677ef2c commit 64c1375
Showing 19 changed files with 796 additions and 218 deletions.
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -91,6 +91,11 @@ func declareKeysExport(
) {
DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset)
latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeGCThresholdKey(header.RangeID)})
// Export requests will usually not hold latches during their evaluation.
//
// See call to `AssertAllowed()` in GetGCThreshold() to understand why we need
// to disable these assertions for export requests.
latchSpans.DisableUndeclaredAccessAssertions()
}

// evalExport dumps the requested keys into files of non-overlapping key ranges
@@ -136,6 +141,10 @@ func evalExport(
// *revisions* since the gc threshold, so noting that in the reply allows the
// BACKUP to correctly note the supported time bounds for RESTORE AS OF SYSTEM
// TIME.
//
// NOTE: Since export requests may not be holding latches during evaluation,
// this `GetGCThreshold()` call is going to potentially return a higher GC
// threshold than the pebble state we're evaluating over. This is copacetic.
if args.MVCCFilter == roachpb.MVCCFilter_All {
reply.StartTime = cArgs.EvalCtx.GetGCThreshold()
}
15 changes: 8 additions & 7 deletions pkg/kv/kvserver/batcheval/cmd_get.go
@@ -53,13 +53,14 @@ func Get(
var intent *roachpb.Intent
var err error
val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
})
if err != nil {
return result.Result{}, err
25 changes: 13 additions & 12 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -40,18 +40,19 @@ func ReverseScan(
var err error

opts := storage.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
}

switch args.ScanFormat {
27 changes: 14 additions & 13 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -40,19 +40,20 @@ func Scan(
var err error

opts := storage.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
TargetBytes: h.TargetBytes,
AllowEmpty: h.AllowEmpty,
WholeRowsOfSize: h.WholeRowsOfSize,
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
DontInterleaveIntents: cArgs.DontInterleaveIntents,
}

switch args.ScanFormat {
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/batcheval/declare.go
@@ -121,7 +121,8 @@ type CommandArgs struct {
Args roachpb.Request
Now hlc.ClockTimestamp
// *Stats should be mutated to reflect any writes made by the command.
Stats *enginepb.MVCCStats
Concurrency *concurrency.Guard
Uncertainty uncertainty.Interval
Stats *enginepb.MVCCStats
Concurrency *concurrency.Guard
Uncertainty uncertainty.Interval
DontInterleaveIntents bool
}
46 changes: 22 additions & 24 deletions pkg/kv/kvserver/client_replica_test.go
@@ -252,17 +252,13 @@ func TestTxnPutOutOfOrder(t *testing.T) {
restartKey = "restart"
)
// Set up a filter to so that the get operation at Step 3 will return an error.
var numGets int32
var shouldFailGet atomic.Value

testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error {
if _, ok := filterArgs.Req.(*roachpb.GetRequest); ok &&
filterArgs.Req.Header().Key.Equal(roachpb.Key(key)) &&
filterArgs.Hdr.Txn == nil {
// The Reader executes two get operations, each of which triggers two get requests
// (the first request fails and triggers txn push, and then the second request
// succeeds). Returns an error for the fourth get request to avoid timestamp cache
// update after the third get operation pushes the txn timestamp.
if atomic.AddInt32(&numGets, 1) == 4 {
if shouldFail := shouldFailGet.Load(); shouldFail != nil && shouldFail.(bool) {
return roachpb.NewErrorWithTxn(errors.Errorf("Test"), filterArgs.Hdr.Txn)
}
}
@@ -401,6 +397,7 @@ func TestTxnPutOutOfOrder(t *testing.T) {
manual.Increment(100)

h.Timestamp = s.Clock().Now()
shouldFailGet.Store(true)
if _, err := kv.SendWrappedWith(
context.Background(), store.TestSender(), h, &roachpb.GetRequest{RequestHeader: requestHeader},
); err == nil {
@@ -4493,20 +4490,6 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
var txn2ID atomic.Value
var txn2BBlockOnce sync.Once
txn2BlockedC := make(chan chan struct{})
postEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error {
if txn := args.Hdr.Txn; txn != nil && txn.ID == txn2ID.Load() {
txn2BBlockOnce.Do(func() {
if !errors.HasType(args.Err, (*roachpb.WriteIntentError)(nil)) {
t.Errorf("expected WriteIntentError; got %v", args.Err)
}

unblockCh := make(chan struct{})
txn2BlockedC <- unblockCh
<-unblockCh
})
}
return nil
}

// Detect when txn4 discovers txn3's intent and begins to push.
var txn4ID atomic.Value
@@ -4527,10 +4510,20 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
EvalKnobs: kvserverbase.BatchEvalTestingKnobs{
TestingPostEvalFilter: postEvalFilter,
},
TestingRequestFilter: requestFilter,
TestingConcurrencyRetryFilter: func(ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error) {
if txn := ba.Txn; txn != nil && txn.ID == txn2ID.Load() {
txn2BBlockOnce.Do(func() {
if !errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) {
t.Errorf("expected WriteIntentError; got %v", pErr)
}

unblockCh := make(chan struct{})
txn2BlockedC <- unblockCh
<-unblockCh
})
}
},
// Required by TestCluster.MoveRangeLeaseNonCooperatively.
AllowLeaseRequestProposalsWhenNotLeader: true,
},
@@ -4563,7 +4556,12 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) {
_, err := txn2.Get(ctx, key)
err2C <- err
}()
txn2UnblockC := <-txn2BlockedC
var txn2UnblockC chan struct{}
select {
case txn2UnblockC = <-txn2BlockedC:
case <-time.After(10 * time.Second):
t.Fatal("timed out waiting for txn2 to block")
}

// Transfer the lease to Server 1. Do so non-cooperatively instead of using
// a lease transfer, because the cooperative lease transfer would get stuck
21 changes: 19 additions & 2 deletions pkg/kv/kvserver/metrics.go
@@ -1593,6 +1593,18 @@ Note that the measurement does not include the duration for replicating the eval
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaReplicaReadBatchDroppedLatchesBeforeEval = metric.Metadata{
Name: "kv.replica_read_batch_evaluate.dropped_latches_before_eval",
Help: `Number of times read-only batches dropped latches before evaluation.`,
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
metaReplicaReadBatchWithoutInterleavingIter = metric.Metadata{
Name: "kv.replica_read_batch_evaluate.without_interleaving_iter",
Help: `Number of read-only batches evaluated without an intent interleaving iter.`,
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}
)

// StoreMetrics is the set of metrics for a given store.
@@ -1875,8 +1887,10 @@ type StoreMetrics struct {
ReplicaCircuitBreakerCumTripped *metric.Counter

// Replica batch evaluation metrics.
ReplicaReadBatchEvaluationLatency *metric.Histogram
ReplicaWriteBatchEvaluationLatency *metric.Histogram
ReplicaReadBatchEvaluationLatency *metric.Histogram
ReplicaWriteBatchEvaluationLatency *metric.Histogram
ReplicaReadBatchDroppedLatchesBeforeEval *metric.Counter
ReplicaReadBatchWithoutInterleavingIter *metric.Counter
}

type tenantMetricsRef struct {
@@ -2388,6 +2402,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
// Replica batch evaluation.
ReplicaReadBatchEvaluationLatency: metric.NewLatency(metaReplicaReadBatchEvaluationLatency, histogramWindow),
ReplicaWriteBatchEvaluationLatency: metric.NewLatency(metaReplicaWriteBatchEvaluationLatency, histogramWindow),

ReplicaReadBatchDroppedLatchesBeforeEval: metric.NewCounter(metaReplicaReadBatchDroppedLatchesBeforeEval),
ReplicaReadBatchWithoutInterleavingIter: metric.NewCounter(metaReplicaReadBatchWithoutInterleavingIter),
}

{
89 changes: 79 additions & 10 deletions pkg/kv/kvserver/replica_evaluate.go
@@ -13,10 +13,12 @@ package kvserver
import (
"bytes"
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
@@ -152,7 +154,7 @@ func evaluateBatch(
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
readOnly bool,
evalPath batchEvalPath,
) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) {
defer func() {
// Ensure that errors don't carry the WriteTooOld flag set. The client
@@ -175,7 +177,7 @@ func evaluateBatch(
br := ba.CreateReply()

// Optimize any contiguous sequences of put and conditional put ops.
if len(baReqs) >= optimizePutThreshold && !readOnly {
if len(baReqs) >= optimizePutThreshold && evalPath == readWrite {
baReqs = optimizePuts(readWriter, baReqs, baHeader.DistinctSpans)
}

@@ -270,7 +272,8 @@ func evaluateBatch(
// may carry a response transaction and in the case of WriteTooOldError
// (which is sometimes deferred) it is fully populated.
curResult, err := evaluateCommand(
ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui)
ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui, evalPath,
)

if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
filterArgs := kvserverbase.FilterArgs{
@@ -480,6 +483,7 @@ func evaluateCommand(
g *concurrency.Guard,
st *kvserverpb.LeaseStatus,
ui uncertainty.Interval,
evalPath batchEvalPath,
) (result.Result, error) {
var err error
var pd result.Result
@@ -490,13 +494,14 @@ func evaluateCommand(
now = st.Now
}
cArgs := batcheval.CommandArgs{
EvalCtx: rec,
Header: h,
Args: args,
Now: now,
Stats: ms,
Concurrency: g,
Uncertainty: ui,
EvalCtx: rec,
Header: h,
Args: args,
Now: now,
Stats: ms,
Concurrency: g,
Uncertainty: ui,
DontInterleaveIntents: evalPath == readOnlyWithoutInterleavedIntents,
}

if cmd.EvalRW != nil {
@@ -607,3 +612,67 @@ func canDoServersideRetry(
}
return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp)
}

// canReadOnlyRequestDropLatchesBeforeEval determines whether the batch request
// can potentially resolve its conflicts upfront (by scanning just the lock
// table first), bump the ts cache, release latches and then proceed with
// evaluation. Only non-locking read requests that aren't being evaluated under
// the `OptimisticEval` path are eligible for this optimization.
func canReadOnlyRequestDropLatchesBeforeEval(ba *roachpb.BatchRequest, g *concurrency.Guard) bool {
if g == nil {
// NB: A nil guard indicates that the caller is not holding latches.
return false
}
switch ba.Header.ReadConsistency {
case roachpb.CONSISTENT:
// TODO(aayush): INCONSISTENT and READ_UNCOMMITTED reads do not care about
// resolving lock conflicts at all. Yet, they can still drop latches early and
// evaluate once they've pinned their pebble engine state. We should consider
// supporting this by letting these kinds of requests drop latches early while
// also skipping the initial validation step of scanning the lock table.
case roachpb.INCONSISTENT, roachpb.READ_UNCOMMITTED:
return false
default:
panic(fmt.Sprintf("unexpected ReadConsistency: %s", ba.Header.ReadConsistency))
}
switch g.EvalKind {
case concurrency.PessimisticEval, concurrency.PessimisticAfterFailedOptimisticEval:
case concurrency.OptimisticEval:
// Requests going through the optimistic path are not allowed to drop their
// latches before evaluation since we do not know upfront the extent to
// which they will end up reading, and thus we cannot determine how much of
// the timestamp cache to update.
return false
default:
panic(fmt.Sprintf("unexpected EvalKind: %v", g.EvalKind))
}
// Only non-locking reads are eligible. This is because requests that need to
// lock the keys that they end up reading need to be isolated against other
// conflicting requests during their execution. Thus, they cannot release
// their latches before evaluation.
if ba.IsLocking() {
return false
}
switch ba.WaitPolicy {
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
case lock.WaitPolicy_SkipLocked:
// SkipLocked requests should only bump the timestamp cache over the keys
// that they actually ended up reading, and not the keys they ended up
// skipping over. Thus, they are not allowed to drop their latches before
// evaluation.
return false
default:
panic(fmt.Sprintf("unexpected WaitPolicy: %s", ba.WaitPolicy))
}
// We allow all non-locking, pessimistically evaluating read requests to try
// and resolve their conflicts upfront.
for _, req := range ba.Requests {
inner := req.GetInner()
switch inner.(type) {
case *roachpb.ExportRequest, *roachpb.GetRequest, *roachpb.ScanRequest, *roachpb.ReverseScanRequest:
default:
return false
}
}
return true
}