diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go
index 437d5936c766..370b617b5dd7 100644
--- a/pkg/kv/kvserver/client_raft_test.go
+++ b/pkg/kv/kvserver/client_raft_test.go
@@ -4656,11 +4656,11 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) {
 
 type disablingClientStream struct {
 	grpc.ClientStream
-	disabled *atomic.Value
+	disabled func() bool
 }
 
 func (cs *disablingClientStream) SendMsg(m interface{}) error {
-	if cs.disabled.Load().(bool) {
+	if cs.disabled() {
 		return nil
 	}
 	return cs.ClientStream.SendMsg(m)
@@ -4681,12 +4681,16 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 	defer stopper.Stop(ctx)
 	// disabled controls whether to disrupt DefaultClass streams.
-	var disabled atomic.Value
+	var disabled, disabledSystem atomic.Value
 	disabled.Store(false)
+	disabledSystem.Store(false)
 	knobs := rpc.ContextTestingKnobs{
 		StreamClientInterceptor: func(target string, class rpc.ConnectionClass) grpc.StreamClientInterceptor {
-			if class == rpc.SystemClass {
-				return nil
+			disabledFunc := func() bool {
+				if class == rpc.SystemClass {
+					return disabledSystem.Load().(bool)
+				}
+				return disabled.Load().(bool)
 			}
 			return func(
 				ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
@@ -4697,7 +4701,7 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 					return nil, err
 				}
 				return &disablingClientStream{
-					disabled:     &disabled,
+					disabled:     disabledFunc,
 					ClientStream: cs,
 				}, nil
 			}
@@ -4731,8 +4735,13 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 	defer tc.Stopper().Stop(ctx)
 	// Make a key that's in the user data space.
 	keyA := append(keys.SystemSQLCodec.TablePrefix(100), 'a')
+	// Split off the range so that we can assign voters to it on all three nodes.
 	tc.SplitRangeOrFatal(t, keyA)
-	desc := tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
+	tc.AddVotersOrFatal(t, keyA, tc.Targets(1, 2)...)
+
+	// We need a key in the node liveness range that we can add voters to. This range can't be split.
+	keyLiveness := append(keys.NodeLivenessPrefix, 'a')
+	tc.AddVotersOrFatal(t, keys.NodeLivenessPrefix, tc.Targets(1, 2)...)
 	// Create a test function so that we can run the test both immediately after
 	// up-replicating and after a restart.
 	runTest := func(t *testing.T) {
@@ -4744,44 +4753,31 @@ func TestDefaultConnectionDisruptionDoesNotInterfereWithSystemTraffic(t *testing
 		// Wait for all nodes to catch up.
 		tc.WaitForValues(t, keyA, []int64{1, 1, 1})
 		disabled.Store(true)
-		repl1, err := store.GetReplica(desc.RangeID)
-		require.Nil(t, err)
-		// Transfer the lease on the range. Make sure there's no pending transfer.
-		var lease roachpb.Lease
-		testutils.SucceedsSoon(t, func() error {
-			var next roachpb.Lease
-			lease, next = repl1.GetLease()
-			if next != (roachpb.Lease{}) {
-				return fmt.Errorf("lease transfer in process, next = %v", next)
-			}
-			return nil
-		})
-		// Use SucceedsSoon to deal with rare stress cases where the lease
-		// transfer may fail.
-		testutils.SucceedsSoon(t, func() error {
-			return tc.TransferRangeLease(*repl1.Desc(), roachpb.ReplicationTarget{
-				NodeID:  roachpb.NodeID(lease.Replica.StoreID),
-				StoreID: lease.Replica.StoreID})
-		})
 		// Set a relatively short timeout so that this test doesn't take too long.
 		// We should always hit it.
 		withTimeout, cancel := context.WithTimeout(ctx, 20*time.Millisecond)
 		defer cancel()
-		err = db.Put(withTimeout, keyA, 2)
-		require.True(t, testutils.IsError(err, "deadline exceeded"), err)
-		// Transfer the lease back to demonstrate that the system range is still live.
-		testutils.SucceedsSoon(t, func() error {
-			return tc.TransferRangeLease(desc, roachpb.ReplicationTarget{
-				NodeID:  roachpb.NodeID(lease.Replica.StoreID),
-				StoreID: lease.Replica.StoreID})
-		})
+
+		// Write to the liveness range on the System class.
+		require.NoError(t, db.Put(withTimeout, keyLiveness, 2), "Expected success writing to liveness range")
+
+		// Write to the standard range on the default class.
+		require.ErrorIs(t, db.Put(withTimeout, keyA, 2), context.DeadlineExceeded,
+			"Expected timeout writing to key range")
+
+		// Write to the liveness range on the System class with the System class
+		// disabled to ensure the test is actually disrupting traffic.
+		disabledSystem.Store(true)
+		require.ErrorIs(t, db.Put(withTimeout, keyLiveness, 2),
+			context.DeadlineExceeded, "Expected timeout writing to liveness range")
+		disabledSystem.Store(false)
 		// Heal the partition. The previous proposal may now succeed, but it may
 		// have been canceled.
 		disabled.Store(false)
 		// Overwrite with a new value and ensure that it propagates.
-		require.NoError(t, db.Put(ctx, keyA, 3))
+		require.NoError(t, db.Put(ctx, keyA, 3), "Expected success after healed partition")
 		tc.WaitForValues(t, keyA, []int64{3, 3, 3})
 	}
 	t.Run("initial_run", runTest)
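
Note: for readers unfamiliar with the mechanism the patch builds on, the following is a minimal, self-contained sketch of a gRPC stream client interceptor that wraps the returned grpc.ClientStream and silently drops outgoing messages. It is illustrative only; the names droppingClientStream, newDroppingInterceptor, and shouldDrop are hypothetical and not part of the patch.

	package partition

	import (
		"context"

		"google.golang.org/grpc"
	)

	// droppingClientStream wraps a grpc.ClientStream and silently drops
	// outgoing messages while shouldDrop reports true, simulating a
	// one-way network partition for the streams it wraps.
	type droppingClientStream struct {
		grpc.ClientStream
		shouldDrop func() bool
	}

	func (cs *droppingClientStream) SendMsg(m interface{}) error {
		if cs.shouldDrop() {
			// Pretend the send succeeded; the message is lost.
			return nil
		}
		return cs.ClientStream.SendMsg(m)
	}

	// newDroppingInterceptor returns a grpc.StreamClientInterceptor that
	// wraps every stream opened through it in a droppingClientStream.
	func newDroppingInterceptor(shouldDrop func() bool) grpc.StreamClientInterceptor {
		return func(
			ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn,
			method string, streamer grpc.Streamer, opts ...grpc.CallOption,
		) (grpc.ClientStream, error) {
			cs, err := streamer(ctx, desc, cc, method, opts...)
			if err != nil {
				return nil, err
			}
			return &droppingClientStream{ClientStream: cs, shouldDrop: shouldDrop}, nil
		}
	}

Wiring shouldDrop to an atomic flag, as the patch does with disabled and disabledSystem, is what lets the test flip the simulated partition on and off per connection class without reopening streams.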