Merge #89886 #90456
89886: kvserver: ensure follower reads correctly synchronize with splits r=arulajmani a=arulajmani

This patch fixes a bug in how follower reads are synchronized with the application of concurrent split operations. Reads on the leaseholder are serialized with concurrent split operations by latching. However, splits are simply applied on the follower, and as such, don't go through latching like they do on the leaseholder. Previously, this could lead to invalid reads in cases where the range split and the RHS was removed after the range descriptor's bounds were checked but before a storage snapshot was acquired.

This patch fixes this hazard by checking the range bounds after acquiring the storage snapshot, in addition to before, as was done prior to this change. It also adds a couple of tests -- one exercising the exact scenario described in the associated issue, and another that runs concurrent split/read operations without tightly controlling the synchronization between them.
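
To make the ordering concrete, here is a minimal, self-contained sketch of the idea. The types and helpers below (span, replica, checkSpanInRange, pinSnapshot) are invented for illustration and are not the kvserver APIs; the real change lives in checkExecutionCanProceedAfterStorageSnapshot in the diff below.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// errRangeKeyMismatch stands in for roachpb.RangeKeyMismatchError.
var errRangeKeyMismatch = errors.New("range key mismatch")

type span struct{ start, end string }

// replica is a toy model of a follower replica: its descriptor bounds can
// shrink when a split applies, independently of any in-flight reads.
type replica struct {
	mu     sync.RWMutex
	bounds span
	data   map[string]string
}

func (r *replica) checkSpanInRange(s span) error {
	r.mu.RLock()
	defer r.mu.RUnlock()
	if s.start < r.bounds.start || s.end > r.bounds.end {
		return errRangeKeyMismatch
	}
	return nil
}

// read models the read path: bounds are checked both before and after the
// storage snapshot is pinned, so a split that applies in between is caught
// rather than silently producing a partial result.
func (r *replica) read(s span) (map[string]string, error) {
	if err := r.checkSpanInRange(s); err != nil { // pre-snapshot check
		return nil, err
	}
	// A concurrent split (and RHS removal) can apply here on a follower,
	// because follower reads are not serialized with splits by latching.
	snap := r.pinSnapshot()
	if err := r.checkSpanInRange(s); err != nil { // post-snapshot re-check (the fix)
		return nil, err
	}
	return snap, nil
}

// pinSnapshot models pinning the storage engine state for the read.
func (r *replica) pinSnapshot() map[string]string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make(map[string]string, len(r.data))
	for k, v := range r.data {
		out[k] = v
	}
	return out
}

func main() {
	r := &replica{bounds: span{"a", "z"}, data: map[string]string{"a": "1", "c": "2"}}
	res, err := r.read(span{"a", "d"})
	fmt.Println(res, err)
}

The important property is that the same bounds check brackets the snapshot acquisition, so a split that lands in the window between the two is surfaced as an error rather than a partial result.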

Fixes #67016

Release note (bug fix): fixes a rare bug where concurrent follower read/split operations could lead to invalid read results.

90456: sqlsmith: do not error when UDTs have no members r=mgartner a=mgartner

Fixes #90433
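
As context, here is a hypothetical reproduction of the kind of schema object involved (not part of this change): an enum user-defined type with zero members is valid SQL, and sqlsmith's schema loading previously bailed with an error when it encountered one. The connection string and type name below are placeholders.

package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver; CockroachDB speaks the same protocol.
)

func main() {
	// Placeholder connection string for a local single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// An enum type with no members is legal; sqlsmith previously errored out
	// while loading a schema that contained one.
	if _, err := db.Exec(`CREATE TYPE empty_enum AS ENUM ()`); err != nil {
		log.Fatal(err)
	}
}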

Release note: None

Co-authored-by: Arul Ajmani <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
3 people committed Oct 24, 2022
3 parents 8be4170 + e1f85b6 + 48bb457 commit cc4f15d
Showing 5 changed files with 304 additions and 27 deletions.
43 changes: 19 additions & 24 deletions pkg/internal/sqlsmith/schema.go
@@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree/treebin"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/errors"
"github.com/lib/pq/oid"
)

@@ -222,29 +221,25 @@ FROM
members = append(members, string(tree.MustBeDString(d)))
}
}
// Try to construct type information from the resulting row.
switch {
case len(members) > 0:
typ := types.MakeEnum(catid.TypeIDToOID(descpb.ID(id)), 0 /* arrayTypeID */)
typ.TypeMeta = types.UserDefinedTypeMetadata{
Name: &types.UserDefinedTypeName{
Schema: scName,
Name: name,
},
EnumData: &types.EnumMetadata{
LogicalRepresentations: members,
// The physical representations don't matter in this case, but the
// enum related code in tree expects that the length of
// PhysicalRepresentations is equal to the length of LogicalRepresentations.
PhysicalRepresentations: make([][]byte, len(members)),
IsMemberReadOnly: make([]bool, len(members)),
},
}
key := tree.MakeSchemaQualifiedTypeName(scName, name)
udtMapping[key] = typ
default:
return nil, errors.Newf("unsupported SQLSmith type kind: %s", string(membersRaw))
}
// Construct type information from the resulting row. Note that the UDT
// may have no members (e.g., `CREATE TYPE t AS ENUM ()`).
typ := types.MakeEnum(catid.TypeIDToOID(descpb.ID(id)), 0 /* arrayTypeID */)
typ.TypeMeta = types.UserDefinedTypeMetadata{
Name: &types.UserDefinedTypeName{
Schema: scName,
Name: name,
},
EnumData: &types.EnumMetadata{
LogicalRepresentations: members,
// The physical representations don't matter in this case, but the
// enum related code in tree expects that the length of
// PhysicalRepresentations is equal to the length of LogicalRepresentations.
PhysicalRepresentations: make([][]byte, len(members)),
IsMemberReadOnly: make([]bool, len(members)),
},
}
key := tree.MakeSchemaQualifiedTypeName(scName, name)
udtMapping[key] = typ
}
var udts []*types.T
for _, t := range udtMapping {
20 changes: 18 additions & 2 deletions pkg/kv/kvserver/replica.go
@@ -1465,10 +1465,26 @@ func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
// iterator. An error indicates that the request's timestamp is below the
// Replica's GC threshold.
func (r *Replica) checkExecutionCanProceedAfterStorageSnapshot(
ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus,
ctx context.Context, ba *roachpb.BatchRequest, st kvserverpb.LeaseStatus,
) error {
rSpan, err := keys.Range(ba.Requests)
if err != nil {
return err
}

r.mu.RLock()
defer r.mu.RUnlock()

// Ensure the request is entirely contained within the range's key bounds
// (even) after the storage engine has been pinned by the iterator. Given we
// also perform this check before acquiring the storage snapshot, this is only
// ever meaningful in the context of follower reads. This is because latches on
// followers don't provide the synchronization with concurrent splits like they
// do on leaseholders.
if err := r.checkSpanInRangeRLocked(ctx, rSpan); err != nil {
return err
}

// NB: For read-only requests, the GC threshold check is performed after the
// state of the storage engine has been pinned by the iterator. This is
// because GC requests don't acquire latches at the timestamp they are garbage
@@ -1494,7 +1510,7 @@ func (r *Replica) checkExecutionCanProceedRWOrAdmin(
if err != nil {
return kvserverpb.LeaseStatus{}, err
}
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ctx, ba, st); err != nil {
return kvserverpb.LeaseStatus{}, err
}
return st, nil
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/replica_read.go
@@ -48,6 +48,10 @@ func (r *Replica) executeReadOnlyBatch(
return nil, g, nil, roachpb.NewError(err)
}

if fn := r.store.TestingKnobs().PreStorageSnapshotButChecksCompleteInterceptor; fn != nil {
fn(r)
}

// Compute the transaction's local uncertainty limit using observed
// timestamps, which can help avoid uncertainty restarts.
ui := uncertainty.ComputeInterval(&ba.Header, st, r.Clock().MaxOffset())
@@ -76,7 +80,7 @@ func (r *Replica) executeReadOnlyBatch(
}
defer rw.Close()

if err := r.checkExecutionCanProceedAfterStorageSnapshot(ba, st); err != nil {
if err := r.checkExecutionCanProceedAfterStorageSnapshot(ctx, ba, st); err != nil {
return nil, g, nil, roachpb.NewError(err)
}
// TODO(nvanbenschoten): once all replicated intents are pulled into the
257 changes: 257 additions & 0 deletions pkg/kv/kvserver/replica_test.go
@@ -13810,3 +13810,260 @@ func TestStoreTenantMetricsAndRateLimiterRefcount(t *testing.T) {
tc.store.tenantRateLimiters.Release(tenLimiter)
}()
}

// TestRangeSplitRacesWithRead performs a range split and repeatedly reads a
// span that straddles both the LHS and RHS post split. We ensure that as long
// as the read wins the race it observes the entire result set; once the split
// wins, the read should return the appropriate error. However, it should never
// be possible for the read to return without error and with a partial result
// (e.g. just the post-split LHS), which would indicate a bug in the
// synchronization between read and split operations.
//
// We include subtests for both follower reads and reads served from the
// leaseholder.
func TestRangeSplitRacesWithRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testutils.RunTrueAndFalse(t, "followerRead", func(t *testing.T, followerRead bool) {
ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)
key := tc.ScratchRange(t)
key = key[:len(key):len(key)] // bound capacity, avoid aliasing
desc := tc.LookupRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

var stores []*Store
for i := 0; i < tc.NumServers(); i++ {
server := tc.Server(i)
store, err := server.GetStores().(*Stores).GetStore(server.GetFirstStoreID())
require.NoError(t, err)
stores = append(stores, store)
}
writer := stores[0]
reader := stores[0]
if followerRead {
reader = stores[1]
}

keyA := append(key, byte('a'))
keyB := append(key, byte('b'))
keyC := append(key, byte('c'))
keyD := append(key, byte('d'))
splitKey := keyB

now := tc.Server(0).Clock().Now()
ts1 := now.Add(1, 0)
h1 := roachpb.Header{RangeID: desc.RangeID, Timestamp: ts1}

val := []byte("value")
for _, k := range [][]byte{keyA, keyC} {
pArgs := putArgs(k, val)
_, pErr := kv.SendWrappedWith(ctx, writer, h1, &pArgs)
require.Nil(t, pErr)
}

// If the test wants to read from a follower, drop the closed timestamp
// duration and then wait until the follower can serve requests at ts1.
if followerRead {
_, err := tc.ServerConn(0).Exec(
`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
var ba roachpb.BatchRequest
ba.RangeID = desc.RangeID
ba.ReadConsistency = roachpb.INCONSISTENT
ba.Add(&roachpb.QueryResolvedTimestampRequest{
RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
})
br, pErr := reader.Send(ctx, ba)
require.Nil(t, pErr)
rts := br.Responses[0].GetQueryResolvedTimestamp().ResolvedTS
if rts.Less(ts1) {
return errors.Errorf("resolved timestamp %s < %s", rts, ts1)
}
return nil
})
}

read := func() {
scanArgs := scanArgs(keyA, keyD)
for {
resp, pErr := kv.SendWrappedWith(ctx, reader, h1, scanArgs)
if pErr == nil {
t.Logf("read won the race: %v", resp)
require.NotNil(t, resp)
res := resp.(*roachpb.ScanResponse).Rows
require.Equal(t, 2, len(res))
require.Equal(t, keyA, res[0].Key)
require.Equal(t, keyC, res[1].Key)
} else {
t.Logf("read lost the race: %v", pErr)
mismatchErr := &roachpb.RangeKeyMismatchError{}
require.ErrorAs(t, pErr.GoError(), &mismatchErr)
return
}
}
}

split := func() {
splitArgs := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: splitKey,
},
SplitKey: splitKey,
}
_, pErr := kv.SendWrappedWith(ctx, writer, h1, splitArgs)
require.Nil(t, pErr, "err: %v", pErr.GoError())
rhsDesc := tc.LookupRangeOrFatal(t, splitKey.Next())
// Remove the RHS from the reader.
if followerRead {
tc.RemoveVotersOrFatal(t, roachpb.Key(rhsDesc.StartKey), tc.Target(1))
} else {
tc.TransferRangeLeaseOrFatal(t, rhsDesc, tc.Target(1))
tc.RemoveVotersOrFatal(t, roachpb.Key(rhsDesc.StartKey), tc.Target(0))
}
}

var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); split() }()
go func() { defer wg.Done(); read() }()
wg.Wait()
})
}

// TestRangeSplitAndRHSRemovalRacesWithFollowerReads acts as a regression test
// for the hazard described in
// https://github.com/cockroachdb/cockroach/issues/67016.
//
// Specifically, the test sets up the following scenario:
// - Follower read begins and checks that the request is contained entirely
// within the range's bounds. A storage snapshot isn't acquired just yet.
// - The range is split such that the read's span is no longer within the
// post-split range; the post-split RHS replica is removed from the node
// serving the follower read.
// - Follower read resumes. The expectation is for the follower read to fail
// with a RangeKeyMismatchError.
func TestRangeSplitAndRHSRemovalRacesWithFollowerRead(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
startSplit := make(chan struct{})
unblockRead := make(chan struct{})
scratchRangeID := roachpb.RangeID(-1)
tc := serverutils.StartNewTestCluster(t, 2, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {
Knobs: base.TestingKnobs{
Store: &StoreTestingKnobs{
PreStorageSnapshotButChecksCompleteInterceptor: func(r *Replica) {
if r.GetRangeID() != scratchRangeID {
return
}
close(startSplit)
<-unblockRead
},
},
},
},
},
})
defer tc.Stopper().Stop(ctx)
key := tc.ScratchRange(t)
key = key[:len(key):len(key)] // bound capacity, avoid aliasing
desc := tc.LookupRangeOrFatal(t, key)
tc.AddVotersOrFatal(t, key, tc.Target(1))

var stores []*Store
for i := 0; i < tc.NumServers(); i++ {
server := tc.Server(i)
store, err := server.GetStores().(*Stores).GetStore(server.GetFirstStoreID())
require.NoError(t, err)
stores = append(stores, store)
}
writer := stores[0]
reader := stores[1]

keyA := append(key, byte('a'))
keyB := append(key, byte('b'))
keyC := append(key, byte('c'))
keyD := append(key, byte('d'))
splitKey := keyB

now := tc.Server(0).Clock().Now()
ts1 := now.Add(1, 0)
h1 := roachpb.Header{RangeID: desc.RangeID, Timestamp: ts1}

val := []byte("value")
for _, k := range [][]byte{keyA, keyC} {
pArgs := putArgs(k, val)
_, pErr := kv.SendWrappedWith(ctx, writer, h1, &pArgs)
require.Nil(t, pErr)
}

// Drop the closed timestamp duration and wait until the follower can serve
// requests at ts1.
_, err := tc.ServerConn(0).Exec(
`SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`)
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
var ba roachpb.BatchRequest
ba.RangeID = desc.RangeID
ba.ReadConsistency = roachpb.INCONSISTENT
ba.Add(&roachpb.QueryResolvedTimestampRequest{
RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
})
br, pErr := reader.Send(ctx, ba)
require.Nil(t, pErr)
rts := br.Responses[0].GetQueryResolvedTimestamp().ResolvedTS
if rts.Less(ts1) {
return errors.Errorf("resolved timestamp %s < %s", rts, ts1)
}
return nil
})

// Set the scratch range ID only after we've checked the resolved timestamp, as
// we don't want the QueryResolvedTimestampRequest above to block in the
// interceptor.
scratchRangeID = desc.RangeID

read := func() {
scanArgs := scanArgs(keyA, keyD)
_, pErr := kv.SendWrappedWith(ctx, reader, h1, scanArgs)
require.NotNil(t, pErr)
mismatchErr := &roachpb.RangeKeyMismatchError{}
require.ErrorAs(t, pErr.GoError(), &mismatchErr)
}

split := func() {
select {
case <-startSplit:
case <-time.After(5 * time.Second):
panic("timed out waiting for read to block")
}
splitArgs := &roachpb.AdminSplitRequest{
RequestHeader: roachpb.RequestHeader{
Key: splitKey,
},
SplitKey: splitKey,
}
_, pErr := kv.SendWrappedWith(ctx, writer, h1, splitArgs)
require.Nil(t, pErr, "err: %v", pErr.GoError())
rhsDesc := tc.LookupRangeOrFatal(t, splitKey.Next())
tc.RemoveVotersOrFatal(t, roachpb.Key(rhsDesc.StartKey), tc.Target(1))
close(unblockRead)
}

var wg sync.WaitGroup
wg.Add(2)
go func() { defer wg.Done(); split() }()
go func() { defer wg.Done(); read() }()
wg.Wait()
}
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
@@ -443,6 +443,11 @@ type StoreTestingKnobs struct {
// 1 byte, resulting in each key having its own block. This can provoke bugs
// in time-bound iterators.
SmallEngineBlocks bool

// PreStorageSnapshotButChecksCompleteInterceptor intercepts calls to
// Replica.executeReadOnlyBatch after checks have successfully determined that
// execution can proceed but before a storage snapshot has been acquired.
PreStorageSnapshotButChecksCompleteInterceptor func(replica *Replica)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
