kv: Fix test to correctly test the SystemClass
Previously this test had several issues, all of which are addressed here.
First, the system range it was writing to only had a single replica, so
even if it was correctly written to, a network partition wouldn't test
anything. Additionally, what it was attempting to write was a lease
change, which is not in the system range. The reason this worked
previously is that there was an early exit from the lease change code
if the old and new leaseholders were the same, so it didn't actually
attempt to write the lease at all.

This change addresses all of this by first replicating the liveness
range to all nodes and then writing to a key in the liveness range.

Release note: None
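
To make the new behavior easier to follow without reassembling it from the hunks below, here is a condensed sketch of how the per-class kill switch ends up wired after this change. It is not the verbatim test code: the rpc and grpc types are the ones that appear in the diff, while the package clause, the import paths, and the partitionKnobs helper name are hypothetical scaffolding for illustration.

package kvserver_test

import (
	"context"
	"sync/atomic"

	"github.com/cockroachdb/cockroach/pkg/rpc"
	"google.golang.org/grpc"
)

// disablingClientStream wraps a gRPC client stream and silently drops outgoing
// messages while its disabled callback reports true, simulating a network
// partition on that stream's connection class.
type disablingClientStream struct {
	grpc.ClientStream
	disabled func() bool
}

func (cs *disablingClientStream) SendMsg(m interface{}) error {
	if cs.disabled() {
		return nil
	}
	return cs.ClientStream.SendMsg(m)
}

// partitionKnobs returns testing knobs plus two flags: one that partitions
// DefaultClass streams and one that partitions SystemClass streams.
func partitionKnobs() (knobs rpc.ContextTestingKnobs, disabled, disabledSystem *atomic.Value) {
	disabled, disabledSystem = new(atomic.Value), new(atomic.Value)
	disabled.Store(false)
	disabledSystem.Store(false)
	knobs.StreamClientInterceptor = func(target string, class rpc.ConnectionClass) grpc.StreamClientInterceptor {
		// Each connection class gets its own flag, so a test can cut
		// DefaultClass traffic while leaving SystemClass traffic alone.
		disabledFunc := func() bool {
			if class == rpc.SystemClass {
				return disabledSystem.Load().(bool)
			}
			return disabled.Load().(bool)
		}
		return func(
			ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
			method string, streamer grpc.Streamer, opts ...grpc.CallOption,
		) (grpc.ClientStream, error) {
			cs, err := streamer(ctx, desc, cc, method, opts...)
			if err != nil {
				return nil, err
			}
			return &disablingClientStream{disabled: disabledFunc, ClientStream: cs}, nil
		}
	}
	return knobs, disabled, disabledSystem
}

Compared to the old knob, which simply returned nil for SystemClass streams, routing the class check through a closure is what lets the test partition the two classes independently and verify that only SystemClass traffic survives.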
andrewbaptist committed Jun 23, 2022
1 parent 76cbd88 commit 7c70b06
Showing 1 changed file with 31 additions and 35 deletions.
pkg/kv/kvserver/client_raft_test.go (66 changes: 31 additions & 35 deletions)
@@ -4656,11 +4656,11 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) {
 
 type disablingClientStream struct {
 	grpc.ClientStream
-	disabled *atomic.Value
+	disabled func() bool
 }
 
 func (cs *disablingClientStream) SendMsg(m interface{}) error {
-	if cs.disabled.Load().(bool) {
+	if cs.disabled() {
 		return nil
 	}
 	return cs.ClientStream.SendMsg(m)
@@ -4681,12 +4681,16 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing.T) {
 	defer stopper.Stop(ctx)
 
 	// disabled controls whether to disrupt DefaultClass streams.
-	var disabled atomic.Value
+	var disabled, disabledSystem atomic.Value
 	disabled.Store(false)
+	disabledSystem.Store(false)
 	knobs := rpc.ContextTestingKnobs{
 		StreamClientInterceptor: func(target string, class rpc.ConnectionClass) grpc.StreamClientInterceptor {
-			if class == rpc.SystemClass {
-				return nil
+			disabledFunc := func() bool {
+				if class == rpc.SystemClass {
+					return disabledSystem.Load().(bool)
+				}
+				return disabled.Load().(bool)
 			}
 			return func(
 				ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
@@ -4697,7 +4701,7 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing.T) {
 					return nil, err
 				}
 				return &disablingClientStream{
-					disabled: &disabled,
+					disabled: disabledFunc,
 					ClientStream: cs,
 				}, nil
 			}
@@ -4731,8 +4735,13 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing.T) {
 	defer tc.Stopper().Stop(ctx)
 	// Make a key that's in the user data space.
 	keyA := append(keys.SystemSQLCodec.TablePrefix(100), 'a')
+	// Split so that we can assign voters to the range and assign all three.
 	tc.SplitRangeOrFatal(t, keyA)
-	desc := tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
+	tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
+
+	// We need a key in the meta range that we can add voters to. This range can't be split.
+	keyLiveness := append(keys.NodeLivenessPrefix, 'a')
+	tc.AddVotersOrFatal(t, keys.NodeLivenessPrefix, tc.Targets(1, 2)...)
 	// Create a test function so that we can run the test both immediately after
 	// up-replicating and after a restart.
 	runTest := func(t *testing.T) {
@@ -4744,44 +4753,31 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing.T) {
 		// Wait for all nodes to catch up.
 		tc.WaitForValues(t, keyA, []int64{1, 1, 1})
 		disabled.Store(true)
-		repl1, err := store.GetReplica(desc.RangeID)
-		require.Nil(t, err)
-		// Transfer the lease on the range. Make sure there's no pending transfer.
-		var lease roachpb.Lease
-		testutils.SucceedsSoon(t, func() error {
-			var next roachpb.Lease
-			lease, next = repl1.GetLease()
-			if next != (roachpb.Lease{}) {
-				return fmt.Errorf("lease transfer in process, next = %v", next)
-			}
-			return nil
-		})
 
-		// Use SucceedsSoon to deal with rare stress cases where the lease
-		// transfer may fail.
-		testutils.SucceedsSoon(t, func() error {
-			return tc.TransferRangeLease(*repl1.Desc(), roachpb.ReplicationTarget{
-				NodeID: roachpb.NodeID(lease.Replica.StoreID),
-				StoreID: lease.Replica.StoreID})
-		})
 		// Set a relatively short timeout so that this test doesn't take too long.
 		// We should always hit it.
 		withTimeout, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 		defer cancel()
-		err = db.Put(withTimeout, keyA, 2)
-		require.True(t, testutils.IsError(err, "deadline exceeded"), err)
-		// Transfer the lease back to demonstrate that the system range is still live.
-		testutils.SucceedsSoon(t, func() error {
-			return tc.TransferRangeLease(desc, roachpb.ReplicationTarget{
-				NodeID: roachpb.NodeID(lease.Replica.StoreID),
-				StoreID: lease.Replica.StoreID})
-		})
 
+		// Write to the liveness range on the System class.
+		require.NoError(t, db.Put(withTimeout, keyLiveness, 2), "Expected success writing to liveness range")
+
+		// Write to the standard range on the default class.
+		require.ErrorIs(t, db.Put(withTimeout, keyA, 2), context.DeadlineExceeded,
+			"Expected timeout writing to key range")
+
+		// Write to the liveness range on the System class with system disabled to
+		// ensure the test is actually working.
+		disabledSystem.Store(true)
+		require.ErrorIs(t, db.Put(withTimeout, keyLiveness, 2),
+			context.DeadlineExceeded, "Expected timeout writing to liveness range")
+		disabledSystem.Store(false)
+
 		// Heal the partition, the previous proposal may now succeed but it may have
 		// have been canceled.
 		disabled.Store(false)
 		// Overwrite with a new value and ensure that it propagates.
-		require.NoError(t, db.Put(ctx, keyA, 3))
+		require.NoError(t, db.Put(ctx, keyA, 3), "Expected success after healed partition")
 		tc.WaitForValues(t, keyA, []int64{3, 3, 3})
 	}
 	t.Run("initial_run", runTest)