kvserver: document/simplify flow control integration
This commit documents the kvflowcontrol integration interfaces
introduced in earlier commits across flow_control_*.go, grouping
commentary and interfaces in a top-level flow_control_integration.go,
and makes minor simplifications where applicable. It's helpful to read
kvflowcontrol/{doc,kvflowcontrol}.go to understand the library
components in question, and also the comment block on
replicaFlowControlIntegration.

Here's how the various pieces fit together:

                                         ┌───────────────────┐
                                         │ Receiver (client) │
                                         ├───────────────────┴─────────────────────┬─┬─┐
                                      ┌──○ kvflowcontrolpb.AdmittedRaftLogEntries  │ │ │
                                      │  └─────────────────────────────────────────┴─┴─┘
                                      │  ┌───────────────────┐
                                      │  │ Receiver (client) │
                                      │  ├───────────────────┴─────────────────────┬─┬─┐
              ┌─────────────────────▶─┼──○ kvflowcontrolpb.AdmittedRaftLogEntries  │ │ │
              │                       │  └─────────────────────────────────────────┴─┴─┘
  ['1] gRPC streams                   │
       connecting/disconnecting  [1] RaftMessageBatch
              │                       │
              │  ┌─────────────────┐  │
              │  │ Sender (server) │  │
              │  ├─────────────────┴──│────────────────┐        ┌────────────────────────────────────────┐
              │  │ RaftTransport      │                │        │ StoresForFlowControl                   │
              │  │                    │                │        │                                        │
              │  │                    │                │        │  ┌───────────────────────────────────┐ │
              │  │                    └────[2] Lookup ─┼────────┼─▶│       kvflowcontrol.Handles       ○─┼──┐
              │  │                                     │        │  └───────────────────────────────────┘ │  │
              │  │ ┌─────────────────────────────────┐ │        │  ┌───────────────────────────────────┐ │  │
              └──┼▶│ connectionTrackerForFlowControl │ ├──['2]──┼─▶│ RaftTransportDisconnectedListener │ │  │
                 │ └─────────────────────────────────┘ │        │  └──────○────────────────────────────┘ │  │
                 └─────────────────▲───────────────────┘        └─────────┼──────────────────────────────┘  │
                                   │                                      │                                 │
                                   │                                  ['3] onRaftTransportDisconnected   [3] ReturnTokensUpto
                                   │                                      │                                 │
                                   │                                      │                                 │
                                   │       ┌──────────────────────────────┼─────────────────────────────────┼─────────┬─┬─┐
                                   │       │ replicaFlowControlIntegration│          ┌──────────────────────▼───────┐ │ │ │
                                   │       │                              │          │     kvflowcontrol.Handle     │ │ │ │
                                   │       │   onBecameLeader()           ▼          └───────────────────▲─▲────────┘ │ │ │
                                   │       │   onBecameFollower()           ○────['4] DisconnectStream ──┘ │          │ │ │
                                   │       │   onDescChanged()              ◀─── ["5] tryReconnect ──────┐ │          │ │ │
                                   │       │   onFollowersPaused()          ○─── ["7] ConnectStream  ────┼─┘          │ │ │
                                   │       │ = onRaftTransportDisconnected()         ┌───────────────────▼──────────┐ │ │ │
                                   │       │ = onRaftTicked()                        │ replicaForFlowControl        │ │ │ │
                                   │       │   onReplicaDestroyed()                  │                              │ │ │ │
                                   │       │                                         │   getDescriptor()            │ │ │ │
                    ["6] isConnectedTo     │                                         │   getPausedFollowers()       │ │ │ │
                                   │       │                                         │   getBehindFollowers()       │ │ │ │
                                   │       │                                         │   getInactiveFollowers()     │ │ │ │
                                   └───────┼─────────────────────────────────────────▶ = getDisconnectedFollowers() │ │ │ │
                                           │                                         └──────────────────────────────┘ │ │ │
                                           └──────────────────────────────────────────────────────────────────────────┴─┴─┘

The "server" and "client" demarcations refer to the server and client-side of
the RaftTransport stream. "Sender" and "Receiver" is kvflowcontrol verbiage,
referring to where proposals originate (and flow tokens deducted) and the
remote follower nodes where they're received. Below-raft admission happens
asynchronously on the receiver nodes, of which the sender is informed, which
in turn lets it release flow tokens and unblock further proposals.

Notation:
- Stacked boxes (with "  │││" on the right hand side) indicate that there are
  multiple of a kind, like multiple replicaFlowControlIntegration
  implementations (one per locally held replica), multiple
  kvflowcontrolpb.AdmittedRaftLogEntries, etc.
- [<digit>], [<digit>'], and [<digit>"] denote independent sequences,
  explained in text below.

---

A. How are flow tokens returned after work is admitted below-raft on remote,
   receiver nodes?

[1]: When work gets admitted below-raft on the receiver, the sender (where
     work originated, and flow tokens were deducted) is informed of the fact
     through the RaftMessageBatch gRPC stream. There are two bi-directional
     raft transport streams between a pair of nodes. We piggyback
     kvflowcontrolpb.AdmittedRaftLogEntries on raft messages being sent from
     the RaftMessageBatch client to the RaftMessageBatch server.
[2]: We look up the relevant kvflowcontrol.Handle from the set of
     kvflowcontrol.Handles, to inform it of below-raft admission.
[3]: We use the relevant kvflowcontrol.Handle (hanging off of some locally
     held replica) to return relevant previously deducted flow tokens.

The piggy-backing from [1] and the intercepting of piggy-backed messages and
kvflowcontrol.Handle lookup from [2] both happen in the RaftTransport layer,
in raft_transport.go. The set of local kvflowcontrol.Handles is exposed
through the StoresForFlowControl interface, backed by local stores and their
contained replicas. Each replica exposes the underlying handle through the
replicaFlowControlIntegration interface.
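
To make the [1] -> [2] -> [3] sequence concrete, here's a minimal,
self-contained Go sketch. It is not the CockroachDB code: the types below
(admittedRaftLogEntries, tokenHandle, handleRegistry, and the int64 token
accounting) are illustrative stand-ins that only mirror the shapes of
kvflowcontrolpb.AdmittedRaftLogEntries, kvflowcontrol.Handle, and
kvflowcontrol.Handles described above.

  package main

  import "fmt"

  // admittedRaftLogEntries mirrors the shape of
  // kvflowcontrolpb.AdmittedRaftLogEntries: which range, up to which raft
  // log position, and on which remote store work was admitted below-raft.
  type admittedRaftLogEntries struct {
      rangeID        int64
      upToRaftLogIdx uint64
      storeID        int32
  }

  // tokenHandle stands in for kvflowcontrol.Handle: per-range tracking, on
  // the sender, of flow tokens deducted at specific raft log positions.
  type tokenHandle struct {
      deducted map[uint64]int64 // raft log index -> tokens deducted (bytes)
  }

  // returnTokensUpto plays the role of [3] ReturnTokensUpto: it releases
  // tokens for all tracked entries at or below the admitted raft log index.
  func (h *tokenHandle) returnTokensUpto(idx uint64) (returned int64) {
      for logIdx, tokens := range h.deducted {
          if logIdx <= idx {
              returned += tokens
              delete(h.deducted, logIdx)
          }
      }
      return returned
  }

  // handleRegistry stands in for the kvflowcontrol.Handles lookup exposed
  // through StoresForFlowControl ([2]).
  type handleRegistry map[int64]*tokenHandle

  func main() {
      reg := handleRegistry{
          7: {deducted: map[uint64]int64{10: 1 << 20, 11: 1 << 20}},
      }
      // [1]: piggybacked on a RaftMessageBatch message, the receiver informs
      // us that entries up to log index 10 of r7 were admitted below-raft.
      msg := admittedRaftLogEntries{rangeID: 7, upToRaftLogIdx: 10, storeID: 2}
      // [2]: look up the relevant handle; [3]: return the deducted tokens.
      if h, ok := reg[msg.rangeID]; ok {
          fmt.Printf("r%d: returned %d bytes of flow tokens\n",
              msg.rangeID, h.returnTokensUpto(msg.upToRaftLogIdx))
      }
  }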

---

B. How do we react to raft transport streams breaking? (I1 from
   kvflowcontrol/doc.go)

['1]: The server side of RaftMessageBatch observes every client-initiated
      stream breaking. The connectionTrackerForFlowControl, used within the
      RaftTransport layer, also monitors all live gRPC streams to understand
      exactly the set of clients we're connected to.
['2]: Whenever any raft transport gRPC stream breaks, we notify components of
      this fact through the RaftTransportDisconnectedListener interface.
['3]: This in turn informs all locally held replicas, through the
      replicaFlowControlIntegration interface.
['4]: We actively disconnect flow control streams for the replicas we just
      lost raft transport connectivity to, as informed by the transport.

Note that we actually plumb down information about exactly which raft
transport streams broke. It's not enough to simply inform the various
replicaFlowControlIntegrations of some transport stream breaking, and for
them to then determine which streams to disconnect. This is because it's
possible for the streams to be re-established in the interim, or for there to
be another active stream from the same client but using a different RPC
class. We still want to free up all tokens for that replication stream, lest
we leak flow tokens in transit on the particular stream that broke.
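
Below is a minimal, self-contained Go sketch of ['1] through ['4]. The
storesListener and replicaIntegration types are illustrative stand-ins for
StoresForFlowControl and replicaFlowControlIntegration; only the shape of
OnRaftTransportDisconnected follows the RaftTransportDisconnectedListener
interface.

  package main

  import (
      "context"
      "fmt"
  )

  type storeID int32

  // disconnectedListener mirrors RaftTransportDisconnectedListener: it's
  // notified with the exact set of (remote) stores whose streams broke.
  type disconnectedListener interface {
      OnRaftTransportDisconnected(context.Context, ...storeID)
  }

  // replicaIntegration is an illustrative stand-in for
  // replicaFlowControlIntegration, tracking which stores it has connected
  // flow control streams to.
  type replicaIntegration struct {
      rangeID   int64
      connected map[storeID]bool
  }

  // onRaftTransportDisconnected is ['3]: disconnect streams only for the
  // stores the transport actually lost (['4] DisconnectStream), freeing up
  // any tokens still in transit on those streams.
  func (r *replicaIntegration) onRaftTransportDisconnected(_ context.Context, stores ...storeID) {
      for _, s := range stores {
          if r.connected[s] {
              delete(r.connected, s)
              fmt.Printf("r%d: disconnected stream to s%d, returning in-flight tokens\n", r.rangeID, s)
          }
      }
  }

  // storesListener fans the ['2] notification out to all locally held
  // replicas, playing the role of StoresForFlowControl.
  type storesListener struct {
      replicas []*replicaIntegration
  }

  func (l *storesListener) OnRaftTransportDisconnected(ctx context.Context, stores ...storeID) {
      for _, r := range l.replicas {
          r.onRaftTransportDisconnected(ctx, stores...)
      }
  }

  func main() {
      var l disconnectedListener = &storesListener{replicas: []*replicaIntegration{
          {rangeID: 7, connected: map[storeID]bool{2: true, 3: true}},
      }}
      // ['1]: the server side of RaftMessageBatch observed the stream from
      // s3's node breaking.
      l.OnRaftTransportDisconnected(context.Background(), 3)
  }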

---

C. What happens when the raft transport streams reconnect? (I1 from
   kvflowcontrol/doc.go)

["5]: The replicaFlowControlIntegration interface is used to periodically
      reconnect previously disconnected streams. This is driven primarily
      through the onRaftTicked() API, but also happens opportunistically
      through onFollowersPaused(), onRaftTransportDisconnected(), etc.
["6]: We check whether we're connected to remote replicas via the
      raftTransportForFlowControl.isConnectedTo(). This is powered by the
      connectionTrackerForFlowControl embedded in the RaftTransport which
      monitors all active gRPC streams as seen on the server-side.
["7]: If we're now connected to previously disconnected replicas, we inform
      the underlying kvflowcontrol.Handle in order to deduct flow tokens for
      subsequent proposals.
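
Here's a matching sketch of the ["5] through ["7] reconnect path, driven from
an onRaftTicked()-style hook. Again, the transport, fakeTransport, and
replicaIntegration types are illustrative; only the shape of isConnectedTo
follows the raftTransportForFlowControl interface.

  package main

  import "fmt"

  type storeID int32

  // transport is an illustrative stand-in for raftTransportForFlowControl,
  // answering whether we're currently connected to a given remote store.
  type transport interface {
      isConnectedTo(s storeID) bool
  }

  type fakeTransport map[storeID]bool

  func (t fakeTransport) isConnectedTo(s storeID) bool { return t[s] }

  // replicaIntegration tracks connected and previously disconnected streams.
  type replicaIntegration struct {
      transport    transport
      connected    map[storeID]bool
      disconnected map[storeID]bool
  }

  // onRaftTicked drives ["5] tryReconnect: for every previously disconnected
  // stream, ask the transport whether we're reconnected (["6] isConnectedTo)
  // and, if so, re-establish the flow control stream (["7] ConnectStream) so
  // that subsequent proposals deduct tokens for it again.
  func (r *replicaIntegration) onRaftTicked() {
      for s := range r.disconnected {
          if r.transport.isConnectedTo(s) {
              delete(r.disconnected, s)
              r.connected[s] = true
              fmt.Printf("reconnected stream to s%d\n", s)
          }
      }
  }

  func main() {
      r := &replicaIntegration{
          transport:    fakeTransport{2: true, 3: false},
          connected:    map[storeID]bool{1: true},
          disconnected: map[storeID]bool{2: true, 3: true},
      }
      r.onRaftTicked() // reconnects the stream to s2; s3 stays disconnected
  }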

---

replicaFlowControlIntegration is used to integrate with replication flow
control. It intercepts various points in a replica's lifecycle, such as the
replica acquiring or losing raft leadership, or its raft membership changing.
Accessing it requires Replica.mu to be held exclusively (this is asserted on
in the canonical implementation). The "external" state is mediated by the
replicaForFlowControl interface. The state transitions look as follows:

      ─ ─ ─ ─ ─ ─ ─                                      ┌───── onDestroyed ──────────────────▶ ╳╳╳╳╳╳╳╳╳╳╳╳╳
     ─ ─ ─ ─ ─ ─ ┐ │                                     │ ┌─── onDescChanged(removed=self) ──▶ ╳ destroyed ╳
                     ┌──────── onBecameLeader ─────────┐ │ │                                    ╳╳╳╳╳╳╳╳╳╳╳╳╳
                 │ │ │                                 │ │ │
                 ○ ○ ○                                 ▼ ○ ○
             ┌ ─ ─ ─ ─ ─ ─ ─ ┐                 ┌──────────────┐
      ─ ─ ─ ○     follower                     │    leader    │ ○─────────────────────────────┐
             └ ─ ─ ─ ─ ─ ─ ─ ┘                 └──────────────┘                               │
                   ▲ ▲                                 ○ ▲       onDescChanged                │
                   │ │                                 │ │       onFollowersPaused            │
      ─ ─ ─ ─ ─ ─ ─  └──────── onBecameFollower ───────┘ └────── onRaftTransportDisconnected ─┘
                                                                 onRaftTicked

We're primarily interested in transitions to/from the leader state -- the
equivalent transitions from the follower state are no-ops.

  - onBecameLeader is when the replica acquires raft leadership. At this
    point we initialize the underlying kvflowcontrol.Handle and other
    internal tracking state to handle subsequent transitions.

  - onBecameFollower is when the replica loses raft leadership. We close the
    underlying kvflowcontrol.Handle and clear other tracking state.

  - onDescChanged is when the range descriptor changes. We react to changes
    by disconnecting streams for replicas no longer part of the range,
    connecting streams for newly added members of the range, and closing the
    underlying kvflowcontrol.Handle and clearing tracking state if we
    ourselves are no longer part of the range.

  - onFollowersPaused is when the set of paused followers has changed. We
    react to it by disconnecting streams for newly paused followers, and
    reconnecting streams to newly unpaused ones.

  - onRaftTransportDisconnected is when we're no longer connected to some
    replicas via the raft transport. We react to it by disconnecting relevant
    streams.

  - onRaftTicked is invoked periodically and refreshes the set of streams
    we're connected to. It disconnects streams to inactive followers and/or
    reconnects to now-active followers. It also observes raft progress state
    for individual replicas, disconnecting from ones we're not actively
    replicating to (because they're too far behind on their raft log, in need
    of snapshots, or because we're unaware of their committed log indexes),
    and reconnecting those streams once their raft progress recovers.

  - onDestroyed is when the replica is destroyed. Like onBecameFollower, we
    close the underlying kvflowcontrol.Handle and clear other tracking state.
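
For reference, here is the surface described above, paraphrased as a
declaration-only Go sketch. The real interface lives in
flow_control_replica_integration.go and its exact method signatures (context
arguments, descriptor/store parameters) differ; treat this as a summary of
the commentary above, not as the API.

  package sketch

  // replicaFlowControlIntegration, paraphrased. One implementation hangs off
  // of every locally held replica; only the leader state does real work.
  type replicaFlowControlIntegration interface {
      onBecameLeader()              // init kvflowcontrol.Handle + tracking state
      onBecameFollower()            // close the handle, clear tracking state
      onDescChanged()               // (dis)connect streams on membership changes
      onFollowersPaused()           // disconnect paused, reconnect unpaused followers
      onRaftTransportDisconnected() // disconnect streams for lost stores
      onRaftTicked()                // periodic refresh: inactive/behind followers
      onDestroyed()                 // like onBecameFollower, on replica destruction
  }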

Release note: None
irfansharif committed Jun 9, 2023
1 parent 3da5c73 commit 87d6547
Showing 28 changed files with 824 additions and 420 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -12,6 +12,7 @@ go_library(
"consistency_queue.go",
"debug_print.go",
"doc.go",
"flow_control_integration.go",
"flow_control_raft_transport.go",
"flow_control_replica.go",
"flow_control_replica_integration.go",
271 changes: 271 additions & 0 deletions pkg/kv/kvserver/flow_control_integration.go

Large diffs are not rendered by default.

104 changes: 78 additions & 26 deletions pkg/kv/kvserver/flow_control_integration_test.go
@@ -57,11 +57,11 @@ import (
// tracker=1,client_raft_helpers_test=1'
//
// TODO(irfansharif): Add end-to-end tests for the following:
// - [ ] Node with full RaftTransport receive queue.
// - [ ] Node with full RaftTransport send queue, with proposals dropped.
// - [ ] Node with full RaftTransport receive queue (I8).
// - [ ] Node with full RaftTransport send queue, with proposals dropped (I8).
// - [ ] Raft commands getting reproposed, either due to timeouts or not having
// the right MLAI. See TestReplicaRefreshPendingCommandsTicks,
// TestLogGrowthWhenRefreshingPendingCommands.
// TestLogGrowthWhenRefreshingPendingCommands. I7.
// - [ ] Raft proposals getting dropped/abandoned. See
// (*Replica).cleanupFailedProposalLocked and its uses.

@@ -133,9 +133,9 @@ ORDER BY name ASC;
`)

h.comment(`-- (Issuing + admitting a regular 1MiB, triply replicated write...)`)
t.Log("sending put request")
h.log("sending put request")
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
t.Log("sent put request")
h.log("sent put request")

h.waitForAllTokensReturned(ctx, 3)
h.comment(`
@@ -277,9 +277,9 @@ func TestFlowControlRangeSplitMerge(t *testing.T) {
require.NoError(t, err)

h.waitForConnectedStreams(ctx, desc.RangeID, 3)
t.Log("sending put request to pre-split range")
h.log("sending put request to pre-split range")
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
t.Log("sent put request to pre-split range")
h.log("sent put request to pre-split range")

h.waitForAllTokensReturned(ctx, 3)
h.comment(`
@@ -302,13 +302,13 @@ ORDER BY name ASC;
// [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s2
// [T1,n1,s1,r64/1:/{Table/Max-Max},raft] connected to stream: t1/s3

t.Log("sending 2MiB put request to post-split LHS")
h.log("sending 2MiB put request to post-split LHS")
h.put(ctx, k, 2<<20 /* 2MiB */, admissionpb.NormalPri)
t.Log("sent 2MiB put request to post-split LHS")
h.log("sent 2MiB put request to post-split LHS")

t.Log("sending 3MiB put request to post-split RHS")
h.log("sending 3MiB put request to post-split RHS")
h.put(ctx, roachpb.Key(right.StartKey), 3<<20 /* 3MiB */, admissionpb.NormalPri)
t.Log("sent 3MiB put request to post-split RHS")
h.log("sent 3MiB put request to post-split RHS")

h.waitForAllTokensReturned(ctx, 3)
h.comment(`
@@ -343,9 +343,9 @@ ORDER BY streams DESC;
// [T1,n1,s1,r65/1:{\xfa\x00-/Max},raft] disconnected stream: t1/s2
// [T1,n1,s1,r65/1:{\xfa\x00-/Max},raft] disconnected stream: t1/s3

t.Log("sending 4MiB put request to post-merge range")
h.log("sending 4MiB put request to post-merge range")
h.put(ctx, roachpb.Key(merged.StartKey), 4<<20 /* 4MiB */, admissionpb.NormalPri)
t.Log("sent 4MiB put request to post-merged range")
h.log("sent 4MiB put request to post-merged range")

h.waitForAllTokensReturned(ctx, 3)
h.comment(`
@@ -415,11 +415,11 @@ func TestFlowControlBlockedAdmission(t *testing.T) {
h.waitForConnectedStreams(ctx, desc.RangeID, 3)

h.comment(`-- (Issuing regular 1MiB, 3x replicated write that's not admitted.)`)
t.Log("sending put requests")
h.log("sending put requests")
for i := 0; i < 5; i++ {
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
}
t.Log("sent put requests")
h.log("sent put requests")

h.comment(`
-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes
@@ -488,13 +488,25 @@ func TestFlowControlCrashedNode(t *testing.T) {
const numNodes = 2
var maintainStreamsForBrokenRaftTransport atomic.Bool

st := cluster.MakeTestingClusterSettings()
// See I13 from kvflowcontrol/doc.go. We disable the raft-transport-break
// mechanism below, and for quiesced ranges, that can effectively disable
// the last-updated mechanism since quiesced ranges aren't being ticked, and
// we only check the last-updated state when ticked. So we disable range
// quiescence.
kvserver.ExpirationLeasesOnly.Override(ctx, &st.SV, true)

tc := testcluster.StartTestCluster(t, numNodes, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Settings: st,
RaftConfig: base.RaftConfig{
// Suppress timeout-based elections. This test doesn't want to
// deal with leadership changing hands.
RaftElectionTimeoutTicks: 1000000,
// Reduce the RangeLeaseDuration to speed up failure detection
// below.
RangeLeaseDuration: time.Second,
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
@@ -531,11 +543,11 @@ func TestFlowControlCrashedNode(t *testing.T) {
h.waitForConnectedStreams(ctx, desc.RangeID, 2)

h.comment(`-- (Issuing regular 5x1MiB, 2x replicated writes that are not admitted.)`)
t.Log("sending put requests")
h.log("sending put requests")
for i := 0; i < 5; i++ {
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
}
t.Log("sent put requests")
h.log("sent put requests")

h.comment(`
-- Flow token metrics from n1 after issuing 5 regular 1MiB 2x replicated writes
@@ -598,6 +610,7 @@ func TestFlowControlRaftSnapshot(t *testing.T) {

const numServers int = 5
stickyServerArgs := make(map[int]base.TestServerArgs)
var maintainStreamsForBehindFollowers atomic.Bool
var maintainStreamsForInactiveFollowers atomic.Bool
var maintainStreamsForBrokenRaftTransport atomic.Bool
var disableWorkQueueGranting atomic.Bool
@@ -629,6 +642,9 @@ func TestFlowControlRaftSnapshot(t *testing.T) {
// deductions/returns.
return kvflowcontrol.Tokens(1 << 20 /* 1MiB */)
},
MaintainStreamsForBehindFollowers: func() bool {
return maintainStreamsForBehindFollowers.Load()
},
MaintainStreamsForInactiveFollowers: func() bool {
return maintainStreamsForInactiveFollowers.Load()
},
@@ -728,6 +744,11 @@ ORDER BY name ASC;
maintainStreamsForInactiveFollowers.Store(true)
maintainStreamsForBrokenRaftTransport.Store(true)

// Depending on when the raft group gets ticked, we might notice that
// replicas on n2 and n3 are behind a bit too soon. Disable it first, and
// re-enable it right when this test wants to react to raft progress state.
maintainStreamsForBehindFollowers.Store(true)

// Now kill stores 1 + 2, increment the key on the other stores and
// truncate their logs to make sure that when store 1 + 2 comes back up
// they will require a snapshot from Raft.
@@ -795,6 +816,9 @@ ORDER BY name ASC;
t.Fatal(err)
}

// Allow the flow control integration layer to react to raft progress state.
maintainStreamsForBehindFollowers.Store(false)

h.comment(`-- (Restarting n2 and n3.)`)
require.NoError(t, tc.RestartServer(1))
require.NoError(t, tc.RestartServer(2))
@@ -832,6 +856,7 @@ ORDER BY name ASC;
h.query(n1, `
SELECT range_id, store_id, crdb_internal.humanize_bytes(total_tracked_tokens::INT8)
FROM crdb_internal.kv_flow_control_handles
WHERE total_tracked_tokens > 0
`, "range_id", "store_id", "total_tracked_tokens")

h.waitForConnectedStreams(ctx, repl.RangeID, 5)
@@ -933,11 +958,11 @@ func TestFlowControlRaftTransportBreak(t *testing.T) {
h.waitForConnectedStreams(ctx, desc.RangeID, 3)

h.comment(`-- (Issuing regular 5x1MiB, 3x replicated writes that are not admitted.)`)
t.Log("sending put requests")
h.log("sending put requests")
for i := 0; i < 5; i++ {
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
}
t.Log("sent put requests")
h.log("sent put requests")

h.comment(`
-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes
@@ -1003,6 +1028,7 @@ func TestFlowControlRaftTransportCulled(t *testing.T) {
const numNodes = 3
workerTeardownCh := make(chan roachpb.NodeID, 1)
markSendQueueAsIdleCh := make(chan roachpb.NodeID)
var disableWorkerTeardown atomic.Bool

baseServerArgs := base.TestServerArgs{
Knobs: base.TestingKnobs{
@@ -1031,6 +1057,9 @@ func TestFlowControlRaftTransportCulled(t *testing.T) {
baseServerArgsWithRaftTransportKnobs.Knobs.RaftTransport = &kvserver.RaftTransportTestingKnobs{
MarkSendQueueAsIdleCh: markSendQueueAsIdleCh,
OnWorkerTeardown: func(nodeID roachpb.NodeID) {
if disableWorkerTeardown.Load() {
return
}
workerTeardownCh <- nodeID
},
}
@@ -1061,11 +1090,11 @@ func TestFlowControlRaftTransportCulled(t *testing.T) {
h.waitForConnectedStreams(ctx, desc.RangeID, 3)

h.comment(`-- (Issuing regular 5x1MiB, 3x replicated writes that are not admitted.)`)
t.Log("sending put requests")
h.log("sending put requests")
for i := 0; i < 5; i++ {
h.put(ctx, k, 1<<20 /* 1MiB */, admissionpb.NormalPri)
}
t.Log("sent put requests")
h.log("sent put requests")

h.comment(`
-- Flow token metrics from n1 after issuing 5 regular 1MiB 3x replicated writes
@@ -1124,6 +1153,8 @@ ORDER BY name ASC;
WHERE name LIKE '%kvadmission%tokens%'
ORDER BY name ASC;
`)

disableWorkerTeardown.Store(true)
}

// TestFlowControlRaftMembership tests flow token behavior when the raft
Expand Down Expand Up @@ -1269,9 +1300,24 @@ func TestFlowControlRaftMembershipRemoveSelf(t *testing.T) {
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
RaftConfig: base.RaftConfig{
// Suppress timeout-based elections. This test doesn't want to
// deal with leadership changing hands.
RaftElectionTimeoutTicks: 1000000,
// TODO(irfansharif): The AdminRelocateRange used below can
// occasionally flake if we suppress timeout-based
// elections. We get logging of the following form:
//
// I230507 19:47:03.143463 31 kv/kvserver_test/flow_control_integration_test.go:2065 [-] 349 -- (Replacing current raft leader on n1 in raft group with new n4 replica.)
// I230507 19:47:03.153105 5430 kv/kvserver/replica_raftstorage.go:514 [T1,n4,s4,r64/4:/{Table/Max-Max}] 352 applied INITIAL snapshot b8cdcb09 from (n1,s1):1 at applied index 23 (total=1ms data=1.0 MiB ingestion=6@1ms)
// I230507 19:47:03.167504 629 kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go:249 [T1,n1,s1,r64/1:/{Table/Max-Max},raft] 353 connected to stream: t1/s4
// W230507 19:47:03.186303 4268 go.etcd.io/raft/v3/raft.go:924 [T1,n4,s4,r64/4:/{Table/Max-Max}] 354 4 cannot campaign at term 6 since there are still 1 pending configuration changes to apply
// ...
// W230507 19:47:18.194829 5507 kv/kvserver/spanlatch/manager.go:559 [T1,n4,s4,r64/4:/{Table/Max-Max}] 378 have been waiting 15s to acquire read latch /Local/Range/Table/Max/RangeDescriptor@0,0, held by write latch /Local/Range/Table/Max/RangeDescriptor@0,0
// W230507 19:47:19.082183 5891 kv/kvserver/spanlatch/manager.go:559 [T1,n4,s4,r64/4:/{Table/Max-Max}] 379 have been waiting 15s to acquire read latch /Local/Range/Table/Max/RangeDescriptor@0,0, held by write latch /Local/Range/Table/Max/RangeDescriptor@0,0
//
// Followed by range unavailability. Are we relying on the
// new leader to be able to campaign immediately, in order
// to release the latch? And we're simultaneously preventing
// other replicas from calling elections?
//
// RaftElectionTimeoutTicks: 1000000,
},
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
Expand Down Expand Up @@ -2079,12 +2125,18 @@ func (h *flowControlTestHelper) comment(comment string) {

comment = strings.TrimSpace(comment)
h.buf.WriteString(fmt.Sprintf("%s\n", comment))
h.t.Log(comment)
h.log(comment)
}

func (h *flowControlTestHelper) log(msg string) {
if log.ShowLogs() {
log.Infof(context.Background(), "%s", msg)
}
}

func (h *flowControlTestHelper) query(runner *sqlutils.SQLRunner, sql string, headers ...string) {
sql = strings.TrimSpace(sql)
h.t.Log(sql)
h.log(sql)
h.buf.WriteString(fmt.Sprintf("%s\n\n", sql))

rows := runner.Query(h.t, sql)
16 changes: 0 additions & 16 deletions pkg/kv/kvserver/flow_control_raft_transport.go
@@ -20,16 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/rpc"
)

// raftTransportForFlowControl abstracts the node-level raft transport, and is
// used by the canonical replicaFlowControlIntegration implementation. It
// exposes the set of (remote) stores the raft transport is connected to. If the
// underlying gRPC streams break and don't reconnect, this indicates as much.
// Ditto if they're reconnected to. Also see RaftTransportDisconnectListener,
// which is used to observe every instance of gRPC streams breaking.
type raftTransportForFlowControl interface {
isConnectedTo(storeID roachpb.StoreID) bool
}

var _ raftTransportForFlowControl = &RaftTransport{}

// isConnectedTo implements the raftTransportForFlowControl interface.
@@ -39,12 +29,6 @@ func (r *RaftTransport) isConnectedTo(storeID roachpb.StoreID) bool {
return r.kvflowControl.mu.connectionTracker.isStoreConnected(storeID)
}

// RaftTransportDisconnectListener observes every instance of the raft
// transport disconnecting replication traffic to the given (remote) stores.
type RaftTransportDisconnectListener interface {
OnRaftTransportDisconnected(context.Context, ...roachpb.StoreID)
}

// connectionTrackerForFlowControl tracks the set of client-side stores and
// server-side nodes the raft transport is connected to. The "client" and
// "server" refer to the client and server side of the RaftTransport stream (see