diff --git a/pkg/kv/kvserver/closed_timestamp_test.go b/pkg/kv/kvserver/closed_timestamp_test.go
index 0cf42b19bbaa..b3fca88ee288 100644
--- a/pkg/kv/kvserver/closed_timestamp_test.go
+++ b/pkg/kv/kvserver/closed_timestamp_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util"
+	"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -114,84 +115,6 @@ func TestClosedTimestampCanServe(t *testing.T) {
 	}
 }
 
-// TestClosedTimestampCanServerThroughoutLeaseTransfer verifies that lease
-// transfers does not prevent reading a value from a follower that was
-// previously readable.
-func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	// Limiting how long transactions can run does not work
-	// well with race unless we're extremely lenient, which
-	// drives up the test duration.
-	skip.UnderRace(t)
-
-	ctx := context.Background()
-	tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration,
-		testingCloseFraction, aggressiveResolvedTimestampClusterArgs)
-	defer tc.Stopper().Stop(ctx)
-
-	if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
-		t.Fatal(err)
-	}
-	ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
-	baRead := makeReadBatchRequestForDesc(desc, ts)
-	testutils.SucceedsSoon(t, func() error {
-		return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
-	})
-
-	// Once we know that we can read safely at this timestamp, we want to ensure
-	// that we can always read from this timestamp from all replicas even while
-	// lease transfers are ongoing. The test launches a goroutine to randomly
-	// trigger transfers at random intervals up to 50ms and ensures that there
-	// are no errors reading the same value from any replica throughout the
-	// duration of the test (testTime).
-	const testTime = 500 * time.Millisecond
-	const maxTransferWait = 50 * time.Millisecond
-	deadline := timeutil.Now().Add(testTime)
-	g, gCtx := errgroup.WithContext(ctx)
-
-	transferLeasesRandomlyUntilDeadline := func() error {
-		for timeutil.Now().Before(deadline) {
-			lh := getCurrentLeaseholder(t, tc, desc)
-			target := pickRandomTarget(tc, lh, desc)
-			if err := tc.TransferRangeLease(desc, target); err != nil {
-				return err
-			}
-			time.Sleep(time.Duration(rand.Intn(int(maxTransferWait))))
-		}
-		return nil
-	}
-	g.Go(transferLeasesRandomlyUntilDeadline)
-
-	// Attempt to send read requests to a replica in a tight loop until deadline
-	// is reached. If an error is seen on any replica then it is returned to the
-	// errgroup.
-	baRead = makeReadBatchRequestForDesc(desc, ts)
-	ensureCanReadFromReplicaUntilDeadline := func(r *kvserver.Replica) {
-		g.Go(func() error {
-			for timeutil.Now().Before(deadline) {
-				resp, pErr := r.Send(gCtx, baRead)
-				if pErr != nil {
-					return errors.Wrapf(pErr.GoError(), "on %s", r)
-				}
-				rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
-				// Should see the write.
-				if len(rows) != 1 {
-					return fmt.Errorf("expected one row, but got %d", len(rows))
-				}
-			}
-			return nil
-		})
-	}
-	for _, r := range repls {
-		ensureCanReadFromReplicaUntilDeadline(r)
-	}
-	if err := g.Wait(); err != nil {
-		t.Fatal(err)
-	}
-}
-
 // TestClosedTimestampCanServeWithConflictingIntent validates that a read served
 // from a follower replica will wait on conflicting intents and ensure that they
 // are cleaned up if necessary to allow the read to proceed.
@@ -362,7 +285,7 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
 	ctx := context.Background()
 	// Set up the target duration to be very long and rely on lease transfers to
 	// drive MaxClosed.
-	tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, time.Hour,
+	tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration,
 		testingCloseFraction, aggressiveResolvedTimestampClusterArgs)
 	defer tc.Stopper().Stop(ctx)
 
@@ -377,6 +300,22 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
 	target := pickRandomTarget(tc, lh, desc)
 	require.Nil(t, tc.TransferRangeLease(desc, target))
 	baRead := makeReadBatchRequestForDesc(desc, ts)
+
+	// Poll the store for closed timestamp updates for timestamps greater than the
+	// `ts` of our read request.
+	g := ctxgroup.WithContext(ctx)
+	closedTimestampCh := make(chan ctpb.Entry, 1)
+	g.Go(func() (e error) {
+		pollForGreaterClosedTimestamp(t, tc, lh, desc, ts, closedTimestampCh)
+		return
+	})
+	log.Infof(ctx, "waiting for next closed timestamp update for the scratch range")
+	select {
+	case <-closedTimestampCh:
+	case <-time.After(30 * time.Second):
+		t.Fatal("failed to receive next closed timestamp update")
+	}
+
 	testutils.SucceedsSoon(t, func() error {
 		return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
 	})
@@ -486,7 +425,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
 		rightLeaseholder roachpb.ReplicationTarget,
 		freezeStartTimestamp hlc.Timestamp,
 		leaseAcquisitionTrap *atomic.Value,
