From 7c70b0614d8e7aafa6dfa9beddb05b88d1173553 Mon Sep 17 00:00:00 2001
From: Andrew Baptist
Date: Wed, 22 Jun 2022 14:49:24 -0400
Subject: [PATCH] kv: Fix test to correctly test the SystemClass

Previously this test had several issues, all of which are addressed here.
First, the system range it was writing to only had a single replica, so even
if it was written to correctly, a network partition wouldn't test anything.
Additionally, the operation it was attempting was a lease transfer, which is
not a write to the system range. The reason this appeared to work previously
is that the lease-change code exits early when the old and new leaseholders
are the same, so it never actually attempted the write at all. This change
addresses all of this by writing to a key in the liveness range, after first
replicating the liveness range to all nodes.

Release note: None
---
 pkg/kv/kvserver/client_raft_test.go | 66 ++++++++++++++---------------
 1 file changed, 31 insertions(+), 35 deletions(-)

diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 437d5936c766..370b617b5dd7 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -4656,11 +4656,11 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) {
 
 type disablingClientStream struct {
 	grpc.ClientStream
-	disabled *atomic.Value
+	disabled func() bool
 }
 
 func (cs *disablingClientStream) SendMsg(m interface{}) error {
-	if cs.disabled.Load().(bool) {
+	if cs.disabled() {
 		return nil
 	}
 	return cs.ClientStream.SendMsg(m)
@@ -4681,12 +4681,16 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 	defer stopper.Stop(ctx)
 	// disabled controls whether to disrupt DefaultClass streams.
-	var disabled atomic.Value
+	var disabled, disabledSystem atomic.Value
 	disabled.Store(false)
+	disabledSystem.Store(false)
 	knobs := rpc.ContextTestingKnobs{
 		StreamClientInterceptor: func(target string, class rpc.ConnectionClass) grpc.StreamClientInterceptor {
-			if class == rpc.SystemClass {
-				return nil
+			disabledFunc := func() bool {
+				if class == rpc.SystemClass {
+					return disabledSystem.Load().(bool)
+				}
+				return disabled.Load().(bool)
 			}
 			return func(
 				ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
@@ -4697,7 +4701,7 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 					return nil, err
 				}
 				return &disablingClientStream{
-					disabled:     &disabled,
+					disabled:     disabledFunc,
 					ClientStream: cs,
 				}, nil
 			}
@@ -4731,8 +4735,13 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 	defer tc.Stopper().Stop(ctx)
 	// Make a key that's in the user data space.
 	keyA := append(keys.SystemSQLCodec.TablePrefix(100), 'a')
+	// Split so that we can assign voters to the range and replicate it to all three nodes.
 	tc.SplitRangeOrFatal(t, keyA)
-	desc := tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
+	tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
+
+	// We need a key in the liveness range that we can add voters to. This range can't be split.
+	keyLiveness := append(keys.NodeLivenessPrefix, 'a')
+	tc.AddVotersOrFatal(t, keys.NodeLivenessPrefix, tc.Targets(1, 2)...)
 	// Create a test function so that we can run the test both immediately after
 	// up-replicating and after a restart.
 	runTest := func(t *testing.T) {
@@ -4744,44 +4753,31 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 		// Wait for all nodes to catch up.
 		tc.WaitForValues(t, keyA, []int64{1, 1, 1})
 		disabled.Store(true)
-		repl1, err := store.GetReplica(desc.RangeID)
-		require.Nil(t, err)
-		// Transfer the lease on the range. Make sure there's no pending transfer.
-		var lease roachpb.Lease
-		testutils.SucceedsSoon(t, func() error {
-			var next roachpb.Lease
-			lease, next = repl1.GetLease()
-			if next != (roachpb.Lease{}) {
-				return fmt.Errorf("lease transfer in process, next = %v", next)
-			}
-			return nil
-		})
-		// Use SucceedsSoon to deal with rare stress cases where the lease
-		// transfer may fail.
-		testutils.SucceedsSoon(t, func() error {
-			return tc.TransferRangeLease(*repl1.Desc(), roachpb.ReplicationTarget{
-				NodeID:  roachpb.NodeID(lease.Replica.StoreID),
-				StoreID: lease.Replica.StoreID})
-		})
 		// Set a relatively short timeout so that this test doesn't take too long.
 		// We should always hit it.
 		withTimeout, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 		defer cancel()
-		err = db.Put(withTimeout, keyA, 2)
-		require.True(t, testutils.IsError(err, "deadline exceeded"), err)
-		// Transfer the lease back to demonstrate that the system range is still live.
-		testutils.SucceedsSoon(t, func() error {
-			return tc.TransferRangeLease(desc, roachpb.ReplicationTarget{
-				NodeID:  roachpb.NodeID(lease.Replica.StoreID),
-				StoreID: lease.Replica.StoreID})
-		})
+
+		// Write to the liveness range on the System class.
+		require.NoError(t, db.Put(withTimeout, keyLiveness, 2), "Expected success writing to liveness range")
+
+		// Write to the standard range on the default class.
+		require.ErrorIs(t, db.Put(withTimeout, keyA, 2), context.DeadlineExceeded,
+			"Expected timeout writing to key range")
+
+		// Write to the liveness range on the System class with system disabled to
+		// ensure the test is actually working.
+		disabledSystem.Store(true)
+		require.ErrorIs(t, db.Put(withTimeout, keyLiveness, 2),
+			context.DeadlineExceeded, "Expected timeout writing to liveness range")
+		disabledSystem.Store(false)
 		// Heal the partition; the previous proposal may now succeed, but it may
 		// have been canceled.
 		disabled.Store(false)
 		// Overwrite with a new value and ensure that it propagates.
-		require.NoError(t, db.Put(ctx, keyA, 3))
+		require.NoError(t, db.Put(ctx, keyA, 3), "Expected success after healed partition")
 		tc.WaitForValues(t, keyA, []int64{3, 3, 3})
 	}
 	t.Run("initial_run", runTest)
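
For readers who want to see the interceptor pattern the test relies on in isolation, here is a minimal, self-contained Go sketch (not part of the patch): a grpc.StreamClientInterceptor wraps each outgoing client stream and turns SendMsg into a no-op while a per-class predicate reports an active partition, which is how disabled and disabledSystem gate traffic above. The names droppingClientStream and newPartitionInterceptor, the plain "system" class string, and the use of sync/atomic's Bool (Go 1.19+) are illustrative stand-ins for CockroachDB's rpc.ConnectionClass and atomic.Value plumbing, not its actual API.

package partitiontest

import (
	"context"
	"sync/atomic"

	"google.golang.org/grpc"
)

// droppingClientStream wraps a grpc.ClientStream and silently drops outgoing
// messages while dropped() returns true, simulating a network partition on
// this connection: the peer simply never receives the message.
type droppingClientStream struct {
	grpc.ClientStream
	dropped func() bool
}

func (cs *droppingClientStream) SendMsg(m interface{}) error {
	if cs.dropped() {
		return nil
	}
	return cs.ClientStream.SendMsg(m)
}

// newPartitionInterceptor builds a stream interceptor for a single connection
// class. Streams for the hypothetical "system" class consult partitionedSystem,
// all other streams consult partitioned, so the two classes can be disrupted
// independently.
func newPartitionInterceptor(
	class string, partitioned, partitionedSystem *atomic.Bool,
) grpc.StreamClientInterceptor {
	droppedFunc := func() bool {
		if class == "system" {
			return partitionedSystem.Load()
		}
		return partitioned.Load()
	}
	return func(
		ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
		method string, streamer grpc.Streamer, opts ...grpc.CallOption,
	) (grpc.ClientStream, error) {
		cs, err := streamer(ctx, desc, cc, method, opts...)
		if err != nil {
			return nil, err
		}
		return &droppingClientStream{ClientStream: cs, dropped: droppedFunc}, nil
	}
}

Outside of the test's rpc.ContextTestingKnobs hook, such an interceptor could be installed when dialing with grpc.WithStreamInterceptor(newPartitionInterceptor(...)); flipping the two atomics then partitions default-class and system-class traffic independently, mirroring the disabled.Store(true) and disabledSystem.Store(true) steps in the test.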