release-20.2: kvserver: avoid bootstrapping closedTS state with the lease start time #65823

Merged
137 changes: 40 additions & 97 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -114,84 +115,6 @@ func TestClosedTimestampCanServe(t *testing.T) {
}
}

// TestClosedTimestampCanServeThroughoutLeaseTransfer verifies that lease
// transfers do not prevent reading a value from a follower that was
// previously readable.
func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Limiting how long transactions can run does not work
// well with race unless we're extremely lenient, which
// drives up the test duration.
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration,
testingCloseFraction, aggressiveResolvedTimestampClusterArgs)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
baRead := makeReadBatchRequestForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})

// Once we know that we can read safely at this timestamp, we want to ensure
// that we can always read from this timestamp from all replicas even while
// lease transfers are ongoing. The test launches a goroutine to randomly
// trigger transfers at random intervals up to 50ms and ensures that there
// are no errors reading the same value from any replica throughout the
// duration of the test (testTime).
const testTime = 500 * time.Millisecond
const maxTransferWait = 50 * time.Millisecond
deadline := timeutil.Now().Add(testTime)
g, gCtx := errgroup.WithContext(ctx)

transferLeasesRandomlyUntilDeadline := func() error {
for timeutil.Now().Before(deadline) {
lh := getCurrentLeaseholder(t, tc, desc)
target := pickRandomTarget(tc, lh, desc)
if err := tc.TransferRangeLease(desc, target); err != nil {
return err
}
time.Sleep(time.Duration(rand.Intn(int(maxTransferWait))))
}
return nil
}
g.Go(transferLeasesRandomlyUntilDeadline)

// Attempt to send read requests to a replica in a tight loop until deadline
// is reached. If an error is seen on any replica then it is returned to the
// errgroup.
baRead = makeReadBatchRequestForDesc(desc, ts)
ensureCanReadFromReplicaUntilDeadline := func(r *kvserver.Replica) {
g.Go(func() error {
for timeutil.Now().Before(deadline) {
resp, pErr := r.Send(gCtx, baRead)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "on %s", r)
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != 1 {
return fmt.Errorf("expected one row, but got %d", len(rows))
}
}
return nil
})
}
for _, r := range repls {
ensureCanReadFromReplicaUntilDeadline(r)
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}

// TestClosedTimestampCanServeWithConflictingIntent validates that a read served
// from a follower replica will wait on conflicting intents and ensure that they
// are cleaned up if necessary to allow the read to proceed.
@@ -362,7 +285,7 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
ctx := context.Background()
// Set up the target duration to be very long and rely on lease transfers to
// drive MaxClosed.
tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, time.Hour,
tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration,
testingCloseFraction, aggressiveResolvedTimestampClusterArgs)
defer tc.Stopper().Stop(ctx)

Expand All @@ -377,6 +300,22 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
target := pickRandomTarget(tc, lh, desc)
require.Nil(t, tc.TransferRangeLease(desc, target))
baRead := makeReadBatchRequestForDesc(desc, ts)

// Poll the store for closed timestamp updates for timestamps greater than the
// `ts` of our read request.
g := ctxgroup.WithContext(ctx)
closedTimestampCh := make(chan ctpb.Entry, 1)
g.Go(func() (e error) {
pollForGreaterClosedTimestamp(t, tc, lh, desc, ts, closedTimestampCh)
return
})
log.Infof(ctx, "waiting for next closed timestamp update for the scratch range")
select {
case <-closedTimestampCh:
case <-time.After(30 * time.Second):
t.Fatal("failed to receive next closed timestamp update")
}

testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
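
The helper pollForGreaterClosedTimestamp used above is not shown in this diff. As a rough illustration of the polling pattern only (not the actual test helper), here is a self-contained Go sketch with simplified stand-in types: a hypothetical source advances a wall-clock closed timestamp, and the poller delivers the first value that exceeds the target on a channel, mirroring how the test waits on closedTimestampCh with a 30-second timeout.

package main

import (
	"fmt"
	"time"
)

// wallTS is a simplified stand-in for hlc.Timestamp: wall time only.
type wallTS struct{ wall int64 }

func (t wallTS) less(o wallTS) bool { return t.wall < o.wall }

// closedTSSource is a hypothetical stand-in for the closed timestamp
// subsystem that the real helper consults through the test cluster.
type closedTSSource interface {
	maxClosed() wallTS
}

// pollForGreaterClosedTS polls src until it reports a closed timestamp
// greater than target, then delivers that value on ch and returns.
func pollForGreaterClosedTS(src closedTSSource, target wallTS, ch chan<- wallTS) {
	for {
		if cur := src.maxClosed(); target.less(cur) {
			ch <- cur
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// fakeSource advances its closed timestamp on every call, for illustration.
type fakeSource struct{ now int64 }

func (f *fakeSource) maxClosed() wallTS {
	f.now += 100
	return wallTS{wall: f.now}
}

func main() {
	ch := make(chan wallTS, 1)
	go pollForGreaterClosedTS(&fakeSource{}, wallTS{wall: 250}, ch)
	select {
	case ct := <-ch:
		fmt.Printf("closed timestamp advanced past target: %+v\n", ct)
	case <-time.After(5 * time.Second):
		fmt.Println("timed out waiting for a closed timestamp update")
	}
}
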
@@ -486,7 +425,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
rightLeaseholder roachpb.ReplicationTarget,
freezeStartTimestamp hlc.Timestamp,
leaseAcquisitionTrap *atomic.Value,
) (roachpb.ReplicationTarget, hlc.Timestamp, error)
) (roachpb.ReplicationTarget, error)

type testCase struct {
name string
@@ -608,6 +547,9 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
}
}()

// freezeStartTimestamp indicates the low water mark for closed timestamp
// updates beyond which we expect none of the followers to be able to serve
// follower reads until the merge is complete.
var freezeStartTimestamp hlc.Timestamp
// We now have the RHS in its subsumed state.
select {
Expand All @@ -623,38 +565,38 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
testingTargetDuration)); err != nil {
t.Fatal(err)
}
// inactiveClosedTSBoundary indicates the low water mark for closed
// timestamp updates beyond which we expect none of the followers to be able
// to serve follower reads until the merge is complete.
inactiveClosedTSBoundary := freezeStartTimestamp
if callback != nil {
newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder,
newRightLeaseholder, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder,
freezeStartTimestamp, &leaseAcquisitionTrap)
if err != nil {
t.Fatal(err)
}
rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts
rightLeaseholder = newRightLeaseholder
}
// Poll the store for closed timestamp updates for timestamps greater than
// our `inactiveClosedTSBoundary`.
// our `freezeStartTimestamp`.
closedTimestampCh := make(chan ctpb.Entry, 1)
g.Go(func() (e error) {
pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh)
pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, freezeStartTimestamp, closedTimestampCh)
return
})
// We expect that none of the closed timestamp updates greater than
// `inactiveClosedTSBoundary` will be actionable by the RHS follower
// `freezeStartTimestamp` will be actionable by the RHS follower
// replicas.
log.Infof(ctx, "waiting for next closed timestamp update for the RHS")
select {
case <-closedTimestampCh:
case <-time.After(30 * time.Second):
t.Fatal("failed to receive next closed timestamp update")
}
baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next())
baReadAfterSubsume := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp.Next())
rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)
log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary")
verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */)
log.Infof(
ctx,
"sending read requests to followers after the freezeStartTimestamp (at ts: %s)",
freezeStartTimestamp.Next(),
)
verifyNotLeaseHolderErrors(t, baReadAfterSubsume, rightReplFollowers, 2 /* expectedNLEs */)
}

for _, test := range tests {
Expand All @@ -676,7 +618,7 @@ func forceLeaseTransferOnSubsumedRange(
rightLeaseholder roachpb.ReplicationTarget,
freezeStartTimestamp hlc.Timestamp,
leaseAcquisitionTrap *atomic.Value,
) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) {
) (newLeaseholder roachpb.ReplicationTarget, err error) {
oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder)
// Co-operative lease transfers will block while a range is subsumed, so we
// pause the node liveness heartbeats until a lease transfer occurs.
@@ -731,17 +673,18 @@
if storeID != newRightLeaseholder.StoreID() {
err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead",
newRightLeaseholder.StoreID(), storeID)
return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
return roachpb.ReplicationTarget{}, err
}
case <-time.After(30 * time.Second):
err = errors.New("failed to receive lease acquisition request")
return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
return roachpb.ReplicationTarget{}, err
}
rightLeaseholder = roachpb.ReplicationTarget{
NodeID: newRightLeaseholder.NodeID(),
StoreID: newRightLeaseholder.StoreID(),
}
oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder)
var leaseStart hlc.Timestamp
err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease()
if newLease.Sequence == oldLease.Sequence {
Expand All @@ -755,10 +698,10 @@ func forceLeaseTransferOnSubsumedRange(
}
if !freezeStartTimestamp.LessEq(leaseStart) {
err = errors.New("freeze timestamp greater than the start time of the new lease")
return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
return roachpb.ReplicationTarget{}, err
}

return rightLeaseholder, leaseStart, nil
return rightLeaseholder, nil
}

// mergeFilter provides a method (SuspendMergeTrigger) that can be installed as
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/helpers_test.go
@@ -290,7 +290,7 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {

// MaxClosed returns the maximum closed timestamp known to the Replica.
func (r *Replica) MaxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
return r.maxClosed(ctx)
return r.maxClosed()
}

// SetQuotaPool allows the caller to set a replica's quota pool initialized to
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
@@ -1025,7 +1025,7 @@ func (r *Replica) State() kvserverpb.RangeInfo {
// NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do
// this first before RLocking below. Performance of this extra lock
// acquisition is not a concern.
ri.ActiveClosedTimestamp, _ = r.maxClosed(context.Background())
ri.ActiveClosedTimestamp, _ = r.maxClosed()

// NB: numRangefeedRegistrations doesn't require Replica.mu to be locked.
// However, it does require coordination between multiple goroutines, so
26 changes: 16 additions & 10 deletions pkg/kv/kvserver/replica_follower_read.go
@@ -71,7 +71,7 @@ func (r *Replica) canServeFollowerRead(
ts.Forward(ba.Txn.MaxTimestamp)
}

maxClosed, _ := r.maxClosed(ctx)
maxClosed, _ := r.maxClosed()
canServeFollowerRead := ts.LessEq(maxClosed)
tsDiff := ts.GoTime().Sub(maxClosed.GoTime())
if !canServeFollowerRead {
@@ -107,18 +107,25 @@ func (r *Replica) canServeFollowerRead(
return nil
}

// maxClosed returns the maximum closed timestamp for this range.
// It is computed as the most recent of the known closed timestamp for the
// current lease holder for this range as tracked by the closed timestamp
// subsystem and the start time of the current lease. It is safe to use the
// start time of the current lease because leasePostApply bumps the timestamp
// cache forward to at least the new lease start time. Using this combination
// allows the closed timestamp mechanism to be robust to lease transfers.
// maxClosed returns the maximum closed timestamp for this range. It is computed
// from the most recent closed timestamp known for the current lease holder of
// this range, as tracked by the closed timestamp subsystem.
//
// If the ok return value is false, the Replica is a member of a range which
// uses an expiration-based lease. Expiration-based leases do not support the
// closed timestamp subsystem. A zero-value timestamp will be returned if ok
// is false.
func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
//
// NB: It is unsafe to use the start time of the current lease as a lower bound
// on `maxClosed()` because, even though `leasePostApply()` bumps the timestamp
// cache forward to at least the new lease start time, the range may have been
// in the middle of a merge when the lease transfer occurred. Since
// `SubsumeRequest` is not a replicated command, other replicas do not have
// access to the subsumption time. We choose to be pessimistic here and allow
// closed timestamp state to be temporarily reset after a lease transfer, in
// order to avoid potentially serving follower reads past the subsumption time
// of a range.
func (r *Replica) maxClosed() (_ hlc.Timestamp, ok bool) {
r.mu.RLock()
lai := r.mu.state.LeaseAppliedIndex
lease := *r.mu.state.Lease
Expand All @@ -129,7 +136,6 @@ func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
}
maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai))
maxClosed.Forward(lease.Start)
maxClosed.Forward(initialMaxClosed)
return maxClosed, true
}
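
The single deleted line above, maxClosed.Forward(lease.Start), is the core of this change. The following self-contained Go sketch contrasts the old and new computations; it uses a simplified stand-in timestamp type and hypothetical helper names, not the real method, which also consults the lease epoch and lease applied index via the closed timestamp provider and returns ok=false for expiration-based leases.

package main

import "fmt"

// ts is a simplified stand-in for hlc.Timestamp.
type ts struct{ wall int64 }

// forward sets t to the maximum of t and o, mirroring hlc.Timestamp.Forward.
func (t *ts) forward(o ts) {
	if t.wall < o.wall {
		t.wall = o.wall
	}
}

// maxClosedOld mirrors the previous behavior: the start time of the current
// lease acted as a lower bound on the range's closed timestamp.
func maxClosedOld(providerMaxClosed, initialMaxClosed, leaseStart ts) ts {
	out := providerMaxClosed
	out.forward(leaseStart) // the line removed by this PR
	out.forward(initialMaxClosed)
	return out
}

// maxClosedNew mirrors the new behavior: only the closed timestamp subsystem
// and the replica's initialMaxClosed contribute, so closed timestamp state is
// temporarily reset after a lease transfer instead of being bootstrapped from
// the new lease's start time, which may postdate a subsumption.
func maxClosedNew(providerMaxClosed, initialMaxClosed ts) ts {
	out := providerMaxClosed
	out.forward(initialMaxClosed)
	return out
}

func main() {
	provider := ts{wall: 100}   // closed timestamp tracked by the subsystem
	initial := ts{wall: 120}    // initialMaxClosed inherited at split time
	leaseStart := ts{wall: 500} // a lease that started while a merge was in flight
	fmt.Println(maxClosedOld(provider, initial, leaseStart)) // {500}: could allow follower reads past the subsumption time
	fmt.Println(maxClosedNew(provider, initial))             // {120}: pessimistic until the subsystem catches up
}

Callers still check the ok return value, as in the updateReplicationGauges hunk further down, because expiration-based leases do not support the closed timestamp subsystem and receive a zero-value timestamp.
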
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
@@ -595,7 +595,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(ctx context.Context) {
}

// Determine what the maximum closed timestamp is for this replica.
closedTS, _ := r.maxClosed(ctx)
closedTS, _ := r.maxClosed()

// If the closed timestamp is sufficiently stale, signal that we want an
// update to the leaseholder so that it will eventually begin to progress
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -2550,7 +2550,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
if wps, dur := rep.writeStats.avgQPS(); dur >= MinStatsDuration {
averageWritesPerSecond += wps
}
mc, ok := rep.maxClosed(ctx)
mc, ok := rep.maxClosed()
if ok && (minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS)) {
minMaxClosedTS = mc
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_split.go
@@ -149,7 +149,7 @@ func splitPreApply(
// the hazard and ensures that no replica on the RHS is created with an
// initialMaxClosed that could be violated by a proposal on the RHS's
// initial leaseholder. See #44878.
initialMaxClosed, _ := r.maxClosed(ctx)
initialMaxClosed, _ := r.maxClosed()
rightRepl.mu.Lock()
rightRepl.mu.initialMaxClosed = initialMaxClosed
rightRepl.mu.Unlock()