Skip to content

Commit

Permalink
kvserver: ensure follower reads correctly synchronize with splits
Browse files Browse the repository at this point in the history
This patch fixes a bug in how follower reads are synchronized with the
application of concurrent split operations. Reads on the leaseholder
are serialized with concurrent split operations by latching. However,
splits are simply applied on the follower, and as such, don't go through
latching like they do on the leaseholder. Previously, this could lead to
invalid reads in cases where the range split and the RHS was removed
after the range descriptor's bounds were checked but before a storage
snapshot was acquired.

This patch fixes this hazard by checking the range bounds after
acquiring the storage snapshot (in addition to before, like we used to
prior to this change). It also adds a couple of tests -- one exercising
the exact scenario described in the associated issue and another that
runs concurrent split/read operations without tightly controlling the
synchronization between them.

Fixes cockroachdb#67016

Release note (bug fix): fixes a rare bug where concurrent follower
read/split operations could lead to invalid read results.
  • Loading branch information
arulajmani committed Nov 8, 2022
1 parent 72b5f06 commit d69d3a9
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 3 deletions.
20 changes: 18 additions & 2 deletions pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1463,10 +1463,26 @@ func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
// iterator. An error indicates that the request's timestamp is below the
// Replica's GC threshold.
func (r *Replica) checkExecutionCanProceedAfterStorageSnapshot(
ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus,
ctx context.Context, ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus,
) error {
rSpan, err := keys.Range(ba.Requests)
if err != nil {
return err
}

r.mu.RLock()
defer r.mu.RUnlock()

// Ensure the request is entirely contained within the range's key bounds
// (even) after the storage engine has been pinned by the iterator. Given we
// perform this check before acquiring a storage snapshot, this is only ever
// meaningful in the context of follower reads. This is because latches on
// followers don't provide the synchronization with concurrent splits like
// they do on leaseholders.
if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil {
return err
}

// NB: For read-only requests, the GC threshold check is performed after the
// state of the storage engine has been pinned by the iterator. This is
// because GC requests don't acquire latches at the timestamp they are garbage
Expand All @@ -1492,7 +1508,7 @@ func (r *Replica) checkExecutionCanProceedRWOrAdmin(
if err != nil {
return kvserverpb.LeaseStatus{}, err
}
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ctx, ba, st); err != nil {
return kvserverpb.LeaseStatus{}, err
}
return st, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (r *Replica) executeReadOnlyBatch(
return nil, g, nil, roachpb.NewError(err)
}

if fn := r.store.TestingKnobs().PreStorageSnapshotButChecksCompleteInterceptor; fn != nil {
fn(r)
}

// Compute the transaction's local uncertainty limit using observed
// timestamps, which can help avoid uncertainty restarts.
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())
Expand Down Expand Up @@ -76,7 +80,7 @@ func (r *Replica) executeReadOnlyBatch(
}
defer rw.Close()

if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ctx, ba, st); err != nil {
return nil, g, nil, roachpb.NewError(err)
}
// TODO(nvanbenschoten): once all replicated intents are pulled into the
Expand Down
257 changes: 257 additions & 0 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14032,3 +14032,260 @@ func TestStoreTenantMetricsAndRateLimiterRefcount(t *testing.T) {
tc.store.tenantRateLimiters.Release(tenLimiter)
}()
}

