Skip to content

Commit

Permalink
kvserver: add TestFlowControlSendQueueRangeRelocate test
Browse files Browse the repository at this point in the history
Add a flow control integration test
`TestFlowControlSendQueueRangeRelocate`. The test has 6 variations,
either transferring or not transferring the lease on-top:

```
We use three relocate variations (*=lh,^=send_queue):
- [n1*,n2 ,n3^,n4 ,n5] -> [n2 ,n3^,n4 ,n5 ,n6*] (transfer_lease)
  - The leader and leaseholder is relocated.
- [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n4 ,n5 ,n6 ]
  - The replica with a send queue is relocated.
- [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n3^,n4 ,n6 ]
  - The replica without a send queue is relocated.
```

Part of: cockroachdb#132614
Release note: None
  • Loading branch information
kvoli committed Nov 26, 2024
1 parent e8c843d commit 07ce9fd
Show file tree
Hide file tree
Showing 7 changed files with 1,210 additions and 14 deletions.
295 changes: 281 additions & 14 deletions pkg/kv/kvserver/flow_control_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4536,13 +4536,6 @@ func TestFlowControlSendQueue(t *testing.T) {
}
}

mkStream := func(serverIdx int) kvflowcontrol.Stream {
return kvflowcontrol.Stream{
StoreID: roachpb.StoreID(serverIdx + 1),
TenantID: roachpb.SystemTenantID,
}
}

settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
Expand Down Expand Up @@ -4703,7 +4696,7 @@ func TestFlowControlSendQueue(t *testing.T) {
setTokenReturnEnabled(true /* enabled */, 0, 1)
// Wait for token return on n1, n2. We should only be tracking the tokens for
// n3 now.
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4MiB */, 0 /* serverIdx */)
h.comment(`-- Observe the total tracked tokens per-stream on n1.`)
h.query(n1, `
Expand All @@ -4723,7 +4716,7 @@ func TestFlowControlSendQueue(t *testing.T) {
// NB: The write won't be tracked because the quorum [n1,n2] have tokens for
// eval.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 5<<20 /* 5 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* 1MiB expSize */, 0 /* serverIdx */)
h.comment(`
-- The send queue metrics from n1 should reflect the 1 MiB write being queued
Expand All @@ -4739,7 +4732,7 @@ func TestFlowControlSendQueue(t *testing.T) {
h.waitForConnectedStreams(ctx, desc.RangeID, 2, 0 /* serverIdx */)
// There should also be 5 MiB of tracked tokens for n1->n3, 4 + 1 MiB.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 5<<20 /* 5 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))
h.waitForSendQueueSize(ctx, desc.RangeID, 0 /* expSize */, 0 /* serverIdx */)
h.comment(`
-- Flow token metrics from n1, the disconnect should be reflected in the metrics.`)
Expand Down Expand Up @@ -4767,7 +4760,7 @@ func TestFlowControlSendQueue(t *testing.T) {
h.comment(`-- (Disabling wait-for-eval bypass.)`)
noopWaitForEval.Store(false)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 6<<20 /* 6 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1))

h.comment(`
-- Send queue metrics from n1, n3's should not be allowed to form a send queue.`)
Expand Down Expand Up @@ -4827,7 +4820,7 @@ ORDER BY streams DESC;
// admission is allowed. While n4,n5 will continue to track as they are
// blocked from admitting.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 8<<20 /* 8 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1), mkStream(2))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(2))
h.comment(`
-- From n1. We should expect to see the unblocked streams quickly
-- untrack as admission is allowed (so not observed here), while n4,n5 will continue
Expand All @@ -4845,7 +4838,7 @@ ORDER BY streams DESC;
// quickly admits and untracks. While n4,n5 queue the write, not sending the
// msg, deducting and tracking the entry tokens.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 8<<20 /* 8 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0), mkStream(1), mkStream(2))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0), testingMkFlowStream(1), testingMkFlowStream(2))
h.comment(`
-- Send queue and flow token metrics from n1. The 1 MiB write should be queued
-- for n4,n5, while the quorum (n1,n2,n3) proceeds.
Expand Down Expand Up @@ -4891,7 +4884,7 @@ ORDER BY streams DESC;
// Expect 4 x 4 MiB tracked tokens for the 4 MiB write = 16 MiB.
// Expect 2 x 1 MiB tracked tokens for the 1 MiB write = 2 MiB.
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 18<<20 /* 18MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, mkStream(0))
h.waitForAllTokensReturnedForStreamsV2(ctx, 0 /* serverIdx */, testingMkFlowStream(0))
h.comment(`
-- Observe the total tracked tokens per-stream on n1. We should expect to see the
-- 1 MiB write being tracked across a quorum of streams, while the 4 MiB write
Expand Down Expand Up @@ -5150,6 +5143,280 @@ func TestFlowControlSendQueueManyInflight(t *testing.T) {
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
}

func testingMkFlowStream(serverIdx int) kvflowcontrol.Stream {
return kvflowcontrol.Stream{
StoreID: roachpb.StoreID(serverIdx + 1),
TenantID: roachpb.SystemTenantID,
}
}

// TestFlowControlSendQueueRangeRelocate exercises the send queue formation,
// prevention and flushing via selective (logical) admission of entries and token
// return. It also exercises the behavior of the send queue when a range is
// relocated. See the initial comment for an overview of the test structure.
func TestFlowControlSendQueueRangeRelocate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// We use three relocate variations (*=lh,^=send_queue):
// - [n1*,n2 ,n3^,n4 ,n5] -> [n2 ,n3^,n4 ,n5 ,n6*] (transfer_lease)
// - The leader and leaseholder is relocated.
// - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n4 ,n5 ,n6 ]
// - The replica with a send queue is relocated.
// - [n1*,n2 ,n3^,n4 ,n5] -> [n1*,n2 ,n3^,n4 ,n6 ]
// - The replica without a send queue is relocated.
testutils.RunValues(t, "from", []int{0, 2, 4}, func(t *testing.T, fromIdx int) {
testutils.RunTrueAndFalse(t, "transfer_lease", func(t *testing.T, transferLease bool) {
const numNodes = 6
// The transferLease arg indicates whether the AdminRelocateRange request
// will also transfer the lease to the voter which is in the first
// position of the target list. We always place n6 in the first position
// and pass in the transferLease arg to AdminRelocateRange.
fromNode := roachpb.NodeID(fromIdx + 1)
toNode := roachpb.NodeID(numNodes)
fromServerIdxs := []int{0, 1, 2, 3, 4}
toServerIdxs := []int{numNodes - 1}
for i := 0; i < numNodes-1; i++ {
if i != fromIdx {
toServerIdxs = append(toServerIdxs, i)
}
}
var fromString string
if fromIdx == 0 {
fromString = "leader_store"
} else if fromIdx == 2 {
fromString = "send_queue_store"
} else {
fromString = "has_token_store"
}
if transferLease {
fromString += "_transfer_lease"
}
// If n1 is removing itself from the range, the leaseholder will be
// transferred to n6 regardless of the value of transferLease.
newLeaseholderIdx := 0
if transferLease || fromIdx == 0 {
newLeaseholderIdx = 5
}
newLeaseNode := roachpb.NodeID(newLeaseholderIdx + 1)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettings()
kvflowcontrol.Mode.Override(ctx, &settings.SV, kvflowcontrol.ApplyToAll)
// We want to exhaust tokens but not overload the test, so we set the limits
// lower (8 and 16 MiB default).
kvflowcontrol.ElasticTokensPerStream.Override(ctx, &settings.SV, 2<<20)
kvflowcontrol.RegularTokensPerStream.Override(ctx, &settings.SV, 4<<20)

disableWorkQueueGrantingServers := make([]atomic.Bool, numNodes)
setTokenReturnEnabled := func(enabled bool, serverIdxs ...int) {
for _, serverIdx := range serverIdxs {
disableWorkQueueGrantingServers[serverIdx].Store(!enabled)
}
}

argsPerServer := make(map[int]base.TestServerArgs)
for i := range disableWorkQueueGrantingServers {
disableWorkQueueGrantingServers[i].Store(true)
argsPerServer[i] = base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
FlowControlTestingKnobs: &kvflowcontrol.TestingKnobs{
UseOnlyForScratchRanges: true,
OverrideTokenDeduction: func(tokens kvflowcontrol.Tokens) kvflowcontrol.Tokens {
// Deduct every write as 1 MiB, regardless of how large it
// actually is.
return kvflowcontrol.Tokens(1 << 20)
},
// We want to test the behavior of the send queue, so we want to
// always have up-to-date stats. This ensures that the send queue
// stats are always refreshed on each call to
// RangeController.HandleRaftEventRaftMuLocked.
OverrideAlwaysRefreshSendStreamStats: true,
},
},
AdmissionControl: &admission.TestingKnobs{
DisableWorkQueueFastPath: true,
DisableWorkQueueGranting: func() bool {
idx := i
return disableWorkQueueGrantingServers[idx].Load()
},
},
},
}
}

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: argsPerServer,
})
defer tc.Stopper().Stop(ctx)

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Targets(1, 2, 3, 4)...)

h := newFlowControlTestHelper(
t, tc, "flow_control_integration_v2", /* testdata */
kvflowcontrol.V2EnabledWhenLeaderV2Encoding, true, /* isStatic */
)
h.init(kvflowcontrol.ApplyToAll)
defer h.close(fmt.Sprintf("send_queue_range_relocate_from_%s", fromString))

desc, err := tc.LookupRange(k)
require.NoError(t, err)
h.enableVerboseRaftMsgLoggingForRange(desc.RangeID)
n1 := sqlutils.MakeSQLRunner(tc.ServerConn(0))
newLeaseDB := sqlutils.MakeSQLRunner(tc.ServerConn(newLeaseholderIdx))
h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */)
h.resetV2TokenMetrics(ctx)
h.waitForConnectedStreams(ctx, desc.RangeID, 5, 0 /* serverIdx */)

// Block admission on n3, while allowing every other node to admit.
setTokenReturnEnabled(true /* enabled */, 0, 1, 3, 4, 5)
setTokenReturnEnabled(false /* enabled */, 2)
// Drain the tokens to n3 by blocking admission and issuing the buffer
// size of writes to the range.
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)

h.comment(`(Sending 1 MiB put request to develop a send queue)`)
h.put(contextWithTestGeneratedPut(ctx), roachpb.Key(desc.StartKey), 1, admissionpb.NormalPri)
h.comment(`(Sent 1 MiB put request)`)
h.waitForTotalTrackedTokens(ctx, desc.RangeID, 4<<20 /* 4 MiB */, 0 /* serverIdx */)
h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1),
testingMkFlowStream(3), testingMkFlowStream(4))
h.waitForSendQueueSize(ctx, desc.RangeID, 1<<20 /* expSize 1 MiB */, 0 /* serverIdx */)

h.comment(`
-- Send queue metrics from n1, n3's send queue should have 1 MiB for s3.`)
h.query(n1, flowSendQueueQueryStr)
h.comment(`
-- Observe the total tracked tokens per-stream on n1, s3's entries will still
-- be tracked here.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")
h.comment(`
-- Per-store tokens available from n1, these should reflect the lack of tokens
-- for s3.`)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)

beforeString := fmt.Sprintf("%v", fromServerIdxs)
afterString := fmt.Sprintf("%v", toServerIdxs)

h.comment(fmt.Sprintf(`
-- Issuing RelocateRange:
-- before=%s
-- after =%s
-- Transferring the lease: %v.`, beforeString, afterString, transferLease))
testutils.SucceedsSoon(t, func() error {
if err := tc.Servers[2].DB().
AdminRelocateRange(
context.Background(),
desc.StartKey.AsRawKey(),
tc.Targets(toServerIdxs...),
nil, /* nonVoterTargets */
transferLease, /* transferLeaseToFirstVoter */
); err != nil {
return err
}
var err error
desc, err = tc.LookupRange(k)
if err != nil {
return err
}
rset := desc.Replicas()
if fullDescs := rset.VoterFullAndNonVoterDescriptors(); len(fullDescs) != 5 {
return errors.Errorf(
"expected 5 voters, got %v (replica_set=%v)", fullDescs, rset)
}
if rset.HasReplicaOnNode(fromNode) {
return errors.Errorf(
"expected no replica on node %v (replica_set=%v)", fromNode, rset)
}
if !rset.HasReplicaOnNode(toNode) {
return errors.Errorf(
"expected replica on node 6 (replica_set=%v)", rset)
}
leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil)
if err != nil {
return err
}
expLeaseTarget := tc.Target(newLeaseholderIdx)
if !leaseHolder.Equal(expLeaseTarget) {
return errors.Errorf(
"expected leaseholder to be on %v found %v (replica_set=%v)",
expLeaseTarget, leaseHolder, rset)
}
return nil
})

h.waitForConnectedStreams(ctx, desc.RangeID, 5, newLeaseholderIdx)
h.comment(`(Sending 1 MiB put request to the relocated range)`)
h.put(contextWithTestGeneratedPut(ctx), k, 1, admissionpb.NormalPri, newLeaseholderIdx)
h.comment(`(Sent 1 MiB put request to the relocated range)`)

h.waitForAllTokensReturnedForStreamsV2(ctx, 0, /* serverIdx */
testingMkFlowStream(0), testingMkFlowStream(1),
testingMkFlowStream(3), testingMkFlowStream(4))

toStreams := make([]kvflowcontrol.Stream, 0, len(toServerIdxs))
for _, toServerIdx := range toServerIdxs {
if toServerIdx != 2 /* send queue server */ {
toStreams = append(toStreams, testingMkFlowStream(toServerIdx))
}
}
h.waitForAllTokensReturnedForStreamsV2(ctx, newLeaseholderIdx, toStreams...)

h.comment(`-- Observe the total tracked tokens per-stream on n1.`)
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

if newLeaseholderIdx != 0 {
// Avoid double printing if the lease hasn't moved.
h.comment(fmt.Sprintf(`
-- Observe the total tracked tokens per-stream on new leaseholder n%v.`, newLeaseNode))
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles_v2
`, "range_id", "store_id", "total_tracked_tokens")

}

// Allow admission to proceed on n3 and wait for all tokens to be returned.
h.comment(`-- (Allowing below-raft admission to proceed on n3.)`)
setTokenReturnEnabled(true /* enabled */, 2)
h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, 0 /* serverIdx */)
if transferLease && fromIdx != 0 {
// When the lease is transferred first, the leaseholder is relocated to
// n6 after the fromNode is removed. In this case, we expect the
// leaseholder will have only 5 streams, because it will have never
// seen s3's stream as its replica was already removed from the range.
h.waitForAllTokensReturned(ctx, 5 /* expStreamCount */, newLeaseholderIdx)
} else {
h.waitForAllTokensReturned(ctx, 6 /* expStreamCount */, newLeaseholderIdx)
}

h.comment(`
-- Send queue and flow token metrics from n1. All tokens should be returned.`)
h.query(n1, flowSendQueueQueryStr)
h.query(n1, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
h.comment(fmt.Sprintf(`
-- Send queue and flow token metrics from leaseholder n%v.
-- All tokens should be returned.`, newLeaseNode))
h.query(newLeaseDB, flowSendQueueQueryStr)
h.query(newLeaseDB, flowPerStoreTokenQueryStr, flowPerStoreTokenQueryHeaderStrs...)
})
})
}

type flowControlTestHelper struct {
t testing.TB
tc *testcluster.TestCluster
Expand Down
Loading

0 comments on commit 07ce9fd

Please sign in to comment.