diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index bdc17bb136c4..b00afd51f755 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -91,6 +91,11 @@ func declareKeysExport( ) { DefaultDeclareIsolatedKeys(rs, header, req, latchSpans, lockSpans, maxOffset) latchSpans.AddNonMVCC(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeGCThresholdKey(header.RangeID)}) + // Export requests will usually not hold latches during their evaluation. + // + // See call to `AssertAllowed()` in GetGCThreshold() to understand why we need + // to disable these assertions for export requests. + latchSpans.DisableUndeclaredAccessAssertions() } // evalExport dumps the requested keys into files of non-overlapping key ranges @@ -136,6 +141,10 @@ func evalExport( // *revisions* since the gc threshold, so noting that in the reply allows the // BACKUP to correctly note the supported time bounds for RESTORE AS OF SYSTEM // TIME. + // + // NOTE: Since export requests may not be holding latches during evaluation, + // this `GetGCThreshold()` call is going to potentially return a higher GC + // threshold than the pebble state we're evaluating over. This is copacetic. 
if args.MVCCFilter == roachpb.MVCCFilter_All { reply.StartTime = cArgs.EvalCtx.GetGCThreshold() } diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 78b270643e55..8a6506ce11aa 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -53,13 +53,14 @@ func Get( var intent *roachpb.Intent var err error val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - FailOnMoreRecent: args.KeyLocking != lock.None, - Uncertainty: cArgs.Uncertainty, - MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + FailOnMoreRecent: args.KeyLocking != lock.None, + Uncertainty: cArgs.Uncertainty, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, }) if err != nil { return result.Result{}, err diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index cb83a622c4f5..8ae129f7976e 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -40,18 +40,19 @@ func ReverseScan( var err error opts := storage.MVCCScanOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), - TargetBytes: h.TargetBytes, - AllowEmpty: h.AllowEmpty, - WholeRowsOfSize: h.WholeRowsOfSize, - FailOnMoreRecent: args.KeyLocking != lock.None, - Reverse: true, - MemoryAccount: 
cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + MaxKeys: h.MaxSpanRequestKeys, + MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + TargetBytes: h.TargetBytes, + AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, + FailOnMoreRecent: args.KeyLocking != lock.None, + Reverse: true, + MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, } switch args.ScanFormat { diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index ffac770c5232..19f2f6aa48c4 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -40,19 +40,20 @@ func Scan( var err error opts := storage.MVCCScanOptions{ - Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, - SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, - Txn: h.Txn, - Uncertainty: cArgs.Uncertainty, - MaxKeys: h.MaxSpanRequestKeys, - MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), - TargetBytes: h.TargetBytes, - AllowEmpty: h.AllowEmpty, - WholeRowsOfSize: h.WholeRowsOfSize, - FailOnMoreRecent: args.KeyLocking != lock.None, - Reverse: false, - MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(), - LockTable: cArgs.Concurrency, + Inconsistent: h.ReadConsistency != roachpb.CONSISTENT, + SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked, + Txn: h.Txn, + Uncertainty: cArgs.Uncertainty, + MaxKeys: h.MaxSpanRequestKeys, + MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV), + TargetBytes: h.TargetBytes, + AllowEmpty: h.AllowEmpty, + WholeRowsOfSize: h.WholeRowsOfSize, + FailOnMoreRecent: args.KeyLocking != lock.None, + Reverse: false, + MemoryAccount: 
cArgs.EvalCtx.GetResponseMemoryAccount(), + LockTable: cArgs.Concurrency, + DontInterleaveIntents: cArgs.DontInterleaveIntents, } switch args.ScanFormat { diff --git a/pkg/kv/kvserver/batcheval/declare.go b/pkg/kv/kvserver/batcheval/declare.go index 52979d9d808f..2e230d3b20a5 100644 --- a/pkg/kv/kvserver/batcheval/declare.go +++ b/pkg/kv/kvserver/batcheval/declare.go @@ -121,7 +121,8 @@ type CommandArgs struct { Args roachpb.Request Now hlc.ClockTimestamp // *Stats should be mutated to reflect any writes made by the command. - Stats *enginepb.MVCCStats - Concurrency *concurrency.Guard - Uncertainty uncertainty.Interval + Stats *enginepb.MVCCStats + Concurrency *concurrency.Guard + Uncertainty uncertainty.Interval + DontInterleaveIntents bool } diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index e17bd407638f..b2a56e8f0dd9 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -252,17 +252,13 @@ func TestTxnPutOutOfOrder(t *testing.T) { restartKey = "restart" ) // Set up a filter to so that the get operation at Step 3 will return an error. - var numGets int32 + var shouldFailGet atomic.Value testingEvalFilter := func(filterArgs kvserverbase.FilterArgs) *roachpb.Error { if _, ok := filterArgs.Req.(*roachpb.GetRequest); ok && filterArgs.Req.Header().Key.Equal(roachpb.Key(key)) && filterArgs.Hdr.Txn == nil { - // The Reader executes two get operations, each of which triggers two get requests - // (the first request fails and triggers txn push, and then the second request - // succeeds). Returns an error for the fourth get request to avoid timestamp cache - // update after the third get operation pushes the txn timestamp. 
- if atomic.AddInt32(&numGets, 1) == 4 { + if shouldFail := shouldFailGet.Load(); shouldFail != nil && shouldFail.(bool) { return roachpb.NewErrorWithTxn(errors.Errorf("Test"), filterArgs.Hdr.Txn) } } @@ -401,6 +397,7 @@ func TestTxnPutOutOfOrder(t *testing.T) { manual.Increment(100) h.Timestamp = s.Clock().Now() + shouldFailGet.Store(true) if _, err := kv.SendWrappedWith( context.Background(), store.TestSender(), h, &roachpb.GetRequest{RequestHeader: requestHeader}, ); err == nil { @@ -4493,20 +4490,6 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { var txn2ID atomic.Value var txn2BBlockOnce sync.Once txn2BlockedC := make(chan chan struct{}) - postEvalFilter := func(args kvserverbase.FilterArgs) *roachpb.Error { - if txn := args.Hdr.Txn; txn != nil && txn.ID == txn2ID.Load() { - txn2BBlockOnce.Do(func() { - if !errors.HasType(args.Err, (*roachpb.WriteIntentError)(nil)) { - t.Errorf("expected WriteIntentError; got %v", args.Err) - } - - unblockCh := make(chan struct{}) - txn2BlockedC <- unblockCh - <-unblockCh - }) - } - return nil - } // Detect when txn4 discovers txn3's intent and begins to push. 
var txn4ID atomic.Value @@ -4527,10 +4510,20 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{ Store: &kvserver.StoreTestingKnobs{ - EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ - TestingPostEvalFilter: postEvalFilter, - }, TestingRequestFilter: requestFilter, + TestingConcurrencyRetryFilter: func(ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error) { + if txn := ba.Txn; txn != nil && txn.ID == txn2ID.Load() { + txn2BBlockOnce.Do(func() { + if !errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) { + t.Errorf("expected WriteIntentError; got %v", pErr) + } + + unblockCh := make(chan struct{}) + txn2BlockedC <- unblockCh + <-unblockCh + }) + } + }, // Required by TestCluster.MoveRangeLeaseNonCooperatively. AllowLeaseRequestProposalsWhenNotLeader: true, }, @@ -4563,7 +4556,12 @@ func TestDiscoverIntentAcrossLeaseTransferAwayAndBack(t *testing.T) { _, err := txn2.Get(ctx, key) err2C <- err }() - txn2UnblockC := <-txn2BlockedC + var txn2UnblockC chan struct{} + select { + case txn2UnblockC = <-txn2BlockedC: + case <-time.After(10 * time.Second): + t.Fatal("timed out waiting for txn2 to block") + } // Transfer the lease to Server 1. 
Do so non-cooperatively instead of using // a lease transfer, because the cooperative lease transfer would get stuck diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index acb3fcf5189e..b26580f637b7 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -1593,6 +1593,18 @@ Note that the measurement does not include the duration for replicating the eval Measurement: "Nanoseconds", Unit: metric.Unit_NANOSECONDS, } + metaReplicaReadBatchDroppedLatchesBeforeEval = metric.Metadata{ + Name: "kv.replica_read_batch_evaluate.dropped_latches_before_eval", + Help: `Number of times read-only batches dropped latches before evaluation.`, + Measurement: "Batches", + Unit: metric.Unit_COUNT, + } + metaReplicaReadBatchWithoutInterleavingIter = metric.Metadata{ + Name: "kv.replica_read_batch_evaluate.without_interleaving_iter", + Help: `Number of read-only batches evaluated without an intent interleaving iter.`, + Measurement: "Batches", + Unit: metric.Unit_COUNT, + } ) // StoreMetrics is the set of metrics for a given store. @@ -1875,8 +1887,10 @@ type StoreMetrics struct { ReplicaCircuitBreakerCumTripped *metric.Counter // Replica batch evaluation metrics. - ReplicaReadBatchEvaluationLatency *metric.Histogram - ReplicaWriteBatchEvaluationLatency *metric.Histogram + ReplicaReadBatchEvaluationLatency *metric.Histogram + ReplicaWriteBatchEvaluationLatency *metric.Histogram + ReplicaReadBatchDroppedLatchesBeforeEval *metric.Counter + ReplicaReadBatchWithoutInterleavingIter *metric.Counter } type tenantMetricsRef struct { @@ -2388,6 +2402,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Replica batch evaluation. 
ReplicaReadBatchEvaluationLatency: metric.NewLatency(metaReplicaReadBatchEvaluationLatency, histogramWindow), ReplicaWriteBatchEvaluationLatency: metric.NewLatency(metaReplicaWriteBatchEvaluationLatency, histogramWindow), + + ReplicaReadBatchDroppedLatchesBeforeEval: metric.NewCounter(metaReplicaReadBatchDroppedLatchesBeforeEval), + ReplicaReadBatchWithoutInterleavingIter: metric.NewCounter(metaReplicaReadBatchWithoutInterleavingIter), } { diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 68507e3dd765..48c85b745839 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -13,10 +13,12 @@ package kvserver import ( "bytes" "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" @@ -152,7 +154,7 @@ func evaluateBatch( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, - readOnly bool, + evalPath batchEvalPath, ) (_ *roachpb.BatchResponse, _ result.Result, retErr *roachpb.Error) { defer func() { // Ensure that errors don't carry the WriteTooOld flag set. The client @@ -175,7 +177,7 @@ func evaluateBatch( br := ba.CreateReply() // Optimize any contiguous sequences of put and conditional put ops. - if len(baReqs) >= optimizePutThreshold && !readOnly { + if len(baReqs) >= optimizePutThreshold && evalPath == readWrite { baReqs = optimizePuts(readWriter, baReqs, baHeader.DistinctSpans) } @@ -270,7 +272,8 @@ func evaluateBatch( // may carry a response transaction and in the case of WriteTooOldError // (which is sometimes deferred) it is fully populated. 
curResult, err := evaluateCommand( - ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui) + ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui, evalPath, + ) if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil { filterArgs := kvserverbase.FilterArgs{ @@ -480,6 +483,7 @@ func evaluateCommand( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, + evalPath batchEvalPath, ) (result.Result, error) { var err error var pd result.Result @@ -490,13 +494,14 @@ func evaluateCommand( now = st.Now } cArgs := batcheval.CommandArgs{ - EvalCtx: rec, - Header: h, - Args: args, - Now: now, - Stats: ms, - Concurrency: g, - Uncertainty: ui, + EvalCtx: rec, + Header: h, + Args: args, + Now: now, + Stats: ms, + Concurrency: g, + Uncertainty: ui, + DontInterleaveIntents: evalPath == readOnlyWithoutInterleavedIntents, } if cmd.EvalRW != nil { @@ -607,3 +612,67 @@ func canDoServersideRetry( } return tryBumpBatchTimestamp(ctx, ba, g, newTimestamp) } + +// canReadOnlyRequestDropLatchesBeforeEval determines whether the batch request +// can potentially resolve its conflicts upfront (by scanning just the lock +// table first), bump the ts cache, release latches and then proceed with +// evaluation. Only non-locking read requests that aren't being evaluated under +// the `OptimisticEval` path are eligible for this optimization. +func canReadOnlyRequestDropLatchesBeforeEval(ba *roachpb.BatchRequest, g *concurrency.Guard) bool { + if g == nil { + // NB: A nil guard indicates that the caller is not holding latches. + return false + } + switch ba.Header.ReadConsistency { + case roachpb.CONSISTENT: + // TODO(aayush): INCONSISTENT and READ_UNCOMMITTED reads do not care about + // resolving lock conflicts at all. Yet, they can still drop latches early and + // evaluate once they've pinned their pebble engine state. 
We should consider + // supporting this by letting these kinds of requests drop latches early while + // also skipping the initial validation step of scanning the lock table. + case roachpb.INCONSISTENT, roachpb.READ_UNCOMMITTED: + return false + default: + panic(fmt.Sprintf("unexpected ReadConsistency: %s", ba.Header.ReadConsistency)) + } + switch g.EvalKind { + case concurrency.PessimisticEval, concurrency.PessimisticAfterFailedOptimisticEval: + case concurrency.OptimisticEval: + // Requests going through the optimistic path are not allowed to drop their + // latches before evaluation since we do not know upfront the extent to + // which they will end up reading, and thus we cannot determine how much of + // the timestamp cache to update. + return false + default: + panic(fmt.Sprintf("unexpected EvalKind: %v", g.EvalKind)) + } + // Only non-locking reads are eligible. This is because requests that need to + // lock the keys that they end up reading need to be isolated against other + // conflicting requests during their execution. Thus, they cannot release + // their latches before evaluation. + if ba.IsLocking() { + return false + } + switch ba.WaitPolicy { + case lock.WaitPolicy_Block, lock.WaitPolicy_Error: + case lock.WaitPolicy_SkipLocked: + // SkipLocked requests should only bump the timestamp cache over the keys + // that they actually ended up reading, and not the keys they ended up + // skipping over. Thus, they are not allowed to drop their latches before + // evaluation. + return false + default: + panic(fmt.Sprintf("unexpected WaitPolicy: %s", ba.WaitPolicy)) + } + // We allow all non-locking, pessimistically evaluating read requests to try + // and resolve their conflicts upfront. 
+ for _, req := range ba.Requests { + inner := req.GetInner() + switch inner.(type) { + case *roachpb.ExportRequest, *roachpb.GetRequest, *roachpb.ScanRequest, *roachpb.ReverseScanRequest: + default: + return false + } + } + return true +} diff --git a/pkg/kv/kvserver/replica_evaluate_test.go b/pkg/kv/kvserver/replica_evaluate_test.go index 312bde10a818..31f0241e2eba 100644 --- a/pkg/kv/kvserver/replica_evaluate_test.go +++ b/pkg/kv/kvserver/replica_evaluate_test.go @@ -674,6 +674,10 @@ func TestEvaluateBatch(t *testing.T) { var r resp r.d = d + evalPath := readWrite + if d.readOnly { + evalPath = readOnlyDefault + } r.br, r.res, r.pErr = evaluateBatch( ctx, d.idKey, @@ -681,10 +685,10 @@ func TestEvaluateBatch(t *testing.T) { d.MockEvalCtx.EvalContext(), &d.ms, &d.ba, - nil, /* g */ - nil, /* st */ + nil, + nil, uncertainty.Interval{}, - d.readOnly, + evalPath, ) tc.check(t, r) diff --git a/pkg/kv/kvserver/replica_gossip.go b/pkg/kv/kvserver/replica_gossip.go index 96c40a425b30..d2687dd149e5 100644 --- a/pkg/kv/kvserver/replica_gossip.go +++ b/pkg/kv/kvserver/replica_gossip.go @@ -97,7 +97,10 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked( defer rw.Close() br, result, pErr := - evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, &ba, nil /* g */, nil /* st */, uncertainty.Interval{}, true /* readOnly */) + evaluateBatch( + ctx, kvserverbase.CmdIDKey(""), rw, rec, nil /* ms */, &ba, + nil /* g */, nil /* st */, uncertainty.Interval{}, readOnlyDefault, + ) if pErr != nil { return errors.Wrapf(pErr.GoError(), "couldn't scan node liveness records in span %s", span) } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index d0bc0f20e1b9..0b1c5a1f5437 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -23,12 +23,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty" 
"github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" "github.com/kr/pretty" ) @@ -79,27 +81,40 @@ func (r *Replica) executeReadOnlyBatch( if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil { return nil, g, nil, roachpb.NewError(err) } - // TODO(nvanbenschoten): once all replicated intents are pulled into the - // concurrency manager's lock-table, we can be sure that if we reached this - // point, we will not conflict with any of them during evaluation. This in - // turn means that we can bump the timestamp cache *before* evaluation - // without risk of starving writes. Once we start doing that, we're free to - // release latches immediately after we acquire an engine iterator as long - // as we're performing a non-locking read. Note that this also requires that - // the request is not being optimistically evaluated (optimistic evaluation - // does not wait for latches or check locks). It would also be nice, but not - // required for correctness, that the read-only engine eagerly create an - // iterator (that is later cloned) while the latches are held, so that this - // request does not "see" the effect of any later requests that happen after - // the latches are released. 
+ ok, stillNeedsInterleavedIntents, pErr := r.canDropLatchesBeforeEval(ctx, rw, ba, g, st) + if pErr != nil { + return nil, g, nil, pErr + } + evalPath := readOnlyDefault + if ok { + // Since the concurrency manager has sequenced this request against all the intents + // that are in the concurrency manager's lock table, and we've scanned the + // replicated lock-table keyspace above in `canDropLatchesBeforeEval`, we + // can be sure that if we reached this point, we will not conflict with any + // of them during evaluation. This in turn means that we can bump the + // timestamp cache *before* evaluation without risk of starving writes. + // Consequently, we're free to release latches here since we've acquired a + // pebble iterator, as long as we're performing a non-locking read (also + // checked in `canDropLatchesBeforeEval`). Note that this also requires that + // the request is not being optimistically evaluated (optimistic evaluation + // does not wait for latches or check locks). + log.VEventf(ctx, 3, "lock table scan complete without conflicts; dropping latches early") + r.store.metrics.ReplicaReadBatchDroppedLatchesBeforeEval.Inc(1) + if !stillNeedsInterleavedIntents { + r.store.metrics.ReplicaReadBatchWithoutInterleavingIter.Inc(1) + evalPath = readOnlyWithoutInterleavedIntents + } + r.updateTimestampCacheAndDropLatches(ctx, g, ba, nil /* br */, nil /* pErr */, st) + g = nil + } var result result.Result - br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui) + br, result, pErr = r.executeReadOnlyBatchWithServersideRefreshes(ctx, rw, rec, ba, g, &st, ui, evalPath) // If the request hit a server-side concurrency retry error, immediately // propagate the error. Don't assume ownership of the concurrency guard. 
if isConcurrencyRetryError(pErr) { - if g.EvalKind == concurrency.OptimisticEval { + if g != nil && g.EvalKind == concurrency.OptimisticEval { // Since this request was not holding latches, it could have raced with // intent resolution. So we can't trust it to add discovered locks, if // there is a latch conflict. This means that a discovered lock plus a @@ -120,7 +135,7 @@ func (r *Replica) executeReadOnlyBatch( return nil, g, nil, pErr } - if g.EvalKind == concurrency.OptimisticEval { + if g != nil && g.EvalKind == concurrency.OptimisticEval { if pErr == nil { // Gather the spans that were read -- we distinguish the spans in the // request from the spans that were actually read, using resume spans in @@ -150,17 +165,11 @@ func (r *Replica) executeReadOnlyBatch( if pErr == nil { pErr = r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local) } - - // Otherwise, update the timestamp cache and release the concurrency guard. - // Note: - // - The update to the timestamp cache is not gated on pErr == nil, - // since certain semantic errors (e.g. ConditionFailedError on CPut) - // require updating the timestamp cache (see updatesTSCacheOnErr). - // - For optimistic evaluation, used for limited scans, the update to the - // timestamp cache limits itself to the spans that were read, by using - // the ResumeSpans. - ec, g := endCmds{repl: r, g: g, st: st}, nil - ec.done(ctx, ba, br, pErr) + if g != nil { + // If we didn't already drop latches earlier, do so now. + r.updateTimestampCacheAndDropLatches(ctx, g, ba, br, pErr, st) + g = nil + } // Semi-synchronously process any intents that need resolving here in // order to apply back pressure on the client which generated them. The @@ -180,7 +189,11 @@ func (r *Replica) executeReadOnlyBatch( // prohibits any concurrent requests for the same range. See #17760. 
allowSyncProcessing := ba.ReadConsistency == roachpb.CONSISTENT && ba.WaitPolicy != lock.WaitPolicy_SkipLocked - if err := r.store.intentResolver.CleanupIntentsAsync(ctx, intents, allowSyncProcessing); err != nil { + if err := r.store.intentResolver.CleanupIntentsAsync( + ctx, + intents, + allowSyncProcessing, + ); err != nil { log.Warningf(ctx, "%v", err) } } @@ -196,6 +209,109 @@ func (r *Replica) executeReadOnlyBatch( return br, nil, nil, pErr } +// updateTimestampCacheAndDropLatches updates the timestamp cache and releases +// the concurrency guard. +// Note: +// - If `br` is nil, then this method assumes that latches are being released +// before evaluation of the request, and the timestamp cache is updated based +// only on the spans declared in the request. +// - The update to the timestamp cache is not gated on pErr == nil, since +// certain semantic errors (e.g. ConditionFailedError on CPut) require updating +// the timestamp cache (see updatesTSCacheOnErr). +// - For optimistic evaluation, used for limited scans, the update to the +// timestamp cache limits itself to the spans that were read, by using the +// ResumeSpans. +func (r *Replica) updateTimestampCacheAndDropLatches( + ctx context.Context, + g *concurrency.Guard, + ba *roachpb.BatchRequest, + br *roachpb.BatchResponse, + pErr *roachpb.Error, + st kvserverpb.LeaseStatus, +) { + ec := endCmds{repl: r, g: g, st: st} + ec.done(ctx, ba, br, pErr) +} + +var allowDroppingLatchesBeforeEval = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.transaction.dropping_latches_before_eval.enabled", + "if enabled, allows certain read-only KV requests to drop latches before they evaluate", + true, +) + +// canDropLatchesBeforeEval determines whether a given batch request can proceed +// with evaluation without continuing to hold onto its latches[1] and if so, +// whether the evaluation of the requests in the batch needs an intent +// interleaving iterator[2]. 
+// +// [1] whether the request can safely release latches at this point in the +// execution. +// For certain qualifying types of requests (certain types of read-only +// requests: see `canReadOnlyRequestDropLatchesBeforeEval`), this method +// performs a scan of the lock table keyspace corresponding to the latch spans +// declared by the BatchRequest. +// If no conflicting intents are found, then it is deemed safe for this request +// to release its latches at this point. This is because read-only requests +// evaluate over a stable pebble snapshot (see the call to +// `PinEngineStateForIterators` in `executeReadOnlyBatch`), so if there are no +// lock conflicts, the rest of the execution is guaranteed to be isolated from +// the effects of other requests. +// If any conflicting intents are found, then it returns a WriteIntentError +// which needs to be handled by the caller before proceeding. +// +// [2] if the request can drop its latches early, whether it needs an intent +// interleaving iterator to perform its evaluation. +// If the aforementioned lock table scan determines that any of the requests in +// the batch may need access to the intent history of a key, then an intent +// interleaving iterator is needed to perform the evaluation. +func (r *Replica) canDropLatchesBeforeEval( + ctx context.Context, + rw storage.ReadWriter, + ba *roachpb.BatchRequest, + g *concurrency.Guard, + st kvserverpb.LeaseStatus, +) (ok, stillNeedsIntentInterleaving bool, pErr *roachpb.Error) { + if !allowDroppingLatchesBeforeEval.Get(&r.store.cfg.Settings.SV) || + !canReadOnlyRequestDropLatchesBeforeEval(ba, g) { + // If the request does not qualify, we neither drop latches nor use a + // non-interleaving iterator. 
+ return false /* ok */, true /* stillNeedsIntentInterleaving */, nil + } + + log.VEventf( + ctx, 3, "can drop latches early for batch (%v); scanning lock table first to detect conflicts", ba, + ) + + maxIntents := storage.MaxIntentsPerWriteIntentError.Get(&r.store.cfg.Settings.SV) + var intents []roachpb.Intent + // Check if any of the requests within the batch need to resolve any intents + // or if any of them need to use an intent interleaving iterator. + for _, req := range ba.Requests { + start, end := req.GetInner().Header().Key, req.GetInner().Header().EndKey + needsIntentInterleavingForThisRequest, err := storage.ScanConflictingIntents( + ctx, rw, ba.Txn, ba.Header.Timestamp, start, end, &intents, maxIntents, + ) + if err != nil { + return false /* ok */, true /* stillNeedsIntentInterleaving */, roachpb.NewError( + errors.Wrap(err, "scanning intents"), + ) + } + stillNeedsIntentInterleaving = stillNeedsIntentInterleaving || needsIntentInterleavingForThisRequest + if maxIntents != 0 && int64(len(intents)) >= maxIntents { + break + } + } + if len(intents) > 0 { + return false /* ok */, false /* stillNeedsIntentInterleaving */, maybeAttachLease( + roachpb.NewError(&roachpb.WriteIntentError{Intents: intents}), &st.Lease, + ) + } + // If there were no conflicts, then the request can drop its latches and + // proceed with evaluation. + return true /* ok */, stillNeedsIntentInterleaving, nil +} + // evalContextWithAccount wraps an EvalContext to provide a non-nil // mon.BoundAccount. This wrapping is conditional on various factors, and // specific to a request (see executeReadOnlyBatchWithServersideRefreshes), @@ -244,6 +360,19 @@ func (e evalContextWithAccount) GetResponseMemoryAccount() *mon.BoundAccount { return e.memAccount } +// batchEvalPath enumerates the different evaluation paths that can be taken by +// a batch. +type batchEvalPath int + +const ( + // readOnlyDefault is the default evaluation path taken by read only requests. 
+ readOnlyDefault batchEvalPath = iota + // readOnlyWithoutInterleavedIntents indicates that the request does not need + // an intent interleaving iterator during its evaluation. + readOnlyWithoutInterleavedIntents + readWrite +) + // executeReadOnlyBatchWithServersideRefreshes invokes evaluateBatch and retries // at a higher timestamp in the event of some retriable errors if allowed by the // batch/txn. @@ -255,6 +384,7 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( g *concurrency.Guard, st *kvserverpb.LeaseStatus, ui uncertainty.Interval, + evalPath batchEvalPath, ) (br *roachpb.BatchResponse, res result.Result, pErr *roachpb.Error) { log.Event(ctx, "executing read-only batch") @@ -305,14 +435,25 @@ func (r *Replica) executeReadOnlyBatchWithServersideRefreshes( log.VEventf(ctx, 2, "server-side retry of batch") } now := timeutil.Now() - br, res, pErr = evaluateBatch(ctx, kvserverbase.CmdIDKey(""), rw, rec, nil, ba, g, st, ui, true /* readOnly */) + br, res, pErr = evaluateBatch( + ctx, kvserverbase.CmdIDKey(""), rw, rec, nil /* ms */, ba, g, st, ui, evalPath, + ) r.store.metrics.ReplicaReadBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) // Allow only one retry. if pErr == nil || retries > 0 { break } // If we can retry, set a higher batch timestamp and continue. - if !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{} /* deadline */) { + // + // Note that if the batch request has already released its latches (as + // indicated by the latch guard being nil) before this point, then it cannot + // retry at a higher timestamp because it is not isolated at higher + // timestamps. + latchesHeld := g != nil + if !latchesHeld || !canDoServersideRetry(ctx, pErr, ba, br, g, hlc.Timestamp{}) { + // TODO(aayush,arul): These metrics are incorrect at the moment since + // hitting this branch does not mean that we won't serverside retry, it + // just means that we will have to reacquire latches. 
r.store.Metrics().ReadEvaluationServerSideRetryFailure.Inc(1) break } else { diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 4fb51a8ccb66..51794f6f35f3 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -477,10 +477,22 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } - // The batch execution func returned a server-side concurrency retry - // error. It must have also handed back ownership of the concurrency - // guard without having already released the guard's latches. - g.AssertLatches() + // The batch execution func returned a server-side concurrency retry error. + // It may have either handed back ownership of the concurrency guard without + // having already released the guard's latches, or in case of certain types + // of read-only requests (see `canReadOnlyRequestDropLatchesBeforeEval`), it + // may have released the guard's latches. + dropLatchesAndLockWaitQueues := func(reuseLatchAndLockSpans bool) { + if g != nil { + latchSpans, lockSpans = nil, nil + if reuseLatchAndLockSpans { + latchSpans, lockSpans = g.TakeSpanSets() + } + r.concMgr.FinishReq(g) + g = nil + } + } + if filter := r.store.cfg.TestingKnobs.TestingConcurrencyRetryFilter; filter != nil { filter(ctx, *ba, pErr) } @@ -495,19 +507,18 @@ func (r *Replica) executeBatchWithConcurrencyRetries( switch t := pErr.GetDetail().(type) { case *roachpb.WriteIntentError: // Drop latches, but retain lock wait-queues. + g.AssertLatches() if g, pErr = r.handleWriteIntentError(ctx, ba, g, pErr, t); pErr != nil { return nil, nil, pErr } case *roachpb.TransactionPushError: // Drop latches, but retain lock wait-queues. + g.AssertLatches() if g, pErr = r.handleTransactionPushError(ctx, ba, g, pErr, t); pErr != nil { return nil, nil, pErr } case *roachpb.IndeterminateCommitError: - // Drop latches and lock wait-queues. 
- latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues(true /* reuseLatchAndLockSpans */) // Then launch a task to handle the indeterminate commit error. No error // is returned if the transaction is recovered successfully to either a // COMMITTED or ABORTED state. @@ -515,13 +526,10 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.ReadWithinUncertaintyIntervalError: - // Drop latches and lock wait-queues. - r.concMgr.FinishReq(g) - g = nil // If the batch is able to perform a server-side retry in order to avoid // the uncertainty error, it will have a new timestamp. Force a refresh of // the latch and lock spans. - latchSpans, lockSpans = nil, nil + dropLatchesAndLockWaitQueues(false /* reuseLatchAndLockSpans */) // Attempt to adjust the batch's timestamp to avoid the uncertainty error // and allow for a server-side retry. For transactional requests, there // are strict conditions that must be met for this to be permitted. For @@ -534,10 +542,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.InvalidLeaseError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues(true /* reuseLatchAndLockSpans */) // Then attempt to acquire the lease if not currently held by any // replica or redirect to the current leaseholder if currently held // by a different replica. @@ -545,10 +550,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries( return nil, nil, pErr } case *roachpb.MergeInProgressError: - // Drop latches and lock wait-queues. - latchSpans, lockSpans = g.TakeSpanSets() - r.concMgr.FinishReq(g) - g = nil + dropLatchesAndLockWaitQueues(true /* reuseLatchAndLockSpans */) // Then listen for the merge to complete. 
if pErr = r.handleMergeInProgressError(ctx, ba, pErr, t); pErr != nil { return nil, nil, pErr @@ -1246,7 +1248,9 @@ func (ec *endCmds) poison() { } // done releases the latches acquired by the command and updates the timestamp -// cache using the final timestamp of each command. +// cache using the final timestamp of each command. If `br` is nil, it is +// assumed that `done` is being called by a request that's dropping its latches +// before evaluation. // // No-op if the receiver has been zeroed out by a call to move. Idempotent and // is safe to call more than once. @@ -1259,10 +1263,10 @@ func (ec *endCmds) done( } defer ec.move() // clear - // Update the timestamp cache. Each request within the batch is considered - // in turn; only those marked as affecting the cache are processed. However, - // only do so if the request is consistent and was operating on the - // leaseholder under a valid range lease. + // Update the timestamp cache. Each request within the batch is considered in + // turn; only those marked as affecting the cache are processed. However, only + // do so if the request is consistent and was operating on the leaseholder + // under a valid range lease. if ba.ReadConsistency == roachpb.CONSISTENT && ec.st.State == kvserverpb.LeaseState_VALID { ec.repl.updateTimestampCache(ctx, &ec.st, ba, br, pErr) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 89418b8e753c..c4119ae168ef 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -2222,8 +2222,8 @@ func TestReplicaLatching(t *testing.T) { }{ // Read/read doesn't wait. {true, true, false, false}, - // A write doesn't wait for an earlier read (except for local keys). - {true, false, false, true}, + // A write doesn't wait for an earlier read. + {true, false, false, false}, // A read must wait for an earlier write. {false, true, true, true}, // Writes always wait for other writes. 
@@ -2547,7 +2547,7 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { blockReader.Store(false) blockWriter.Store(false) blockCh := make(chan struct{}, 1) - blockedCh := make(chan struct{}, 1) + waitForRequestBlocked := make(chan struct{}, 1) tc := testContext{} tsc := TestStoreConfig(nil) @@ -2558,10 +2558,10 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { return nil } if filterArgs.Req.Method() == roachpb.Get && blockReader.Load().(bool) { - blockedCh <- struct{}{} + waitForRequestBlocked <- struct{}{} <-blockCh } else if filterArgs.Req.Method() == roachpb.Put && blockWriter.Load().(bool) { - blockedCh <- struct{}{} + waitForRequestBlocked <- struct{}{} <-blockCh } return nil @@ -2579,17 +2579,79 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { interferes bool }{ // Reader & writer have same timestamps. - {makeTS(1, 0), makeTS(1, 0), roachpb.Key("a"), true, true}, - {makeTS(1, 0), makeTS(1, 0), roachpb.Key("b"), false, true}, - // Reader has earlier timestamp. - {makeTS(1, 0), makeTS(1, 1), roachpb.Key("c"), true, false}, - {makeTS(1, 0), makeTS(1, 1), roachpb.Key("d"), false, false}, - // Writer has earlier timestamp. - {makeTS(1, 1), makeTS(1, 0), roachpb.Key("e"), true, true}, - {makeTS(1, 1), makeTS(1, 0), roachpb.Key("f"), false, true}, - // Local keys always interfere. - {makeTS(1, 0), makeTS(1, 1), keys.RangeDescriptorKey(roachpb.RKey("a")), true, true}, - {makeTS(1, 0), makeTS(1, 1), keys.RangeDescriptorKey(roachpb.RKey("b")), false, true}, + // + // Reader goes first, but the reader does not need to hold latches during + // evaluation, so we expect no interference. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 0), + key: roachpb.Key("a"), + readerFirst: true, + interferes: false, + }, + // Writer goes first, but the writer does need to hold latches during + // evaluation, so it should block the reader. 
+ { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 0), + key: roachpb.Key("b"), + readerFirst: false, + interferes: true, + }, + // Reader has earlier timestamp, so it doesn't interfere with the write + // that's in its future. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: roachpb.Key("c"), + readerFirst: true, + interferes: false, + }, + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: roachpb.Key("d"), + readerFirst: false, + interferes: false, + }, + // Writer has an earlier timestamp. We expect no interference for the writer + // as the reader will be evaluating over a pebble snapshot. We'd expect the + // writer to be able to continue without interference but to get bumped by + // the timestamp cache. + { + readerTS: makeTS(1, 1), + writerTS: makeTS(1, 0), + key: roachpb.Key("e"), + readerFirst: true, + interferes: false, + }, + // We expect the reader to block for the writer that's writing in the + // reader's past. + { + readerTS: makeTS(1, 1), + writerTS: makeTS(1, 0), + key: roachpb.Key("f"), + readerFirst: false, + interferes: true, + }, + // Even though local key accesses are NonMVCC, the reader should not block + // the writer because it should not need to hold its latches during + // evaluation. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: keys.RangeDescriptorKey(roachpb.RKey("a")), + readerFirst: true, + interferes: false, + }, + // The writer will block the reader since it holds NonMVCC latches during + // evaluation. + { + readerTS: makeTS(1, 0), + writerTS: makeTS(1, 1), + key: keys.RangeDescriptorKey(roachpb.RKey("b")), + interferes: true, + }, } for _, test := range testCases { t.Run(fmt.Sprintf("%+v", test), func(t *testing.T) { @@ -2613,7 +2675,8 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { _, pErr := tc.Sender().Send(context.Background(), baR) errCh <- pErr }() - <-blockedCh + // Wait for the above read to get blocked on blockCh. 
+ <-waitForRequestBlocked go func() { _, pErr := tc.Sender().Send(context.Background(), baW) errCh <- pErr @@ -2624,7 +2687,9 @@ func TestReplicaLatchingTimestampNonInterference(t *testing.T) { _, pErr := tc.Sender().Send(context.Background(), baW) errCh <- pErr }() - <-blockedCh + // Wait for the above write to get blocked on blockCh while it's holding + // latches. + <-waitForRequestBlocked go func() { _, pErr := tc.Sender().Send(context.Background(), baR) errCh <- pErr diff --git a/pkg/kv/kvserver/replica_tscache.go b/pkg/kv/kvserver/replica_tscache.go index 1f18b87a2996..b587fa0d37d2 100644 --- a/pkg/kv/kvserver/replica_tscache.go +++ b/pkg/kv/kvserver/replica_tscache.go @@ -51,9 +51,11 @@ func (r *Replica) addToTSCacheChecked( r.store.tsCache.Add(start, end, ts, txnID) } -// updateTimestampCache updates the timestamp cache in order to set a low water -// mark for the timestamp at which mutations to keys overlapping the provided -// request can write, such that they don't re-write history. +// updateTimestampCache updates the timestamp cache in order to set a low +// watermark for the timestamp at which mutations to keys overlapping the +// provided request can write, such that they don't re-write history. It can be +// called before or after a batch is done evaluating. A nil `br` indicates that +// this method is being called before the batch is done evaluating. 
func (r *Replica) updateTimestampCache( ctx context.Context, st *kvserverpb.LeaseStatus, @@ -75,6 +77,7 @@ func (r *Replica) updateTimestampCache( if ba.Txn != nil { txnID = ba.Txn.ID } + beforeEval := br == nil && pErr == nil for i, union := range ba.Requests { req := union.GetInner() if !roachpb.UpdatesTimestampCache(req) { @@ -220,13 +223,14 @@ func (r *Replica) updateTimestampCache( addToTSCache(start, end, ts, txnID) } case *roachpb.GetRequest: - if resume := resp.(*roachpb.GetResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.GetResponse).ResumeSpan != nil { // The request did not evaluate. Ignore it. continue } addToTSCache(start, end, ts, txnID) case *roachpb.ScanRequest: - if resume := resp.(*roachpb.ScanResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.ScanResponse).ResumeSpan != nil { + resume := resp.(*roachpb.ScanResponse).ResumeSpan if start.Equal(resume.Key) { // The request did not evaluate. Ignore it. continue @@ -238,7 +242,8 @@ func (r *Replica) updateTimestampCache( } addToTSCache(start, end, ts, txnID) case *roachpb.ReverseScanRequest: - if resume := resp.(*roachpb.ReverseScanResponse).ResumeSpan; resume != nil { + if !beforeEval && resp.(*roachpb.ReverseScanResponse).ResumeSpan != nil { + resume := resp.(*roachpb.ReverseScanResponse).ResumeSpan if end.Equal(resume.EndKey) { // The request did not evaluate. Ignore it. 
continue diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 87a58723720c..3ce3b961df66 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -659,7 +659,7 @@ func (r *Replica) evaluateWriteBatchWrapper( ) (storage.Batch, *roachpb.BatchResponse, result.Result, *roachpb.Error) { batch, opLogger := r.newBatchedEngine(ba, g) now := timeutil.Now() - br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, false /* readOnly */) + br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, ms, ba, g, st, ui, readWrite) r.store.metrics.ReplicaWriteBatchEvaluationLatency.RecordValue(timeutil.Since(now).Nanoseconds()) if pErr == nil { if opLogger != nil { diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index b868d803c672..6151bd06582e 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -1897,7 +1898,11 @@ func TestStoreScanResumeTSCache(t *testing.T) { ctx := context.Background() stopper := stop.NewStopper() defer stopper.Stop(ctx) - store, manualClock := createTestStore(ctx, t, testStoreOpts{createSystemRanges: true}, stopper) + manualClock := timeutil.NewManualTime(timeutil.Unix(0, 123)) + cfg := TestStoreConfig(hlc.NewClock(manualClock, time.Nanosecond)) + cfg.Settings = cluster.MakeTestingClusterSettings() + allowDroppingLatchesBeforeEval.Override(ctx, &cfg.Settings.SV, false) + store := createTestStoreWithConfig(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) // Write three keys at time t0. 
t0 := timeutil.Unix(1, 0) @@ -2108,8 +2113,8 @@ func TestStoreScanIntents(t *testing.T) { expFinish bool // do we expect the scan to finish? expCount int32 // how many times do we expect to scan? }{ - // Consistent which can push will make two loops. - {true, true, true, 2}, + // Consistent which can push will detect conflicts and resolve them. + {true, true, true, 1}, // Consistent but can't push will backoff and retry and not finish. {true, false, false, -1}, // Inconsistent and can push will make one loop, with async resolves. @@ -2197,6 +2202,111 @@ func TestStoreScanIntents(t *testing.T) { } } +// TestStoreScanIntentsRespectsLimit verifies that when reads are allowed to +// resolve their conflicts before eval (i.e. when they are allowed to drop their +// latches early), the scan for conflicting intents respects the max intent +// limits. +// +// The test proceeds as follows: a writer lays down more than +// `MaxIntentsPerWriteIntentErrorDefault` intents, and a reader is expected to +// encounter these intents and raise a `WriteIntentError` with exactly +// `MaxIntentsPerWriteIntentErrorDefault` intents in the error. +func TestStoreScanIntentsRespectsLimit(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + skip.UnderRace( + t, "this test writes a ton of intents and tries to clean them up, too slow under race", + ) + + var interceptWriteIntentErrors atomic.Value + // `commitCh` is used to block the writer from committing until the reader has + // encountered the intents laid down by the writer. + commitCh := make(chan struct{}) + // intentsLaidDownCh is signalled when the writer is done laying down intents. 
+ intentsLaidDownCh := make(chan struct{}) + tc := serverutils.StartNewTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &StoreTestingKnobs{ + TestingConcurrencyRetryFilter: func( + ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error, + ) { + if errors.HasType(pErr.GoError(), (*roachpb.WriteIntentError)(nil)) { + // Assert that the WriteIntentError carries exactly MaxIntentsPerWriteIntentErrorDefault intents. + if trap := interceptWriteIntentErrors.Load(); trap != nil && trap.(bool) { + require.Equal( + t, storage.MaxIntentsPerWriteIntentErrorDefault, + len(pErr.GetDetail().(*roachpb.WriteIntentError).Intents), + ) + interceptWriteIntentErrors.Store(false) + // Allow the writer to commit. + t.Logf("allowing writer to commit") + close(commitCh) + } + } + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + store, err := tc.Server(0).GetStores().(*Stores).GetStore(tc.Server(0).GetFirstStoreID()) + require.NoError(t, err) + var intentKeys []roachpb.Key + var wg sync.WaitGroup + wg.Add(2) + + // Lay down more than `MaxIntentsPerWriteIntentErrorDefault` intents. + go func() { + defer wg.Done() + txn := newTransaction( + "test", roachpb.Key("test-key"), roachpb.NormalUserPriority, tc.Server(0).Clock(), + ) + for j := 0; j < storage.MaxIntentsPerWriteIntentErrorDefault+10; j++ { + var key roachpb.Key + key = append(key, keys.ScratchRangeMin...) + key = append(key, []byte(fmt.Sprintf("%d", j))...) + intentKeys = append(intentKeys, key) + args := putArgs(key, []byte(fmt.Sprintf("value%07d", j))) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: txn}, &args) + require.Nil(t, pErr) + } + intentsLaidDownCh <- struct{}{} + <-commitCh // Wait for the test to tell us to commit the txn.
+ args, header := endTxnArgs(txn, true /* commit */) + _, pErr := kv.SendWrappedWith(ctx, store.TestSender(), header, &args) + require.Nil(t, pErr) + }() + + select { + case <-intentsLaidDownCh: + case <-time.After(testutils.DefaultSucceedsSoonDuration): + t.Fatal("timed out waiting for intents to be laid down") + } + + // Now, expect a conflicting reader to encounter the intents and raise a + // WriteIntentError with exactly `MaxIntentsPerWriteIntentErrorDefault` + // intents. See the TestingConcurrencyRetryFilter above. + var ba kv.Batch + for i := 0; i < storage.MaxIntentsPerWriteIntentErrorDefault+10; i += 10 { + for _, key := range intentKeys[i : i+10] { + args := getArgs(key) + ba.AddRawRequest(&args) + } + } + t.Logf("issuing gets while intercepting WriteIntentErrors") + interceptWriteIntentErrors.Store(true) + go func() { + defer wg.Done() + err := store.DB().Run(ctx, &ba) + require.NoError(t, err) + }() + + wg.Wait() +} + // TestStoreScanInconsistentResolvesIntents lays down 10 intents, // commits the txn without resolving intents, then does repeated // inconsistent reads until the data shows up, showing that the diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 700fa1fa360c..c40c8dee48b9 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -1425,3 +1425,86 @@ func iterateOnReader( } return nil } + +// ScanConflictingIntents scans intents using only the separated intents lock +// table. The result set is added to the given `intents` slice. It ignores +// intents that do not conflict with `txn`. If it encounters intents that were +// written by `txn` that are either at a higher sequence number than txn's or at +// a lower sequence number but at a higher timestamp, `needIntentHistory` is set +// to true. 
This flag is used to signal to the caller that a subsequent scan +// over the MVCC key space (for the batch in question) will need to be performed +// using an intent interleaving iterator in order to be able to read the correct +// provisional value. +func ScanConflictingIntents( + ctx context.Context, + reader Reader, + txn *roachpb.Transaction, + ts hlc.Timestamp, + start, end roachpb.Key, + intents *[]roachpb.Intent, + maxIntents int64, +) (needIntentHistory bool, err error) { + if err := ctx.Err(); err != nil { + return false, err + } + + upperBoundUnset := len(end) == 0 // NB: Get requests do not set the end key. + if !upperBoundUnset && bytes.Compare(start, end) >= 0 { + return true, errors.AssertionFailedf("start key must be less than end key") + } + ltStart, _ := keys.LockTableSingleKey(start, nil) + opts := IterOptions{LowerBound: ltStart} + if upperBoundUnset { + opts.Prefix = true + } else { + ltEnd, _ := keys.LockTableSingleKey(end, nil) + opts.UpperBound = ltEnd + } + iter := reader.NewEngineIterator(opts) + defer iter.Close() + + var meta enginepb.MVCCMetadata + var ok bool + for ok, err = iter.SeekEngineKeyGE(EngineKey{Key: ltStart}); ok; ok, err = iter.NextEngineKey() { + if maxIntents != 0 && int64(len(*intents)) >= maxIntents { + break + } + if err = protoutil.Unmarshal(iter.UnsafeValue(), &meta); err != nil { + return false, err + } + if meta.Txn == nil { + return false, errors.Errorf("intent without transaction") + } + ownIntent := txn != nil && txn.ID == meta.Txn.ID + if ownIntent { + // If we ran into one of our own intents, check whether the intent has a + // higher (or equal) sequence number or a higher (or equal) timestamp. If + // either of these conditions is true, a corresponding scan over the MVCC + // key space will need access to the key's intent history in order to read + // the correct provisional value. So we set `needIntentHistory` to true. 
+ if txn.Sequence <= meta.Txn.Sequence || ts.LessEq(meta.Timestamp.ToTimestamp()) { + needIntentHistory = true + } + continue + } + if conflictingIntent := meta.Timestamp.ToTimestamp().LessEq(ts); !conflictingIntent { + continue + } + key, err := iter.EngineKey() + if err != nil { + return false, err + } + lockedKey, err := keys.DecodeLockTableSingleKey(key.Key) + if err != nil { + return false, err + } + *intents = append(*intents, roachpb.MakeIntent(meta.Txn, lockedKey)) + } + if err != nil { + return false, err + } + if err := ctx.Err(); err != nil { + return false, err + } + return needIntentHistory, nil /* err */ +} diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 66fc8c4e1108..d717b4ec496d 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -54,12 +54,12 @@ const ( // minimum total for a single store node must be under 2048 for Windows // compatibility. MinimumMaxOpenFiles = 1700 - // Default value for maximum number of intents reported by ExportToSST - // and Scan operations in WriteIntentError is set to half of the maximum - // lock table size. - // This value is subject to tuning in real environment as we have more - // data available. - maxIntentsPerWriteIntentErrorDefault = 5000 + // MaxIntentsPerWriteIntentErrorDefault is the default value for the maximum + // number of intents reported by ExportToSST and Scan operations in a + // WriteIntentError. It is set to half of the maximum lock table size. + // This value is subject to tuning in real environments as we have more data + // available.
+ MaxIntentsPerWriteIntentErrorDefault = 5000 ) var minWALSyncInterval = settings.RegisterDurationSetting( @@ -76,7 +76,8 @@ var MaxIntentsPerWriteIntentError = settings.RegisterIntSetting( settings.TenantWritable, "storage.mvcc.max_intents_per_error", "maximum number of intents returned in error during export of scan requests", - maxIntentsPerWriteIntentErrorDefault) + MaxIntentsPerWriteIntentErrorDefault, +) var rocksdbConcurrency = envutil.EnvOrDefaultInt( "COCKROACH_ROCKSDB_CONCURRENCY", func() int { @@ -866,6 +867,14 @@ type MVCCGetOptions struct { // LockTable is used to determine whether keys are locked in the in-memory // lock table when scanning with the SkipLocked option. LockTable LockTableView + // DontInterleaveIntents, when set, makes it such that intent metadata is not + // interleaved with the results of the scan. Setting this option means that + // the underlying pebble iterator will only scan over the MVCC keyspace and + // will not use an `intentInterleavingIter`. It is only appropriate to use + // this when the caller does not need to know whether a given key is an intent + // or not. It is usually set by read-only requests that have resolved their + // conflicts before they begin their MVCC scan. + DontInterleaveIntents bool } func (opts *MVCCGetOptions) validate() error { @@ -878,6 +887,9 @@ func (opts *MVCCGetOptions) validate() error { if opts.Inconsistent && opts.FailOnMoreRecent { return errors.Errorf("cannot allow inconsistent reads with fail on more recent option") } + if opts.DontInterleaveIntents && opts.SkipLocked { + return errors.Errorf("cannot disable interleaved intents with skip locked option") + } return nil } @@ -886,12 +898,16 @@ func (opts *MVCCGetOptions) errOnIntents() bool { } // newMVCCIterator sets up a suitable iterator for high-level MVCC operations -// operating at the given timestamp. If timestamp is empty, the iterator is -// considered to be used for inline values, disabling intents and range keys. 
-// If rangeKeyMasking is true, IterOptions.RangeKeyMaskingBelow is set to the -// given timestamp. +// operating at the given timestamp. If timestamp is empty, the iterator is +// considered to be used for inline values, disabling intents and range keys. +// If `noInterleavedIntents` is set, intents are not interleaved. If +// rangeKeyMasking is true, IterOptions.RangeKeyMaskingBelow is set to timestamp. func newMVCCIterator( - reader Reader, timestamp hlc.Timestamp, rangeKeyMasking bool, opts IterOptions, + reader Reader, + timestamp hlc.Timestamp, + rangeKeyMasking bool, + noInterleavedIntents bool, + opts IterOptions, ) MVCCIterator { // If reading inline then just return a plain MVCCIterator without intents. // However, we allow the caller to enable range keys, since they may be needed @@ -904,7 +920,11 @@ func newMVCCIterator( opts.RangeKeyMaskingBelow.IsEmpty() { opts.RangeKeyMaskingBelow = timestamp } - return reader.NewMVCCIterator(MVCCKeyAndIntentsIterKind, opts) + iterKind := MVCCKeyAndIntentsIterKind + if noInterleavedIntents { + iterKind = MVCCKeyIterKind + } + return reader.NewMVCCIterator(iterKind, opts) } // MVCCGet returns the most recent value for the specified key whose timestamp @@ -948,10 +968,12 @@ func newMVCCIterator( func MVCCGet( ctx context.Context, reader Reader, key roachpb.Key, timestamp hlc.Timestamp, opts MVCCGetOptions, ) (*roachpb.Value, *roachpb.Intent, error) { - iter := newMVCCIterator(reader, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + reader, timestamp, false /* rangeKeyMasking */, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() value, intent, err := mvccGet(ctx, iter, key, timestamp, opts) return value.ToPointer(), intent, err @@ -1291,10 +1313,12 @@ func MVCCPut( var iter MVCCIterator blind := ms == nil && timestamp.IsEmpty() if !blind { - iter = newMVCCIterator(rw,
timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter = newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() } return mvccPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, txn, nil) @@ -1341,10 +1365,12 @@ func MVCCDelete( localTimestamp hlc.ClockTimestamp, txn *roachpb.Transaction, ) (foundKey bool, err error) { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() // TODO(yuzefovich): can we avoid the actual put if foundKey is false? @@ -2051,10 +2077,12 @@ func MVCCIncrement( txn *roachpb.Transaction, inc int64, ) (int64, error) { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() var int64Val int64 @@ -2128,10 +2156,12 @@ func MVCCConditionalPut( allowIfDoesNotExist CPutMissingBehavior, txn *roachpb.Transaction, ) error { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() return mvccConditionalPutUsingIter( @@ -2213,10 +2243,12 @@ func MVCCInitPut( failOnTombstones 
bool, txn *roachpb.Transaction, ) error { - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() return mvccInitPutUsingIter(ctx, rw, iter, ms, key, timestamp, localTimestamp, value, failOnTombstones, txn) } @@ -2811,10 +2843,12 @@ func MVCCDeleteRange( buf := newPutBuffer() defer buf.release() - iter := newMVCCIterator(rw, timestamp, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + iter := newMVCCIterator( + rw, timestamp, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer iter.Close() var keys []roachpb.Key @@ -2971,10 +3005,12 @@ func MVCCPredicateDeleteRange( // Create some reusable machinery for flushing a run with point tombstones // that is typically used in a single MVCCPut call. - pointTombstoneIter := newMVCCIterator(rw, endTime, false /* rangeKeyMasking */, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - Prefix: true, - }) + pointTombstoneIter := newMVCCIterator( + rw, endTime, false /* rangeKeyMasking */, false /* noInterleavedIntents */, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + Prefix: true, + }, + ) defer pointTombstoneIter.Close() pointTombstoneBuf := newPutBuffer() defer pointTombstoneBuf.release() @@ -3617,6 +3653,14 @@ type MVCCScanOptions struct { // LockTable is used to determine whether keys are locked in the in-memory // lock table when scanning with the SkipLocked option. LockTable LockTableView + // DontInterleaveIntents, when set, makes it such that intent metadata is not + // interleaved with the results of the scan. 
Setting this option means that + // the underlying pebble iterator will only scan over the MVCC keyspace and + // will not use an `intentInterleavingIter`. It is only appropriate to use + // this when the caller does not need to know whether a given key is an intent + // or not. It is usually set by read-only requests that have resolved their + // conflicts before they begin their MVCC scan. + DontInterleaveIntents bool } func (opts *MVCCScanOptions) validate() error { @@ -3629,6 +3673,9 @@ func (opts *MVCCScanOptions) validate() error { if opts.Inconsistent && opts.FailOnMoreRecent { return errors.Errorf("cannot allow inconsistent reads with fail on more recent option") } + if opts.DontInterleaveIntents && opts.SkipLocked { + return errors.Errorf("cannot disable interleaved intents with skip locked option") + } return nil } @@ -3715,11 +3762,13 @@ func MVCCScan( timestamp hlc.Timestamp, opts MVCCScanOptions, ) (MVCCScanResult, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() return mvccScanToKvs(ctx, iter, key, endKey, timestamp, opts) } @@ -3732,11 +3781,13 @@ func MVCCScanToBytes( timestamp hlc.Timestamp, opts MVCCScanOptions, ) (MVCCScanResult, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() return mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts) } @@ -3781,11 +3832,13 @@ func MVCCIterate( 
opts MVCCScanOptions, f func(roachpb.KeyValue) error, ) ([]roachpb.Intent, error) { - iter := newMVCCIterator(reader, timestamp, !opts.Tombstones, IterOptions{ - KeyTypes: IterKeyTypePointsAndRanges, - LowerBound: key, - UpperBound: endKey, - }) + iter := newMVCCIterator( + reader, timestamp, !opts.Tombstones, opts.DontInterleaveIntents, IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + LowerBound: key, + UpperBound: endKey, + }, + ) defer iter.Close() var intents []roachpb.Intent diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 835735ee824b..0c7fb93a0f12 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -3529,4 +3529,17 @@ var charts = []sectionDescription{ }, }, }, + { + Organization: [][]string{{ReplicationLayer, "Batches"}}, + Charts: []chartDescription{ + { + Title: "Total number of attempts to evaluate read-only batches", + Metrics: []string{ + "kv.replica_read_batch_evaluate.total", + "kv.replica_read_batch_evaluate.dropped_latches_before_eval", + "kv.replica_read_batch_evaluate.without_interleaving_iter", + }, + }, + }, + }, }