
Commit

Merge pull request #65823 from aayushshah15/stopForwardingClosedTSToLeaseStartTime
aayushshah15 authored Jul 20, 2021
2 parents 8e04bb2 + c2a08bd commit 8dad4d7
Showing 7 changed files with 61 additions and 112 deletions.
137 changes: 40 additions & 97 deletions pkg/kv/kvserver/closed_timestamp_test.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -114,84 +115,6 @@ func TestClosedTimestampCanServe(t *testing.T) {
}
}

// TestClosedTimestampCanServeThroughoutLeaseTransfer verifies that lease
// transfers do not prevent reading a value from a follower that was
// previously readable.
func TestClosedTimestampCanServeThroughoutLeaseTransfer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Limiting how long transactions can run does not work
// well with race unless we're extremely lenient, which
// drives up the test duration.
skip.UnderRace(t)

ctx := context.Background()
tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration,
testingCloseFraction, aggressiveResolvedTimestampClusterArgs)
defer tc.Stopper().Stop(ctx)

if _, err := db0.Exec(`INSERT INTO cttest.kv VALUES(1, $1)`, "foo"); err != nil {
t.Fatal(err)
}
ts := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
baRead := makeReadBatchRequestForDesc(desc, ts)
testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})

// Once we know that we can read safely at this timestamp, we want to ensure
// that we can always read from this timestamp from all replicas even while
// lease transfers are ongoing. The test launches a goroutine that triggers
// lease transfers at random intervals of up to 50ms and ensures that there
// are no errors reading the same value from any replica throughout the
// duration of the test (testTime).
const testTime = 500 * time.Millisecond
const maxTransferWait = 50 * time.Millisecond
deadline := timeutil.Now().Add(testTime)
g, gCtx := errgroup.WithContext(ctx)

transferLeasesRandomlyUntilDeadline := func() error {
for timeutil.Now().Before(deadline) {
lh := getCurrentLeaseholder(t, tc, desc)
target := pickRandomTarget(tc, lh, desc)
if err := tc.TransferRangeLease(desc, target); err != nil {
return err
}
time.Sleep(time.Duration(rand.Intn(int(maxTransferWait))))
}
return nil
}
g.Go(transferLeasesRandomlyUntilDeadline)

// Attempt to send read requests to a replica in a tight loop until deadline
// is reached. If an error is seen on any replica then it is returned to the
// errgroup.
baRead = makeReadBatchRequestForDesc(desc, ts)
ensureCanReadFromReplicaUntilDeadline := func(r *kvserver.Replica) {
g.Go(func() error {
for timeutil.Now().Before(deadline) {
resp, pErr := r.Send(gCtx, baRead)
if pErr != nil {
return errors.Wrapf(pErr.GoError(), "on %s", r)
}
rows := resp.Responses[0].GetInner().(*roachpb.ScanResponse).Rows
// Should see the write.
if len(rows) != 1 {
return fmt.Errorf("expected one row, but got %d", len(rows))
}
}
return nil
})
}
for _, r := range repls {
ensureCanReadFromReplicaUntilDeadline(r)
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}

// TestClosedTimestampCanServeWithConflictingIntent validates that a read served
// from a follower replica will wait on conflicting intents and ensure that they
// are cleaned up if necessary to allow the read to proceed.
@@ -362,7 +285,7 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T) {
ctx := context.Background()
// Set up the target duration to be very long and rely on lease transfers to
// drive MaxClosed.
tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, time.Hour,
tc, db0, desc, repls := setupClusterForClosedTimestampTesting(ctx, t, testingTargetDuration,
testingCloseFraction, aggressiveResolvedTimestampClusterArgs)
defer tc.Stopper().Stop(ctx)

@@ -377,6 +300,22 @@ func TestClosedTimestampCantServeBasedOnMaxTimestamp(t *testing.T)
target := pickRandomTarget(tc, lh, desc)
require.Nil(t, tc.TransferRangeLease(desc, target))
baRead := makeReadBatchRequestForDesc(desc, ts)

// Poll the store for closed timestamp updates for timestamps greater than the
// `ts` of our read request.
g := ctxgroup.WithContext(ctx)
closedTimestampCh := make(chan ctpb.Entry, 1)
g.Go(func() (e error) {
pollForGreaterClosedTimestamp(t, tc, lh, desc, ts, closedTimestampCh)
return
})
log.Infof(ctx, "waiting for next closed timestamp update for the scratch range")
select {
case <-closedTimestampCh:
case <-time.After(30 * time.Second):
t.Fatal("failed to receive next closed timestamp update")
}

testutils.SucceedsSoon(t, func() error {
return verifyCanReadFromAllRepls(ctx, t, baRead, repls, expectRows(1))
})
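
The `pollForGreaterClosedTimestamp` helper used above is defined elsewhere in this test file and is not shown in the diff. As a rough, self-contained sketch of the poll-then-select pattern the test relies on (stdlib-only Go with invented names, not the CockroachDB helper):

```go
package main

import (
	"fmt"
	"time"
)

// pollForGreaterValue is a generic stand-in for the closed-timestamp poller
// used by the test (the real helper reads the store's closed timestamp
// provider): poll a source until it reports a value greater than the given
// threshold, then publish that value on a buffered channel and return.
func pollForGreaterValue(source func() int64, threshold int64, ch chan<- int64) {
	for {
		if v := source(); v > threshold {
			ch <- v
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	start := time.Now()
	// Toy source: nanoseconds elapsed since start; it crosses the threshold
	// shortly after startup.
	source := func() int64 { return time.Since(start).Nanoseconds() }

	updateCh := make(chan int64, 1)
	go pollForGreaterValue(source, (50 * time.Millisecond).Nanoseconds(), updateCh)

	// Mirror the test's select-with-timeout around the update channel.
	select {
	case v := <-updateCh:
		fmt.Println("received update greater than threshold:", v)
	case <-time.After(30 * time.Second):
		fmt.Println("failed to receive update")
	}
}
```
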
@@ -486,7 +425,7 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
rightLeaseholder roachpb.ReplicationTarget,
freezeStartTimestamp hlc.Timestamp,
leaseAcquisitionTrap *atomic.Value,
) (roachpb.ReplicationTarget, hlc.Timestamp, error)
) (roachpb.ReplicationTarget, error)

type testCase struct {
name string
@@ -608,6 +547,9 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
}
}()

// freezeStartTimestamp indicates the low water mark for closed timestamp
// updates beyond which we expect none of the followers to be able to serve
// follower reads until the merge is complete.
var freezeStartTimestamp hlc.Timestamp
// We now have the RHS in its subsumed state.
select {
@@ -623,38 +565,38 @@ func TestClosedTimestampInactiveAfterSubsumption(t *testing.T) {
testingTargetDuration)); err != nil {
t.Fatal(err)
}
// inactiveClosedTSBoundary indicates the low water mark for closed
// timestamp updates beyond which we expect none of the followers to be able
// to serve follower reads until the merge is complete.
inactiveClosedTSBoundary := freezeStartTimestamp
if callback != nil {
newRightLeaseholder, ts, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder,
newRightLeaseholder, err := callback(ctx, t, tc, g, rightDesc, rightLeaseholder,
freezeStartTimestamp, &leaseAcquisitionTrap)
if err != nil {
t.Fatal(err)
}
rightLeaseholder, inactiveClosedTSBoundary = newRightLeaseholder, ts
rightLeaseholder = newRightLeaseholder
}
// Poll the store for closed timestamp updates for timestamps greater than
// our `inactiveClosedTSBoundary`.
// our `freezeStartTimestamp`.
closedTimestampCh := make(chan ctpb.Entry, 1)
g.Go(func() (e error) {
pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, inactiveClosedTSBoundary, closedTimestampCh)
pollForGreaterClosedTimestamp(t, tc, rightLeaseholder, rightDesc, freezeStartTimestamp, closedTimestampCh)
return
})
// We expect that none of the closed timestamp updates greater than
// `inactiveClosedTSBoundary` will be actionable by the RHS follower
// `freezeStartTimestamp` will be actionable by the RHS follower
// replicas.
log.Infof(ctx, "waiting for next closed timestamp update for the RHS")
select {
case <-closedTimestampCh:
case <-time.After(30 * time.Second):
t.Fatal("failed to receive next closed timestamp update")
}
baReadAfterLeaseTransfer := makeReadBatchRequestForDesc(rightDesc, inactiveClosedTSBoundary.Next())
baReadAfterSubsume := makeReadBatchRequestForDesc(rightDesc, freezeStartTimestamp.Next())
rightReplFollowers := getFollowerReplicas(ctx, t, tc, rightDesc, rightLeaseholder)
log.Infof(ctx, "sending read requests from followers after the inactiveClosedTSBoundary")
verifyNotLeaseHolderErrors(t, baReadAfterLeaseTransfer, rightReplFollowers, 2 /* expectedNLEs */)
log.Infof(
ctx,
"sending read requests to followers after the freezeStartTimestamp (at ts: %s)",
freezeStartTimestamp.Next(),
)
verifyNotLeaseHolderErrors(t, baReadAfterSubsume, rightReplFollowers, 2 /* expectedNLEs */)
}

for _, test := range tests {
@@ -676,7 +618,7 @@ func forceLeaseTransferOnSubsumedRange(
rightLeaseholder roachpb.ReplicationTarget,
freezeStartTimestamp hlc.Timestamp,
leaseAcquisitionTrap *atomic.Value,
) (newLeaseholder roachpb.ReplicationTarget, leaseStart hlc.Timestamp, err error) {
) (newLeaseholder roachpb.ReplicationTarget, err error) {
oldLeaseholderStore := getTargetStoreOrFatal(t, tc, rightLeaseholder)
// Co-operative lease transfers will block while a range is subsumed, so we
// pause the node liveness heartbeats until a lease transfer occurs.
@@ -731,17 +673,18 @@ func forceLeaseTransferOnSubsumedRange(
if storeID != newRightLeaseholder.StoreID() {
err = errors.Newf("expected store %d to try to acquire the lease; got a request from store %d instead",
newRightLeaseholder.StoreID(), storeID)
return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
return roachpb.ReplicationTarget{}, err
}
case <-time.After(30 * time.Second):
err = errors.New("failed to receive lease acquisition request")
return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
return roachpb.ReplicationTarget{}, err
}
rightLeaseholder = roachpb.ReplicationTarget{
NodeID: newRightLeaseholder.NodeID(),
StoreID: newRightLeaseholder.StoreID(),
}
oldLeaseholderStore = getTargetStoreOrFatal(t, tc, rightLeaseholder)
var leaseStart hlc.Timestamp
err = retry.ForDuration(testutils.DefaultSucceedsSoonDuration, func() error {
newLease, _ := oldLeaseholderStore.LookupReplica(rightDesc.StartKey).GetLease()
if newLease.Sequence == oldLease.Sequence {
@@ -755,10 +698,10 @@ }
}
if !freezeStartTimestamp.LessEq(leaseStart) {
err = errors.New("freeze timestamp greater than the start time of the new lease")
return roachpb.ReplicationTarget{}, hlc.Timestamp{}, err
return roachpb.ReplicationTarget{}, err
}

return rightLeaseholder, leaseStart, nil
return rightLeaseholder, nil
}

// mergeFilter provides a method (SuspendMergeTrigger) that can be installed as
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/helpers_test.go
@@ -290,7 +290,7 @@ func (r *Replica) LastAssignedLeaseIndex() uint64 {

// MaxClosed returns the maximum closed timestamp known to the Replica.
func (r *Replica) MaxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
return r.maxClosed(ctx)
return r.maxClosed()
}

// SetQuotaPool allows the caller to set a replica's quota pool initialized to
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
@@ -1025,7 +1025,7 @@ func (r *Replica) State() kvserverpb.RangeInfo {
// NB: this acquires an RLock(). Reentrant RLocks are deadlock prone, so do
// this first before RLocking below. Performance of this extra lock
// acquisition is not a concern.
ri.ActiveClosedTimestamp, _ = r.maxClosed(context.Background())
ri.ActiveClosedTimestamp, _ = r.maxClosed()

// NB: numRangefeedRegistrations doesn't require Replica.mu to be locked.
// However, it does require coordination between multiple goroutines, so
26 changes: 16 additions & 10 deletions pkg/kv/kvserver/replica_follower_read.go
@@ -71,7 +71,7 @@ func (r *Replica) canServeFollowerRead(
ts.Forward(ba.Txn.MaxTimestamp)
}

maxClosed, _ := r.maxClosed(ctx)
maxClosed, _ := r.maxClosed()
canServeFollowerRead := ts.LessEq(maxClosed)
tsDiff := ts.GoTime().Sub(maxClosed.GoTime())
if !canServeFollowerRead {
@@ -107,18 +107,25 @@ func (r *Replica) canServeFollowerRead(
return nil
}

// maxClosed returns the maximum closed timestamp for this range.
// It is computed as the most recent of the known closed timestamp for the
// current lease holder for this range as tracked by the closed timestamp
// subsystem and the start time of the current lease. It is safe to use the
// start time of the current lease because leasePostApply bumps the timestamp
// cache forward to at least the new lease start time. Using this combination
// allows the closed timestamp mechanism to be robust to lease transfers.
// maxClosed returns the maximum closed timestamp for this range. It is computed
// as the most recent of the known closed timestamp for the current lease holder
// for this range as tracked by the closed timestamp subsystem.
//
// If the ok return value is false, the Replica is a member of a range which
// uses an expiration-based lease. Expiration-based leases do not support the
// closed timestamp subsystem. A zero-value timestamp will be returned if ok
// is false.
func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
//
// NB: It is unsafe to use the start time of the current lease as a lower bound
// on `maxClosed()` because, even though `leasePostApply()` bumps the timestamp
// cache forward to at least the new lease start time, the range may have been
// in the middle of a merge when the lease transfer occurred. Since
// `SubsumeRequest` is not a replicated command, other replicas do not have
// access to the subsumption time. We choose to be pessimistic here and allow
// closed timestamp state to be temporarily reset after a lease transfer, in
// order to avoid potentially serving follower reads past the subsumption time
// of a range.
func (r *Replica) maxClosed() (_ hlc.Timestamp, ok bool) {
r.mu.RLock()
lai := r.mu.state.LeaseAppliedIndex
lease := *r.mu.state.Lease
@@ -129,7 +136,6 @@ func (r *Replica) maxClosed(ctx context.Context) (_ hlc.Timestamp, ok bool) {
}
maxClosed := r.store.cfg.ClosedTimestamp.Provider.MaxClosed(
lease.Replica.NodeID, r.RangeID, ctpb.Epoch(lease.Epoch), ctpb.LAI(lai))
maxClosed.Forward(lease.Start)
maxClosed.Forward(initialMaxClosed)
return maxClosed, true
}
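
For orientation, a minimal illustration of the behavioral change above (plain int64s stand in for hlc.Timestamp, and the function names are invented for this sketch, not taken from the codebase): removing the `maxClosed.Forward(lease.Start)` call means the lease start time no longer acts as a lower bound on the returned closed timestamp.

```go
package main

import "fmt"

// maxTS returns the larger of two wall-clock "timestamps"; forwarding a
// timestamp by another amounts to taking this maximum.
func maxTS(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

// Before this change: the result was also forwarded by the lease start time,
// so a follower could treat everything below the new lease's start as closed.
func maxClosedBefore(providerClosed, leaseStart, initialMaxClosed int64) int64 {
	return maxTS(maxTS(providerClosed, leaseStart), initialMaxClosed)
}

// After this change: only the provider's closed timestamp and the replica's
// initialMaxClosed participate, so a lease transfer may temporarily reset the
// closed timestamp instead of (unsafely) advancing it past a possible
// subsumption time.
func maxClosedAfter(providerClosed, initialMaxClosed int64) int64 {
	return maxTS(providerClosed, initialMaxClosed)
}

func main() {
	providerClosed, leaseStart, initialMaxClosed := int64(100), int64(250), int64(120)
	fmt.Println("before:", maxClosedBefore(providerClosed, leaseStart, initialMaxClosed)) // 250
	fmt.Println("after: ", maxClosedAfter(providerClosed, initialMaxClosed))              // 120
}
```
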
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_rangefeed.go
@@ -595,7 +595,7 @@ func (r *Replica) handleClosedTimestampUpdateRaftMuLocked(ctx context.Context) {
}

// Determine what the maximum closed timestamp is for this replica.
closedTS, _ := r.maxClosed(ctx)
closedTS, _ := r.maxClosed()

// If the closed timestamp is sufficiently stale, signal that we want an
// update to the leaseholder so that it will eventually begin to progress
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store.go
@@ -2550,7 +2550,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
if wps, dur := rep.writeStats.avgQPS(); dur >= MinStatsDuration {
averageWritesPerSecond += wps
}
mc, ok := rep.maxClosed(ctx)
mc, ok := rep.maxClosed()
if ok && (minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS)) {
minMaxClosedTS = mc
}
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_split.go
@@ -149,7 +149,7 @@ func splitPreApply(
// the hazard and ensures that no replica on the RHS is created with an
// initialMaxClosed that could be violated by a proposal on the RHS's
// initial leaseholder. See #44878.
initialMaxClosed, _ := r.maxClosed(ctx)
initialMaxClosed, _ := r.maxClosed()
rightRepl.mu.Lock()
rightRepl.mu.initialMaxClosed = initialMaxClosed
rightRepl.mu.Unlock()