// TestRangeSplitRacesWithRead performs a range split and repeatedly reads a
// span that straddles both the LHS and RHS post split. We ensure that as long
// as the read wins it observes the entire result set; if (once) the split wins
// the read should return the appropriate error. However, it should never be
// possible for the read to return without error and with a partial result (e.g.
// just the post split LHS). This would indicate a bug in the synchronization
// between read and split operations.
//
// We include subtests for both follower reads and reads served from the
// leaseholder.
func TestRangeSplitRacesWithRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "followerRead", func(t *testing.T, followerRead bool) {
ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
key := tc.ScratchRange(t)
key = key[:len(key):len(key)] // bound capacity, avoid aliasing
desc := tc.LookupRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

var stores []*Store
for i := 0; i < tc.NumServers(); i++ {
server := tc.Server(i)
store, err := server.GetStores().(*Stores).GetStore(server.GetFirstStoreID())
require.NoError(t, err)
stores = append(stores, store)
}
writer := stores[0]
reader := stores[0]
if followerRead {
reader = stores[1]
}

keyA := append(key, byte('a'))
keyB := append(key, byte('b'))
keyC := append(key, byte('c'))
keyD := append(key, byte('d'))
splitKey := keyB

now := tc.Server(0).Clock().Now()
ts1 := now.Add(1, 0)
h1 := roachpb.Header{RangeID: desc.RangeID, Timestamp: ts1}

val := []byte("value")
for _, k := range [][]byte{keyA, keyC} {
pArgs := putArgs(k, val)
_, pErr := kv.SendWrappedWith(ctx, writer, h1, &pArgs)
require.Nil(t, pErr)
}

// If the test wants to read from a follower, drop the closed timestamp
// duration and then wait until the follower can serve requests at ts1.
if followerRead {
_, err := tc.ServerConn(0).Exec(
`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
var ba roachpb.BatchRequest
ba.RangeID = desc.RangeID
ba.ReadConsistency = roachpb.INCONSISTENT
ba.Add(&roachpb.QueryResolvedTimestampRequest{
RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
})
br, pErr := reader.Send(ctx, ba)
require.Nil(t, pErr)
rts := br.Responses[0].GetQueryResolvedTimestamp().ResolvedTS
if rts.Less(ts1) {
return errors.Errorf("resolved timestamp %s < %s", rts, ts1)
}
return nil
})
}

read := func() {
scanArgs := scanArgs(keyA, keyD)
for {
resp, pErr := kv.SendWrappedWith(ctx, reader, h1, scanArgs)
if pErr == nil {
t.Logf("read won the race: %v", resp)
require.NotNil(t, resp)
res := resp.(*roachpb.ScanResponse).Rows
require.Equal(t, 2, len(res))
require.Equal(t, keyA, res[0].Key)
require.Equal(t, keyC, res[1].Key)
} else {
t.Logf("read lost the race: %v", pErr)
mismatchErr := &roachpb.RangeKeyMismatchError{}
require.ErrorAs(t, pErr.GoError(), &mismatchErr)
return
}
}
}

split := func() {
splitArgs := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: splitKey,
},
SplitKey: splitKey,
}
_, pErr := kv.SendWrappedWith(ctx, writer, h1, splitArgs)
require.Nil(t, pErr, "err: %v", pErr.GoError())
rhsDesc := tc.LookupRangeOrFatal(t, splitKey.Next())
// Remove the RHS from the reader.
if followerRead {
tc.RemoveVotersOrFatal(t, roachpb.Key(rhsDesc.StartKey), tc.Target(1))
} else {
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1))
tc.RemoveVotersOrFatal(t, roachpb.Key(rhsDesc.StartKey), tc.Target(0))
}
}

var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); split() }()
go func() { defer wg.Done(); read() }()
wg.Wait()
})
}

// TestRangeSplitAndRHSRemovalRacesWithFollowerReads acts as a regression test
// for the hazard described in
// https://github.com/cockroachdb/cockroach/issues/67016.
//
// Specifically, the test sets up the following scenario:
// - Follower read begins and checks the request is contained entirely within
// the range's bounds. A storage snapshot isn't acquired just quite yet.
// - The range is split such that the follower read is no longer within the post
// split range; the post-split RHS replica is removed from the node serving the
// follower read.
// - Follower read resumes. The expectation is for the follower read to fail
// with a RangeKeyMismatchError.
func TestRangeSplitAndRHSRemovalRacesWithFollowerRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
startSplit := make(chan struct{})
unblockRead := make(chan struct{})
scratchRangeID := roachpb.RangeID(-1)
tc := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {
Knobs: base.TestingKnobs{
Store: &StoreTestingKnobs{
PreStorageSnapshotButChecksCompleteInterceptor: func(r *Replica) {
if r.GetRangeID() != scratchRangeID {
return
}
close(startSplit)
<-unblockRead
},
},
},
},
},
})
defer tc.Stopper().Stop(ctx)
key := tc.ScratchRange(t)
key = key[:len(key):len(key)] // bound capacity, avoid aliasing
desc := tc.LookupRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

var stores []*Store
for i := 0; i < tc.NumServers(); i++ {
server := tc.Server(i)
store, err := server.GetStores().(*Stores).GetStore(server.GetFirstStoreID())
require.NoError(t, err)
stores = append(stores, store)
}
writer := stores[0]
reader := stores[1]

keyA := append(key, byte('a'))
keyB := append(key, byte('b'))
keyC := append(key, byte('c'))
keyD := append(key, byte('d'))
splitKey := keyB

now := tc.Server(0).Clock().Now()
ts1 := now.Add(1, 0)
h1 := roachpb.Header{RangeID: desc.RangeID, Timestamp: ts1}

val := []byte("value")
for _, k := range [][]byte{keyA, keyC} {
pArgs := putArgs(k, val)
_, pErr := kv.SendWrappedWith(ctx, writer, h1, &pArgs)
require.Nil(t, pErr)
}

// Drop the closed timestamp duration and wait until the follower can serve
// requests at ts1.
_, err := tc.ServerConn(0).Exec(
`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
var ba roachpb.BatchRequest
ba.RangeID = desc.RangeID
ba.ReadConsistency = roachpb.INCONSISTENT
ba.Add(&roachpb.QueryResolvedTimestampRequest{
RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
})
br, pErr := reader.Send(ctx, ba)
require.Nil(t, pErr)
rts := br.Responses[0].GetQueryResolvedTimestamp().ResolvedTS
if rts.Less(ts1) {
return errors.Errorf("resolved timestamp %s < %s", rts, ts1)
}
return nil
})

// Set this thing after we've checked for the resolved timestamp, as we don't
// want the QueryResolvedTimestampRequest to block.
scratchRangeID = desc.RangeID

read := func() {
scanArgs := scanArgs(keyA, keyD)
_, pErr := kv.SendWrappedWith(ctx, reader, h1, scanArgs)
require.NotNil(t, pErr)
mismatchErr := &roachpb.RangeKeyMismatchError{}
require.ErrorAs(t, pErr.GoError(), &mismatchErr)
}

split := func() {
select {
case <-startSplit:
case <-time.After(5 * time.Second):
panic("timed out waiting for read to block")
}
splitArgs := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: splitKey,
},
SplitKey: splitKey,
}
_, pErr := kv.SendWrappedWith(ctx, writer, h1, splitArgs)
require.Nil(t, pErr, "err: %v", pErr.GoError())
rhsDesc := tc.LookupRangeOrFatal(t, splitKey.Next())
tc.RemoveVotersOrFatal(t, roachpb.Key(rhsDesc.StartKey), tc.Target(1))
close(unblockRead)
}

var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); split() }()
go func() { defer wg.Done(); read() }()
wg.Wait()
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,11 @@ type StoreTestingKnobs struct {
// MVCCGCQueueLeaseCheckInterceptor intercepts calls to Replica.LeaseStatusAt when
// making high priority replica scans.
MVCCGCQueueLeaseCheckInterceptor func(ctx context.Context, replica *Replica, now hlc.ClockTimestamp) bool

// PreStorageSnapshotButChecksCompleteInterceptor intercepts calls to
// Replica.executeReadOnlyBatch after checks have successfully determined
// execution can proceed but a storage snapshot has not been acquired.
PreStorageSnapshotButChecksCompleteInterceptor func(replica *Replica)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
Expand Down

0 comments on commit d69d3a9

Please sign in to comment.