-	) (roachpb.ReplicationTarget, hlc.Timestamp, error)
+	) (roachpb.ReplicationTarget, error)
 
 	type testCase struct {
 		name     string
@@ -608,6 +547,9 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
 			}
 		}()
 
+		// freezeStartTimestamp indicates the low water mark for closed timestamp
+		// updates beyond which we expect none of the followers to be able to serve
+		// follower reads until the merge is complete.
 		var freezeStartTimestamp hlc.Timestamp
 		// We now have the RHS in its subsumed state.
 		select {
@@ -623,27 +565,23 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
 			testingTargetDuration)); err != nil {
 			t.Fatal(err)
 		}
-		// inactiveClosedTSBoundary indicates the low water mark for closed
-		// timestamp updates beyond which we expect none of the followers to be able
-		// to serve follower reads until the merge is complete.
-		inactiveClosedTSBoundary := freezeStartTimestamp
 		if callback != nil {
-			newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder,
+			newRightLeaseholder, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder,
 				freezeStartTimestamp, &leaseAcquisitionTrap)
 			if err != nil {
 				t.Fatal(err)
 			}
-			rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts
+			rightLeaseholder = newRightLeaseholder
 		}
 		// Poll the store for closed timestamp updates for timestamps greater than
-		// our `inactiveClosedTSBoundary`.
+		// our `freezeStartTimestamp`.
 		closedTimestampCh := make(chan ctpb.Entry, 1)
 		g.Go(func() (e error) {
-			pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh)
+			pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, freezeStartTimestamp, closedTimestampCh)
 			return
 		})
 		// We expect that none of the closed timestamp updates greater than
-		// `inactiveClosedTSBoundary` will be actionable by the RHS follower
+		// `freezeStartTimestamp` will be actionable by the RHS follower
 		// replicas.
 		log.Infof(ctx, "waiting for next closed timestamp update for the RHS")
 		select {
@@ -651,10 +589,14 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
 		case <-time.After(30 * time.Second):
 			t.Fatal("failed to receive next closed timestamp update")
 		}
-		baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next())
+		baReadAfterSubsume := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp.Next())
 		rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)
-		log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary")
-		verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */)
+		log.Infof(
+			ctx,
+			"sending read requests to followers after the freezeStartTimestamp (at ts: %s)",
+			freezeStartTimestamp.Next(),
+		)
+		verifyNotLeaseHolderErrors(t, baReadAfterSubsume, rightReplFollowers, 2 /* expectedNLEs */)
 	}
 
 	for _, test := range tests {
@@ -676,7 +618,7 @@ func forceLeaseTransferOnSubsumedRange(
 	rightLeaseholder roachpb.ReplicationTarget,
 	freezeStartTimestamp hlc.Timestamp,
 	leaseAcquisitionTrap *atomic.Value,
-) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) {
+) (newLeaseholder roachpb.ReplicationTarget, err error) {
 	oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder)
 	// Co-operative lease transfers will block while a range is subsumed, so we
 	// pause the node liveness heartbeats until a lease transfer occurs.
@@ -731,17 +673,18 @@ func forceLeaseTransferOnSubsumedRange(
 		if storeID != newRightLeaseholder.StoreID() {
 			err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead",
 				newRightLeaseholder.StoreID(), storeID)
-			return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
+			return roachpb.ReplicationTarget{}, err
 		}
 	case <-time.After(30 * time.Second):
 		err = errors.New("failed to receive lease acquisition request")
-		return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
+		return roachpb.ReplicationTarget{}, err
 	}
 	rightLeaseholder = roachpb.ReplicationTarget{
 		NodeID:  newRightLeaseholder.NodeID(),
 		StoreID: newRightLeaseholder.StoreID(),
 	}
 	oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder)
+	var leaseStart hlc.Timestamp
 	err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
 		newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease()
 		if newLease.Sequence == oldLease.Sequence {
@@ -755,10 +698,10 @@ func forceLeaseTransferOnSubsumedRange(
 	}
 	if !freezeStartTimestamp.LessEq(leaseStart) {
 		err = errors.New("freeze timestamp greater than the start time of the new lease")
-		return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
+		return roachpb.ReplicationTarget{}, err
 	}
-	return rightLeaseholder, leaseStart, nil
+	return rightLeaseholder, nil
 }
 
 // mergeFilter provides a method (SuspendMergeTrigger) that can be installed as
diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go
index 410585944cbf..02795630db2f 100644
--- a/pkg/kv/kvserver/helpers_test.go
+++ b/pkg/kv/kvserver/helpers_test.go
@@ -290,7 +290,7 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {
 
 // MaxClosed returns the maximum closed timestamp known to the Replica.
 func (r *Replica) MaxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
-	return r.maxClosed(ctx)
+	return r.maxClosed()
 }
 
 // SetQuotaPool allows the caller to set a replica's quota pool initialized to
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 54992c6bf24b..3177538ec6cf 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -1025,7 +1025,7 @@ func (r *Replica) State() kvserverpb.RangeInfo {
 	// NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do
 	// this first before RLocking below. Performance of this extra lock
 	// acquisition is not a concern.
-	ri.ActiveClosedTimestamp, _ = r.maxClosed(context.Background())
+	ri.ActiveClosedTimestamp, _ = r.maxClosed()
 
 	// NB: numRangefeedRegistrations doesn't require Replica.mu to be locked.
 	// However, it does require coordination between multiple goroutines, so
diff --git a/pkg/kv/kvserver/replica_follower_read.go b/pkg/kv/kvserver/replica_follower_read.go
index b295bc689768..270e0e586432 100644
--- a/pkg/kv/kvserver/replica_follower_read.go
+++ b/pkg/kv/kvserver/replica_follower_read.go
@@ -71,7 +71,7 @@ func (r *Replica) canServeFollowerRead(
 		ts.Forward(ba.Txn.MaxTimestamp)
 	}
 
-	maxClosed, _ := r.maxClosed(ctx)
+	maxClosed, _ := r.maxClosed()
 	canServeFollowerRead := ts.LessEq(maxClosed)
 	tsDiff := ts.GoTime().Sub(maxClosed.GoTime())
 	if !canServeFollowerRead {
@@ -107,18 +107,25 @@ func (r *Replica) canServeFollowerRead(
 	return nil
 }
 
-// maxClosed returns the maximum closed timestamp for this range.
-// It is computed as the most recent of the known closed timestamp for the
-// current lease holder for this range as tracked by the closed timestamp
-// subsystem and the start time of the current lease. It is safe to use the
-// start time of the current lease because leasePostApply bumps the timestamp
-// cache forward to at least the new lease start time. Using this combination
-// allows the closed timestamp mechanism to be robust to lease transfers.
+// maxClosed returns the maximum closed timestamp for this range. It is computed
+// as the most recent of the known closed timestamp for the current lease holder
+// for this range as tracked by the closed timestamp subsystem.
+//
 // If the ok return value is false, the Replica is a member of a range which
 // uses an expiration-based lease. Expiration-based leases do not support the
 // closed timestamp subsystem. A zero-value timestamp will be returned if ok
 // is false.
-func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
+//
+// NB: It is unsafe to use the start time of the current lease as a lower bound
+// on `maxClosed()` because, even though `leasePostApply()` bumps the timestamp
+// cache forward to at least the new lease start time, the range may have been
+// in the middle of a merge when the lease transfer occurred. Since
+// `SubsumeRequest` is not a replicated command, other replicas do not have
+// access to the subsumption time. We choose to be pessimistic here and allow
+// closed timestamp state to be temporarily reset after a lease transfer, in
+// order to avoid potentially serving follower reads past the subsumption time
+// of a range.
+func (r *Replica) maxClosed() (_ hlc.Timestamp, ok bool) {
 	r.mu.RLock()
 	lai := r.mu.state.LeaseAppliedIndex
 	lease := *r.mu.state.Lease
@@ -129,7 +136,6 @@ func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
 	}
 	maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
 		lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai))
-	maxClosed.Forward(lease.Start)
 	maxClosed.Forward(initialMaxClosed)
 	return maxClosed, true
 }
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index 37cd12bc9395..05ff47c7f99b 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -595,7 +595,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(ctx context.Context) {
 	}
 
 	// Determine what the maximum closed timestamp is for this replica.
-	closedTS, _ := r.maxClosed(ctx)
+	closedTS, _ := r.maxClosed()
 
 	// If the closed timestamp is sufficiently stale, signal that we want an
 	// update to the leaseholder so that it will eventually begin to progress
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index 7766c84c5df0..092848c19f65 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -2550,7 +2550,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 		if wps, dur := rep.writeStats.avgQPS(); dur >= MinStatsDuration {
 			averageWritesPerSecond += wps
 		}
-		mc, ok := rep.maxClosed(ctx)
+		mc, ok := rep.maxClosed()
 		if ok && (minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS)) {
 			minMaxClosedTS = mc
 		}
diff --git a/pkg/kv/kvserver/store_split.go b/pkg/kv/kvserver/store_split.go
index 038f2246a976..badf29b5e84b 100644
--- a/pkg/kv/kvserver/store_split.go
+++ b/pkg/kv/kvserver/store_split.go
@@ -149,7 +149,7 @@ func splitPreApply(
 	// the hazard and ensures that no replica on the RHS is created with an
 	// initialMaxClosed that could be violated by a proposal on the RHS's
 	// initial leaseholder. See #44878.
-	initialMaxClosed, _ := r.maxClosed(ctx)
+	initialMaxClosed, _ := r.maxClosed()
 	rightRepl.mu.Lock()
 	rightRepl.mu.initialMaxClosed = initialMaxClosed
 	rightRepl.mu.Unlock()