kvserver: setup flow token returns using the RaftTransport
This commit integrates the kvflowcontrol.Dispatch with the
kvserver-level RaftTransport. When log entries are admitted below raft,
we'll want to inform the origin nodes of this fact, effectively
returning the flow tokens that were deducted when replicating the log
entry to us. We repurpose the existing RaftTransport for this
communication -- we piggyback these flow token returns[^1] on raft
messages already bound to nodes we're returning tokens to. We also
guarantee delivery of tokens in the presence of idle RaftTransport
connections[^2].
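
In sketch form, the send-side piggybacking looks roughly like the
following. It's a simplified sketch rather than the literal diff --
sendRaftMessageBatch is a hypothetical helper and the dispatch-reader
signature is paraphrased -- but the AdmittedRaftLogEntries field on
RaftMessageRequest is the real carrier (see [^4]):

    // Before flushing raft messages bound for a given node, attach any
    // pending flow token returns (admitted raft log entries) destined
    // for it. The messages are traffic we're sending anyway; the
    // returns simply ride along.
    func sendRaftMessageBatch(
        stream kvserverpb.MultiRaft_RaftMessageBatchClient,
        dispatch kvflowcontrol.DispatchReader, // paraphrased reader interface
        toNodeID roachpb.NodeID,
        reqs []*kvserverpb.RaftMessageRequest,
    ) error {
        if len(reqs) > 0 {
            // Drain dispatches bound for this node, piggybacking them
            // on the first message in the batch.
            reqs[0].AdmittedRaftLogEntries = dispatch.PendingDispatchFor(toNodeID)
        }
        batch := &kvserverpb.RaftMessageRequestBatch{}
        for _, req := range reqs {
            batch.Requests = append(batch.Requests, *req)
        }
        return stream.Send(batch)
    }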

We had to introduce some protocol changes here. When a client
establishes a RaftMessageRequestBatch stream, it sends along to the
server the set of all StoreIDs it has. This set is populated on the
first RaftMessageRequestBatch sent along the MultiRaft.RaftMessageBatch
gRPC stream, identifying at least one store, and populated once more if
any additional stores have since been initialized[^3]. This data is
used by the kvflowcontrol machinery to track the exact set of stores on
the client node, and to react to the gRPC streams breaking. Since these
streams are used to piggyback information about which log entries were
admitted below raft[^4] in order for the server side to free up flow
tokens, a broken stream risks leaking these tokens. So when these
streams break, we use the information about the client's stores to
release all held tokens[^5].
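
A client-side sketch of the two-step announcement follows;
maybeAnnounceStores is a hypothetical helper, but the StoreIDs set on
RaftMessageRequestBatch is the protocol change described above:

    // Announce the client's stores on the first batch sent over a
    // stream, and again if additional stores were initialized after
    // the first send.
    func maybeAnnounceStores(
        batch *kvserverpb.RaftMessageRequestBatch,
        localStoreIDs []roachpb.StoreID, // stores on this (client) node
        announced *int, // stores already announced on this stream
    ) {
        if *announced == len(localStoreIDs) {
            return // nothing new since the last announcement
        }
        // The server tracks the exact set of client stores; if this
        // stream breaks, it releases all flow tokens held on their
        // behalf.
        batch.StoreIDs = localStoreIDs
        *announced = len(localStoreIDs)
    }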

We're not using this code just yet; this commit contains only the
below-raft integration with kvflowcontrol. The subsequent commit will
introduce the above-raft integration, where we'll actually deduct flow
tokens at the sender and encode proposals using
EntryEncoding{Standard,Sideloaded}WithAC, which in turn enqueues virtual
work items in below-raft admission queues for asynchronous admission.
Once asynchronously admitted, using the changes in this commit, we'll
return flow tokens through the now-wired-up kvflowcontrol.Dispatch
interface.
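
For reference, the Dispatch interface being wired up looks roughly like
this (paraphrased from the kvflowcontrol package; treat the exact
signatures as approximate):

    // Dispatch records admitted raft log entries and reads pending
    // dispatches bound for specific origin nodes.
    type Dispatch interface {
        // Dispatch records that entries were admitted below raft;
        // they're communicated back to the origin node so it can
        // return flow tokens.
        Dispatch(roachpb.NodeID, kvflowcontrolpb.AdmittedRaftLogEntries)
        // PendingDispatch lists the nodes with pending dispatches.
        PendingDispatch() []roachpb.NodeID
        // PendingDispatchFor drains pending dispatches for a node.
        PendingDispatchFor(roachpb.NodeID) []kvflowcontrolpb.AdmittedRaftLogEntries
    }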

---

Suggested reading order for reviewers:

- (*RaftTransport).kvflowControl
  Brief comment block that gives the lay of the land.

- flow_control_stores.go
  Integration interface+implementation that's going to be used by the
  RaftTransport to return flow tokens to the specific locally held
  kvflowcontrol.Handles, after learning about admitted raft log entries
  from remote nodes. It's implemented more fully in the subsequent commit.

- flow_control_raft_transport.go
  Contains the set of new dependencies now used in the RaftTransport
  code for flow token purposes. It also includes the interfaces that
  show how the RaftTransport informs individual replicas that it's no
  longer connected to specific (remote) stores. They're used more fully
  in the subsequent commit.

- raft_transport.go
  The actual code changes to the RaftTransport.

- flow_token_transport_test.go and flow_token_transport/*
  Datadriven test to understand how the various pieces fit together.

- kvflowdispatch/*
  Adds some metrics and unit testing for the canonical
  kvflowcontrol.Dispatch implementation (previously implemented).

---

[^1]: In the form of kvflowcontrolpb.AdmittedRaftLogEntries; see the
      paraphrased sketch after these footnotes.
[^2]: See kvserver.TestFlowTokenTransport.
[^3]: This two-step process is because of how and when we allocate
      StoreIDs. Ignoring nodes that are bootstrapping the cluster (which
      just picks the initial set of StoreIDs -- see
      pkg/server.bootstrapCluster), whenever a new node is added, it's
      assigned a node ID and store ID by an existing node in CRDB (see
      kvpb.JoinNodeResponse). Subsequent store IDs, for multi-store
      nodes, are generated by the joining node by incrementing a
      sequence ID generator (see
      pkg/server.(*Node).initializeAdditionalStores). All of which is to
      say that the very first time we issue a RaftMessageRequestBatch,
      we might not have all the StoreIDs. But we will very shortly
      after, and certainly before any replicas get allocated to the
      additional store.
[^4]: See kvflowcontrolpb.AdmittedRaftLogEntries and its use in
      RaftMessageRequest.
[^5]: See I1 from kvflowcontrol/doc.go.
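
As for the payload in [^1] and [^4], its shape is roughly the
following, rendered as paraphrased Go (field details approximate):

    // AdmittedRaftLogEntries communicates that all entries in a raft
    // log at or below some position, at some priority, were admitted.
    type AdmittedRaftLogEntries struct {
        RangeID roachpb.RangeID
        // AdmissionPriority the entries were admitted at; flow tokens
        // are returned to the matching priority bucket.
        AdmissionPriority int32
        // UpToRaftLogPosition is the (term, index) up to which entries
        // at this priority were admitted.
        UpToRaftLogPosition RaftLogPosition
        // StoreID of the store the entries were admitted on (relevant
        // for multi-store nodes).
        StoreID roachpb.StoreID
    }

    // RaftLogPosition is a (term, index) pair into a raft log.
    type RaftLogPosition struct {
        Term, Index uint64
    }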

Release note: None
irfansharif committed Jun 6, 2023
1 parent e1253c3 commit 4241f6b
Showing 37 changed files with 2,230 additions and 42 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -12,6 +12,8 @@ go_library(
"consistency_queue.go",
"debug_print.go",
"doc.go",
"flow_control_raft_transport.go",
"flow_control_stores.go",
"lease_history.go",
"markers.go",
"merge_queue.go",
@@ -136,6 +138,9 @@ go_library(
"//pkg/kv/kvserver/idalloc",
"//pkg/kv/kvserver/intentresolver",
"//pkg/kv/kvserver/kvadmission",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/kvstorage",
@@ -264,6 +269,7 @@ go_test(
"consistency_queue_test.go",
"debug_print_test.go",
"errors_test.go",
"flow_control_raft_transport_test.go",
"gossip_test.go",
"helpers_test.go",
"intent_resolver_integration_test.go",
@@ -378,6 +384,10 @@ go_test(
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/gc",
"//pkg/kv/kvserver/intentresolver",
"//pkg/kv/kvserver/kvadmission",
"//pkg/kv/kvserver/kvflowcontrol",
"//pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb",
"//pkg/kv/kvserver/kvflowcontrol/kvflowdispatch",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/kvstorage",
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
@@ -2474,6 +2475,10 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) {
		nodedialer.New(tc.Servers[0].RPCContext(), gossip.AddressResolver(tc.Servers[0].Gossip())),
		nil, /* grpcServer */
		tc.Servers[0].Stopper(),
		kvflowdispatch.NewDummyDispatch(),
		kvserver.NoopStoresFlowControlIntegration{},
		kvserver.NoopRaftTransportDisconnectListener{},
		nil, /* knobs */
	)
	errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1))
	transport.Listen(store0.StoreID(), errChan)
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/client_raft_test.go
@@ -30,6 +30,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowdispatch"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
@@ -3197,6 +3198,10 @@ func TestReplicaGCRace(t *testing.T) {
		nodedialer.New(tc.Servers[0].RPCContext(), gossip.AddressResolver(fromStore.Gossip())),
		nil, /* grpcServer */
		tc.Servers[0].Stopper(),
		kvflowdispatch.NewDummyDispatch(),
		kvserver.NoopStoresFlowControlIntegration{},
		kvserver.NoopRaftTransportDisconnectListener{},
		nil, /* knobs */
	)
	errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1))
	fromTransport.Listen(fromStore.StoreID(), errChan)
@@ -3696,6 +3701,10 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) {
			gossip.AddressResolver(tc.GetFirstStoreFromServer(t, 0).Gossip())),
		nil, /* grpcServer */
		tc.Servers[0].Stopper(),
		kvflowdispatch.NewDummyDispatch(),
		kvserver.NoopStoresFlowControlIntegration{},
		kvserver.NoopRaftTransportDisconnectListener{},
		nil, /* knobs */
	)
	errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1))
	transport0.Listen(target0.StoreID, errChan)
146 changes: 146 additions & 0 deletions pkg/kv/kvserver/flow_control_raft_transport.go
@@ -0,0 +1,146 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
	"context"
	"fmt"
	"sort"
	"strings"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/rpc"
)

// raftTransportForFlowControl abstracts the node-level raft transport, and is
// used by the canonical replicaFlowControlIntegration implementation. It
// exposes the set of (remote) stores the raft transport is connected to. If the
// underlying gRPC streams break and don't reconnect, this indicates as much.
// Ditto if they're reconnected to. Also see RaftTransportDisconnectListener,
// which is used to observe every instance of gRPC streams breaking.
type raftTransportForFlowControl interface {
	isConnectedTo(storeID roachpb.StoreID) bool
}

var _ raftTransportForFlowControl = &RaftTransport{}

// isConnectedTo implements the raftTransportForFlowControl interface.
func (r *RaftTransport) isConnectedTo(storeID roachpb.StoreID) bool {
	r.kvflowControl.mu.RLock()
	defer r.kvflowControl.mu.RUnlock()
	return r.kvflowControl.mu.connectionTracker.isStoreConnected(storeID)
}

// RaftTransportDisconnectListener observes every instance of the raft
// transport disconnecting replication traffic to the given (remote) stores.
type RaftTransportDisconnectListener interface {
	OnRaftTransportDisconnected(context.Context, ...roachpb.StoreID)
}

// connectionTrackerForFlowControl tracks the set of client-side stores and
// server-side nodes the raft transport is connected to. The "client" and
// "server" refer to the client and server side of the RaftTransport stream (see
// flow_control_integration.go). Client-side stores return tokens; that's where
// work gets admitted. Server-side nodes are where tokens were originally
// deducted.
type connectionTrackerForFlowControl struct {
	stores map[roachpb.StoreID]struct{}
	nodes  map[roachpb.NodeID]map[rpc.ConnectionClass]struct{}
}

func newConnectionTrackerForFlowControl() *connectionTrackerForFlowControl {
	return &connectionTrackerForFlowControl{
		stores: make(map[roachpb.StoreID]struct{}),
		nodes:  make(map[roachpb.NodeID]map[rpc.ConnectionClass]struct{}),
	}
}

// isStoreConnected returns whether we're connected to the given (client-side)
// store.
func (c *connectionTrackerForFlowControl) isStoreConnected(storeID roachpb.StoreID) bool {
	_, found := c.stores[storeID]
	return found
}

// markStoresConnected is used to inform the tracker that we've received
// raft messages from nodes with the given set of stores.
func (c *connectionTrackerForFlowControl) markStoresConnected(storeIDs []roachpb.StoreID) {
	for _, storeID := range storeIDs {
		c.stores[storeID] = struct{}{}
	}
}

// markStoresDisconnected marks the given set of stores as disconnected.
func (c *connectionTrackerForFlowControl) markStoresDisconnected(storeIDs []roachpb.StoreID) {
	for _, storeID := range storeIDs {
		delete(c.stores, storeID)
	}
}

// isNodeConnected returns whether we're connected to the given (server-side)
// node, independent of the specific RPC connection class.
func (c *connectionTrackerForFlowControl) isNodeConnected(nodeID roachpb.NodeID) bool {
	_, found := c.nodes[nodeID]
	return found
}

// markNodeConnected informs the tracker that we're connected to the given
// node using the given RPC connection class.
func (c *connectionTrackerForFlowControl) markNodeConnected(
	nodeID roachpb.NodeID, class rpc.ConnectionClass,
) {
	if len(c.nodes[nodeID]) == 0 {
		c.nodes[nodeID] = map[rpc.ConnectionClass]struct{}{}
	}
	c.nodes[nodeID][class] = struct{}{}
}

// markNodeDisconnected informs the tracker that a previous connection to
// the given node along the given RPC connection class is now broken.
func (c *connectionTrackerForFlowControl) markNodeDisconnected(
	nodeID roachpb.NodeID, class rpc.ConnectionClass,
) {
	delete(c.nodes[nodeID], class)
	if len(c.nodes[nodeID]) == 0 {
		delete(c.nodes, nodeID)
	}
}

func (c *connectionTrackerForFlowControl) testingPrint() string {
	storeIDs := make([]roachpb.StoreID, 0, len(c.stores))
	nodeIDs := make([]roachpb.NodeID, 0, len(c.nodes))
	for storeID := range c.stores {
		storeIDs = append(storeIDs, storeID)
	}
	for nodeID := range c.nodes {
		nodeIDs = append(nodeIDs, nodeID)
	}

	var buf strings.Builder
	sort.Sort(roachpb.StoreIDSlice(storeIDs))
	sort.Sort(roachpb.NodeIDSlice(nodeIDs))
	buf.WriteString(fmt.Sprintf("connected-stores (server POV): %s\n", roachpb.StoreIDSlice(storeIDs)))
	buf.WriteString(fmt.Sprintf("connected-nodes (client POV): %s\n", roachpb.NodeIDSlice(nodeIDs)))
	return buf.String()
}

// NoopRaftTransportDisconnectListener is a no-op implementation of the
// RaftTransportDisconnectListener interface.
type NoopRaftTransportDisconnectListener struct{}

var _ RaftTransportDisconnectListener = NoopRaftTransportDisconnectListener{}

// OnRaftTransportDisconnected implements the RaftTransportDisconnectListener
// interface.
func (n NoopRaftTransportDisconnectListener) OnRaftTransportDisconnected(
	ctx context.Context, storeIDs ...roachpb.StoreID,
) {
}
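
To make the tracker's bookkeeping concrete, here's a minimal in-package
usage sketch; it isn't part of the diff, and the IDs are hypothetical:

	tracker := newConnectionTrackerForFlowControl()

	// Server side: an incoming RaftMessageRequestBatch announced that the
	// client node holds stores s1 and s2.
	tracker.markStoresConnected([]roachpb.StoreID{1, 2})
	_ = tracker.isStoreConnected(1) // true

	// Client side: we've dialed n3 on the system RPC connection class.
	tracker.markNodeConnected(3, rpc.SystemClass)
	_ = tracker.isNodeConnected(3) // true

	// The stream breaks. Marking things disconnected lets the server
	// release all flow tokens held on behalf of the client's stores (see
	// I1 in kvflowcontrol/doc.go).
	tracker.markNodeDisconnected(3, rpc.SystemClass)
	tracker.markStoresDisconnected([]roachpb.StoreID{1, 2})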