From 6da9f3c450612edb1994cfc11cfa8e24e05cc3fb Mon Sep 17 00:00:00 2001 From: Wenyi Date: Fri, 16 Jun 2023 12:12:17 -0400 Subject: [PATCH] kvserver: rename RaftMessageHandler to IncomingRaftMessageHandler The commit renames `RaftMessageHandler`, `Listen`, and `Stop` to `IncomingRaftMessageHandler`, `ListenIncomingRaftMessages`, and `StopIncomingRaftMessages`. Another PR is introducing a new interface `OutgoingRaftMessageHandler`, dedicated to managing messages sent. The main purpose of this PR is to make the future PR cleaner by handling the renaming process. Note that this commit does not change any existing functionality. Part of: https://github.com/cockroachdb/cockroach/issues/103983 Related: https://github.com/cockroachdb/cockroach/pull/105122 Release Note: None --- pkg/kv/kvserver/client_merge_test.go | 56 +++++++++--------- pkg/kv/kvserver/client_raft_helpers_test.go | 28 ++++----- pkg/kv/kvserver/client_raft_test.go | 58 +++++++++---------- pkg/kv/kvserver/client_replica_test.go | 44 +++++++------- pkg/kv/kvserver/client_split_test.go | 24 ++++---- .../kvserver/flow_control_integration_test.go | 12 ++-- pkg/kv/kvserver/raft_transport.go | 48 ++++++++------- pkg/kv/kvserver/raft_transport_test.go | 6 +- pkg/kv/kvserver/replica_rangefeed_test.go | 12 ++-- pkg/kv/kvserver/replicate_queue_test.go | 8 +-- pkg/kv/kvserver/store.go | 26 ++++----- pkg/kv/kvserver/store_raft.go | 16 ++--- 12 files changed, 171 insertions(+), 167 deletions(-) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index b8d70ba85d1d..abe81980c987 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -699,9 +699,9 @@ func mergeCheckingTimestampCaches( } else { funcs = partitionedLeaderFuncs } - tc.Servers[i].RaftTransport().Listen(s.StoreID(), &unreliableRaftHandler{ + tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(s.StoreID(), &unreliableRaftHandler{ rangeID: lhsDesc.GetRangeID(), - RaftMessageHandler: s, + IncomingRaftMessageHandler: s, unreliableRaftHandlerFuncs: funcs, }) } @@ -801,17 +801,17 @@ func mergeCheckingTimestampCaches( // Remove the partition. A snapshot to the leaseholder should follow. // This snapshot will inform the leaseholder about the range merge. for i, s := range lhsStores { - var h kvserver.RaftMessageHandler + var h kvserver.IncomingRaftMessageHandler if i == 0 { h = &unreliableRaftHandler{ rangeID: lhsDesc.GetRangeID(), - RaftMessageHandler: s, + IncomingRaftMessageHandler: s, unreliableRaftHandlerFuncs: restoredLeaseholderFuncs, } } else { h = s } - tc.Servers[i].RaftTransport().Listen(s.StoreID(), h) + tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(s.StoreID(), h) } close(filterMu.blockHBAndGCs) filterMu.Lock() @@ -2481,8 +2481,8 @@ func TestStoreReplicaGCAfterMerge(t *testing.T) { nil, /* knobs */ ) errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1)) - transport.Listen(store0.StoreID(), errChan) - transport.Listen(store1.StoreID(), errChan) + transport.ListenIncomingRaftMessages(store0.StoreID(), errChan) + transport.ListenIncomingRaftMessages(store1.StoreID(), errChan) sendHeartbeat := func( rangeID roachpb.RangeID, @@ -2736,9 +2736,9 @@ func TestStoreRangeMergeSlowUnabandonedFollower_WithSplit(t *testing.T) { // Start dropping all Raft traffic to the LHS on store2 so that it won't be // aware that there is a merge in progress. 
- tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ - rangeID: lhsDesc.RangeID, - RaftMessageHandler: store2, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{ + rangeID: lhsDesc.RangeID, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { return true @@ -3017,9 +3017,9 @@ func TestStoreRangeMergeAbandonedFollowersAutomaticallyGarbageCollected(t *testi // Start dropping all Raft traffic to the LHS replica on store2 so that it // won't be aware that there is a merge in progress. - tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ - rangeID: lhsDesc.RangeID, - RaftMessageHandler: store2, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{ + rangeID: lhsDesc.RangeID, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(*kvserverpb.RaftMessageRequest) bool { return true @@ -3219,7 +3219,7 @@ func TestStoreRangeReadoptedLHSFollower(t *testing.T) { type slowSnapRaftHandler struct { rangeID roachpb.RangeID waitCh chan struct{} - kvserver.RaftMessageHandler + kvserver.IncomingRaftMessageHandler syncutil.Mutex } @@ -3245,7 +3245,7 @@ func (h *slowSnapRaftHandler) HandleSnapshot( <-waitCh } } - return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) + return h.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream) } // TestStoreRangeMergeUninitializedLHSFollower reproduces a rare bug in which a @@ -3326,15 +3326,15 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { // of range 1 never processes the split trigger, which would create an // initialized replica of A. unreliableHandler := &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: store2, + rangeID: desc.RangeID, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(request *kvserverpb.RaftMessageRequest) bool { return true }, }, } - tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, unreliableHandler) + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, unreliableHandler) // Perform the split of A, now that store2 won't be able to initialize its // replica of A. @@ -3343,12 +3343,12 @@ func TestStoreRangeMergeUninitializedLHSFollower(t *testing.T) { // Wedge a Raft snapshot that's destined for A. This allows us to capture a // pre-merge Raft snapshot, which we'll let loose after the merge commits. slowSnapHandler := &slowSnapRaftHandler{ - rangeID: aRangeID, - waitCh: make(chan struct{}), - RaftMessageHandler: unreliableHandler, + rangeID: aRangeID, + waitCh: make(chan struct{}), + IncomingRaftMessageHandler: unreliableHandler, } defer slowSnapHandler.unblock() - tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, slowSnapHandler) + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, slowSnapHandler) // Remove the replica of range 1 on store2. If we were to leave it in place, // store2 would refuse to GC its replica of C after the merge commits, because @@ -4007,9 +4007,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { aRepl0 := store0.LookupReplica(roachpb.RKey(keyA)) // Start dropping all Raft traffic to the first range on store2. 
- tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ - rangeID: aRepl0.RangeID, - RaftMessageHandler: store2, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{ + rangeID: aRepl0.RangeID, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(request *kvserverpb.RaftMessageRequest) bool { return true @@ -4050,9 +4050,9 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) { // Restore Raft traffic to the LHS on store2. log.Infof(ctx, "restored traffic to store 2") - tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, &unreliableRaftHandler{ - rangeID: aRepl0.RangeID, - RaftMessageHandler: store2, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, &unreliableRaftHandler{ + rangeID: aRepl0.RangeID, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just diff --git a/pkg/kv/kvserver/client_raft_helpers_test.go b/pkg/kv/kvserver/client_raft_helpers_test.go index fc00d6070b90..0964dcaef779 100644 --- a/pkg/kv/kvserver/client_raft_helpers_test.go +++ b/pkg/kv/kvserver/client_raft_helpers_test.go @@ -60,7 +60,7 @@ func noopRaftHandlerFuncs() unreliableRaftHandlerFuncs { type unreliableRaftHandler struct { name string rangeID roachpb.RangeID - kvserver.RaftMessageHandler + kvserver.IncomingRaftMessageHandler unreliableRaftHandlerFuncs } @@ -115,7 +115,7 @@ func (h *unreliableRaftHandler) HandleRaftRequest( ) } } - return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) + return h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) } func (h *unreliableRaftHandler) filterHeartbeats( @@ -142,7 +142,7 @@ func (h *unreliableRaftHandler) HandleRaftResponse( return nil } } - return h.RaftMessageHandler.HandleRaftResponse(ctx, resp) + return h.IncomingRaftMessageHandler.HandleRaftResponse(ctx, resp) } func (h *unreliableRaftHandler) HandleSnapshot( @@ -155,7 +155,7 @@ func (h *unreliableRaftHandler) HandleSnapshot( return err } } - return h.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) + return h.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream) } func (h *unreliableRaftHandler) HandleDelegatedSnapshot( @@ -169,7 +169,7 @@ func (h *unreliableRaftHandler) HandleDelegatedSnapshot( } } } - return h.RaftMessageHandler.HandleDelegatedSnapshot(ctx, req) + return h.IncomingRaftMessageHandler.HandleDelegatedSnapshot(ctx, req) } // testClusterStoreRaftMessageHandler exists to allows a store to be stopped and @@ -241,7 +241,7 @@ type testClusterPartitionedRange struct { partitioned bool partitionedReplicas map[roachpb.ReplicaID]bool } - handlers []kvserver.RaftMessageHandler + handlers []kvserver.IncomingRaftMessageHandler } // setupPartitionedRange sets up an testClusterPartitionedRange for the provided @@ -275,7 +275,7 @@ func setupPartitionedRange( activated bool, funcs unreliableRaftHandlerFuncs, ) (*testClusterPartitionedRange, error) { - handlers := make([]kvserver.RaftMessageHandler, 0, len(tc.Servers)) + handlers := make([]kvserver.IncomingRaftMessageHandler, 0, len(tc.Servers)) for i := range tc.Servers { handlers = append(handlers, &testClusterStoreRaftMessageHandler{ tc: tc, @@ -291,12 +291,12 @@ func setupPartitionedRangeWithHandlers( replicaID roachpb.ReplicaID, partitionedNodeIdx int, activated bool, - handlers 
[]kvserver.RaftMessageHandler, + handlers []kvserver.IncomingRaftMessageHandler, funcs unreliableRaftHandlerFuncs, ) (*testClusterPartitionedRange, error) { pr := &testClusterPartitionedRange{ rangeID: rangeID, - handlers: make([]kvserver.RaftMessageHandler, 0, len(handlers)), + handlers: make([]kvserver.IncomingRaftMessageHandler, 0, len(handlers)), } pr.mu.partitioned = activated pr.mu.partitionedNodeIdx = partitionedNodeIdx @@ -323,7 +323,7 @@ func setupPartitionedRangeWithHandlers( s := i h := &unreliableRaftHandler{ rangeID: rangeID, - RaftMessageHandler: handlers[s], + IncomingRaftMessageHandler: handlers[s], unreliableRaftHandlerFuncs: funcs, } // Only filter messages from the partitioned store on the other @@ -383,7 +383,7 @@ func setupPartitionedRangeWithHandlers( } } pr.handlers = append(pr.handlers, h) - tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) + tc.Servers[s].RaftTransport().ListenIncomingRaftMessages(tc.Target(s).StoreID, h) } return pr, nil } @@ -438,9 +438,9 @@ func dropRaftMessagesFrom( store, err := srv.Stores().GetStore(srv.GetFirstStoreID()) require.NoError(t, err) - srv.RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ - rangeID: rangeID, - RaftMessageHandler: store, + srv.RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ + rangeID: rangeID, + IncomingRaftMessageHandler: store, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropHB: func(hb *kvserverpb.RaftHeartbeat) bool { return shouldDrop(hb.RangeID, hb.FromReplicaID) diff --git a/pkg/kv/kvserver/client_raft_test.go b/pkg/kv/kvserver/client_raft_test.go index 72fdbe5b30b1..f91825ad1ee3 100644 --- a/pkg/kv/kvserver/client_raft_test.go +++ b/pkg/kv/kvserver/client_raft_test.go @@ -919,8 +919,8 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { log.Infof(ctx, "test: installing unreliable Raft transports") for _, s := range []int{0, 1, 2} { h := &unreliableRaftHandler{ - rangeID: partRepl.RangeID, - RaftMessageHandler: tc.GetFirstStoreFromServer(t, s), + rangeID: partRepl.RangeID, + IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, s), } if s != partStore { // Only filter messages from the partitioned store on the other @@ -932,7 +932,7 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { return hb.FromReplicaID == partReplDesc.ReplicaID } } - tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, h) + tc.Servers[s].RaftTransport().ListenIncomingRaftMessages(tc.Target(s).StoreID, h) } // Perform a series of writes on the partitioned replica. The writes will @@ -1028,9 +1028,9 @@ func TestSnapshotAfterTruncationWithUncommittedTail(t *testing.T) { // Remove the partition. Snapshot should follow. 
log.Infof(ctx, "test: removing the partition") for _, s := range []int{0, 1, 2} { - tc.Servers[s].RaftTransport().Listen(tc.Target(s).StoreID, &unreliableRaftHandler{ - rangeID: partRepl.RangeID, - RaftMessageHandler: tc.GetFirstStoreFromServer(t, s), + tc.Servers[s].RaftTransport().ListenIncomingRaftMessages(tc.Target(s).StoreID, &unreliableRaftHandler{ + rangeID: partRepl.RangeID, + IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, s), unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just truncated can @@ -1149,9 +1149,9 @@ func TestRequestsOnLaggingReplica(t *testing.T) { for _, i := range []int{0, 1, 2} { store := tc.GetFirstStoreFromServer(t, i) h := &unreliableRaftHandler{ - name: fmt.Sprintf("store %d", i), - rangeID: rngDesc.RangeID, - RaftMessageHandler: store, + name: fmt.Sprintf("store %d", i), + rangeID: rngDesc.RangeID, + IncomingRaftMessageHandler: store, } if i != partitionNodeIdx { // Only filter messages from the partitioned store on the other two @@ -1163,7 +1163,7 @@ func TestRequestsOnLaggingReplica(t *testing.T) { return hb.FromReplicaID == partReplDesc.ReplicaID } } - store.Transport().Listen(store.Ident.StoreID, h) + store.Transport().ListenIncomingRaftMessages(store.Ident.StoreID, h) } // Stop the heartbeats so that n1's lease can expire. @@ -1262,12 +1262,12 @@ func TestRequestsOnLaggingReplica(t *testing.T) { // believing that it is the leader, and lease acquisition requests block. log.Infof(ctx, "test: removing partition") slowSnapHandler := &slowSnapRaftHandler{ - rangeID: rngDesc.RangeID, - waitCh: make(chan struct{}), - RaftMessageHandler: partitionStore, + rangeID: rngDesc.RangeID, + waitCh: make(chan struct{}), + IncomingRaftMessageHandler: partitionStore, } defer slowSnapHandler.unblock() - partitionStore.Transport().Listen(partitionStore.Ident.StoreID, slowSnapHandler) + partitionStore.Transport().ListenIncomingRaftMessages(partitionStore.Ident.StoreID, slowSnapHandler) // Remove the unreliable transport from the other stores, so that messages // sent by the partitioned store can reach them. for _, i := range []int{0, 1, 2} { @@ -1276,7 +1276,7 @@ func TestRequestsOnLaggingReplica(t *testing.T) { continue } store := tc.GetFirstStoreFromServer(t, i) - store.Transport().Listen(store.Ident.StoreID, store) + store.Transport().ListenIncomingRaftMessages(store.Ident.StoreID, store) } // Now we're going to send a request to the behind replica, and we expect it @@ -3038,7 +3038,7 @@ func TestRemovePlaceholderRace(t *testing.T) { type noConfChangeTestHandler struct { rangeID roachpb.RangeID - kvserver.RaftMessageHandler + kvserver.IncomingRaftMessageHandler } func (ncc *noConfChangeTestHandler) HandleRaftRequest( @@ -3069,7 +3069,7 @@ func (ncc *noConfChangeTestHandler) HandleRaftRequest( } } } - return ncc.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) + return ncc.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) } func (ncc *noConfChangeTestHandler) HandleRaftResponse( @@ -3083,7 +3083,7 @@ func (ncc *noConfChangeTestHandler) HandleRaftResponse( return nil } } - return ncc.RaftMessageHandler.HandleRaftResponse(ctx, resp) + return ncc.IncomingRaftMessageHandler.HandleRaftResponse(ctx, resp) } func TestReplicaGCRace(t *testing.T) { @@ -3106,10 +3106,10 @@ func TestReplicaGCRace(t *testing.T) { toStore := tc.GetFirstStoreFromServer(t, 2) // Prevent the victim replica from processing configuration changes. 
- tc.Servers[2].RaftTransport().Stop(toStore.Ident.StoreID) - tc.Servers[2].RaftTransport().Listen(toStore.Ident.StoreID, &noConfChangeTestHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: toStore, + tc.Servers[2].RaftTransport().StopIncomingRaftMessages(toStore.Ident.StoreID) + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(toStore.Ident.StoreID, &noConfChangeTestHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: toStore, }) repl, err := leaderStore.GetReplica(desc.RangeID) @@ -3206,7 +3206,7 @@ func TestReplicaGCRace(t *testing.T) { nil, /* knobs */ ) errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1)) - fromTransport.Listen(fromStore.StoreID(), errChan) + fromTransport.ListenIncomingRaftMessages(fromStore.StoreID(), errChan) // Send the heartbeat. Boom. See #11591. // We have to send this multiple times to protect against @@ -3709,7 +3709,7 @@ func TestReplicateRemovedNodeDisruptiveElection(t *testing.T) { nil, /* knobs */ ) errChan := errorChannelTestHandler(make(chan *kvpb.Error, 1)) - transport0.Listen(target0.StoreID, errChan) + transport0.ListenIncomingRaftMessages(target0.StoreID, errChan) // Simulate the removed node asking to trigger an election. Try and try again // until we're reasonably sure the message was sent. @@ -4240,9 +4240,9 @@ func TestUninitializedReplicaRemainsQuiesced(t *testing.T) { } s2, err := tc.Server(1).GetStores().(*kvserver.Stores).GetStore(tc.Server(1).GetFirstStoreID()) require.NoError(t, err) - tc.Servers[1].RaftTransport().Listen(s2.StoreID(), &unreliableRaftHandler{ + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(s2.StoreID(), &unreliableRaftHandler{ rangeID: desc.RangeID, - RaftMessageHandler: s2, + IncomingRaftMessageHandler: s2, unreliableRaftHandlerFuncs: handlerFuncs, }) @@ -4731,9 +4731,9 @@ func TestTracingDoesNotRaceWithCancelation(t *testing.T) { require.Nil(t, err) for i := 0; i < 3; i++ { - tc.Servers[i].RaftTransport().Listen(tc.Target(i).StoreID, &unreliableRaftHandler{ - rangeID: ri.Desc.RangeID, - RaftMessageHandler: tc.GetFirstStoreFromServer(t, i), + tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(tc.Target(i).StoreID, &unreliableRaftHandler{ + rangeID: ri.Desc.RangeID, + IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, i), unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { return rand.Intn(2) == 0 diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index d16758313d36..becfdc1c49b6 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -2966,9 +2966,9 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error { return errors.New("rejected") } - tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.StoreID(), &unreliableRaftHandler{ rangeID: repl0.GetRangeID(), - RaftMessageHandler: store2, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: funcs, }) @@ -2999,7 +2999,7 @@ func TestLeaseTransferInSnapshotUpdatesTimestampCache(t *testing.T) { // Remove the partition. A snapshot to node 2 should follow. This snapshot // will inform node 2 that it is the new leaseholder for the range. Node 2 // should act accordingly and update its internal state to reflect this. 
- tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, store2) + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, store2) tc.WaitForValues(t, keyC, []int64{4, 4, 4}) // Attempt to write under the read on the new leaseholder. The batch @@ -3101,9 +3101,9 @@ func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) { funcs.snapErr = func(*kvserverpb.SnapshotRequest_Header) error { return errors.New("rejected") } - tc.Servers[2].RaftTransport().Listen(store2.StoreID(), &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.StoreID(), &unreliableRaftHandler{ rangeID: repl0.GetRangeID(), - RaftMessageHandler: store2, + IncomingRaftMessageHandler: store2, unreliableRaftHandlerFuncs: funcs, }) @@ -3145,7 +3145,7 @@ func TestLeaseTransferRejectedIfTargetNeedsSnapshot(t *testing.T) { require.True(t, isRejectedErr, "%+v", transferErr) // Remove the partition. A snapshot to node 2 should follow. - tc.Servers[2].RaftTransport().Listen(store2.Ident.StoreID, store2) + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store2.Ident.StoreID, store2) tc.WaitForValues(t, keyC, []int64{4, 4, 4}) // Now that node 2 caught up on the log through a snapshot, we should be @@ -3400,9 +3400,9 @@ func TestReplicaTombstone(t *testing.T) { funcs.dropResp = func(*kvserverpb.RaftMessageResponse) bool { return true } - tc.Servers[1].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ rangeID: desc.RangeID, - RaftMessageHandler: store, + IncomingRaftMessageHandler: store, unreliableRaftHandlerFuncs: funcs, }) tc.RemoveVotersOrFatal(t, key, tc.Target(1)) @@ -3456,9 +3456,9 @@ func TestReplicaTombstone(t *testing.T) { raftFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { return req.ToReplica.StoreID == store.StoreID() } - tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ rangeID: desc.RangeID, - RaftMessageHandler: store, + IncomingRaftMessageHandler: store, unreliableRaftHandlerFuncs: raftFuncs, }) tc.RemoveVotersOrFatal(t, key, tc.Target(2)) @@ -3500,9 +3500,9 @@ func TestReplicaTombstone(t *testing.T) { // It will never find out it has been removed. We'll remove it // with a manual replica GC. store, _ := getFirstStoreReplica(t, tc.Server(2), key) - tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: store, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: store, }) tc.RemoveVotersOrFatal(t, key, tc.Target(2)) repl, err := store.GetReplica(desc.RangeID) @@ -3539,9 +3539,9 @@ func TestReplicaTombstone(t *testing.T) { rangeID := desc.RangeID // Partition node 2 from all raft communication. store, _ := getFirstStoreReplica(t, tc.Server(2), keyA) - tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: store, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: store, }) // We'll move the range from server 2 to 3 and merge key and keyA. 
@@ -3620,9 +3620,9 @@ func TestReplicaTombstone(t *testing.T) { waiter.blockSnapshot = true } setMinHeartbeat(repl.ReplicaID() + 1) - tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: store, + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: store, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropResp: func(*kvserverpb.RaftMessageResponse) bool { return true @@ -3727,12 +3727,12 @@ func TestReplicaTombstone(t *testing.T) { raftFuncs.dropReq = func(req *kvserverpb.RaftMessageRequest) bool { return partActive.Load().(bool) && req.Message.Type == raftpb.MsgApp } - tc.Servers[2].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ + tc.Servers[2].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ rangeID: lhsDesc.RangeID, unreliableRaftHandlerFuncs: raftFuncs, - RaftMessageHandler: &unreliableRaftHandler{ + IncomingRaftMessageHandler: &unreliableRaftHandler{ rangeID: rhsDesc.RangeID, - RaftMessageHandler: store, + IncomingRaftMessageHandler: store, unreliableRaftHandlerFuncs: raftFuncs, }, }) diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 066689b8c369..cb98248e48b8 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -868,12 +868,12 @@ func TestStoreRangeSplitMergeStats(t *testing.T) { require.Equal(t, ms, msMerged, "post-merge stats differ from pre-split") } -// RaftMessageHandlerInterceptor wraps a storage.RaftMessageHandler. It -// delegates all methods to the underlying storage.RaftMessageHandler, except -// that HandleSnapshot calls receiveSnapshotFilter with the snapshot request -// header before delegating to the underlying HandleSnapshot method. +// RaftMessageHandlerInterceptor wraps a storage.IncomingRaftMessageHandler. It +// delegates all methods to the underlying storage.IncomingRaftMessageHandler, +// except that HandleSnapshot calls receiveSnapshotFilter with the snapshot +// request header before delegating to the underlying HandleSnapshot method. type RaftMessageHandlerInterceptor struct { - kvserver.RaftMessageHandler + kvserver.IncomingRaftMessageHandler handleSnapshotFilter func(header *kvserverpb.SnapshotRequest_Header) } @@ -883,7 +883,7 @@ func (mh RaftMessageHandlerInterceptor) HandleSnapshot( respStream kvserver.SnapshotResponseStream, ) error { mh.handleSnapshotFilter(header) - return mh.RaftMessageHandler.HandleSnapshot(ctx, header, respStream) + return mh.IncomingRaftMessageHandler.HandleSnapshot(ctx, header, respStream) } // TestStoreEmptyRangeSnapshotSize tests that the snapshot request header for a @@ -923,7 +923,7 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { headers []*kvserverpb.SnapshotRequest_Header }{} messageHandler := RaftMessageHandlerInterceptor{ - RaftMessageHandler: tc.GetFirstStoreFromServer(t, 1), + IncomingRaftMessageHandler: tc.GetFirstStoreFromServer(t, 1), handleSnapshotFilter: func(header *kvserverpb.SnapshotRequest_Header) { // Each snapshot request is handled in a new goroutine, so we need // synchronization. 
@@ -932,7 +932,7 @@ func TestStoreEmptyRangeSnapshotSize(t *testing.T) { messageRecorder.headers = append(messageRecorder.headers, header) }, } - tc.Servers[1].RaftTransport().Listen(tc.GetFirstStoreFromServer(t, 1).StoreID(), messageHandler) + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(tc.GetFirstStoreFromServer(t, 1).StoreID(), messageHandler) // Replicate the newly-split range to trigger a snapshot request from store 0 // to store 1. @@ -3383,9 +3383,9 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { }) store, _ := getFirstStoreReplica(t, tc.Server(1), k) - tc.Servers[1].RaftTransport().Listen(store.StoreID(), &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: store, + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{ + rangeID: desc.RangeID, + IncomingRaftMessageHandler: store, }) _, kRHS := k, k.Next() @@ -3461,7 +3461,7 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) { // Re-enable raft and wait for the lhs to catch up to the post-split // descriptor. This used to panic with "raft group deleted". - tc.Servers[1].RaftTransport().Listen(store.StoreID(), store) + tc.Servers[1].RaftTransport().ListenIncomingRaftMessages(store.StoreID(), store) testutils.SucceedsSoon(t, func() error { repl, err := store.GetReplica(descLHS.RangeID) if err != nil { diff --git a/pkg/kv/kvserver/flow_control_integration_test.go b/pkg/kv/kvserver/flow_control_integration_test.go index e03972f38a30..8857185fe283 100644 --- a/pkg/kv/kvserver/flow_control_integration_test.go +++ b/pkg/kv/kvserver/flow_control_integration_test.go @@ -100,10 +100,10 @@ func TestFlowControlBasic(t *testing.T) { for i := 0; i < numNodes; i++ { si, err := tc.Server(i).GetStores().(*kvserver.Stores).GetStore(tc.Server(i).GetFirstStoreID()) require.NoError(t, err) - tc.Servers[i].RaftTransport().Listen(si.StoreID(), + tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(si.StoreID(), &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: si, + rangeID: desc.RangeID, + IncomingRaftMessageHandler: si, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Install a raft handler to get verbose raft logging. @@ -1907,10 +1907,10 @@ func TestFlowControlUnquiescedRange(t *testing.T) { for i := 0; i < numNodes; i++ { si, err := tc.Server(i).GetStores().(*kvserver.Stores).GetStore(tc.Server(i).GetFirstStoreID()) require.NoError(t, err) - tc.Servers[i].RaftTransport().Listen(si.StoreID(), + tc.Servers[i].RaftTransport().ListenIncomingRaftMessages(si.StoreID(), &unreliableRaftHandler{ - rangeID: desc.RangeID, - RaftMessageHandler: si, + rangeID: desc.RangeID, + IncomingRaftMessageHandler: si, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Install a raft handler to get verbose raft logging. diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index 3090305ce134..8458cf2f8a4f 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -105,9 +105,9 @@ type SnapshotResponseStream interface { Recv() (*kvserverpb.SnapshotRequest, error) } -// RaftMessageHandler is the interface that must be implemented by -// arguments to RaftTransport.Listen. -type RaftMessageHandler interface { +// IncomingRaftMessageHandler is the interface that must be implemented by +// arguments to RaftTransport.ListenIncomingRaftMessages. 
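+// A handler is registered per store: the RaftTransport routes each incoming
+// request, response, and snapshot to the handler registered for the
+// destination replica's StoreID (see ListenIncomingRaftMessages and
+// StopIncomingRaftMessages below).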
+type IncomingRaftMessageHandler interface { // HandleRaftRequest is called for each incoming Raft message. The request is // always processed asynchronously and the response is sent over respStream. // If an error is encountered during asynchronous processing, it will be @@ -161,8 +161,8 @@ type RaftTransport struct { // is done while holding kvflowControl.mu. queues [rpc.NumConnectionClasses]syncutil.IntMap - dialer *nodedialer.Dialer - handlers syncutil.IntMap // map[roachpb.StoreID]*RaftMessageHandler + dialer *nodedialer.Dialer + incomingMessageHandlers syncutil.IntMap // map[roachpb.StoreID]*IncomingRaftMessageHandler kvflowControl struct { // Everything nested under this struct is used to return flow tokens @@ -372,9 +372,11 @@ func (t *RaftTransport) queueByteSize() int64 { return size } -func (t *RaftTransport) getHandler(storeID roachpb.StoreID) (RaftMessageHandler, bool) { - if value, ok := t.handlers.Load(int64(storeID)); ok { - return *(*RaftMessageHandler)(value), true +func (t *RaftTransport) getIncomingRaftMessageHandler( + storeID roachpb.StoreID, +) (IncomingRaftMessageHandler, bool) { + if value, ok := t.incomingMessageHandlers.Load(int64(storeID)); ok { + return *(*IncomingRaftMessageHandler)(value), true } return nil, false } @@ -410,14 +412,14 @@ func (t *RaftTransport) handleRaftRequest( return nil } - handler, ok := t.getHandler(req.ToReplica.StoreID) + incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(req.ToReplica.StoreID) if !ok { log.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v", req.FromReplica, req.ToReplica) return kvpb.NewError(kvpb.NewStoreNotFoundError(req.ToReplica.StoreID)) } - return handler.HandleRaftRequest(ctx, req, respStream) + return incomingMessageHandler.HandleRaftRequest(ctx, req, respStream) } // newRaftMessageResponse constructs a RaftMessageResponse from the @@ -537,7 +539,7 @@ func (t *RaftTransport) InternalDelegateRaftSnapshot( } } // Get the handler of the sender store. - handler, ok := t.getHandler(req.DelegatedSender.StoreID) + incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(req.DelegatedSender.StoreID) if !ok { log.Warningf( ctx, @@ -554,7 +556,7 @@ func (t *RaftTransport) InternalDelegateRaftSnapshot( } // Pass off the snapshot request to the sender store. - return handler.HandleDelegatedSnapshot(ctx, req) + return incomingMessageHandler.HandleDelegatedSnapshot(ctx, req) } // RaftSnapshot handles incoming streaming snapshot requests. @@ -570,23 +572,25 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error return stream.Send(snapRespErr(err)) } rmr := req.Header.RaftMessageRequest - handler, ok := t.getHandler(rmr.ToReplica.StoreID) + incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(rmr.ToReplica.StoreID) if !ok { log.Warningf(ctx, "unable to accept Raft message from %+v: no handler registered for %+v", rmr.FromReplica, rmr.ToReplica) return kvpb.NewStoreNotFoundError(rmr.ToReplica.StoreID) } - return handler.HandleSnapshot(ctx, req.Header, stream) + return incomingMessageHandler.HandleSnapshot(ctx, req.Header, stream) } -// Listen registers a raftMessageHandler to receive proxied messages. -func (t *RaftTransport) Listen(storeID roachpb.StoreID, handler RaftMessageHandler) { - t.handlers.Store(int64(storeID), unsafe.Pointer(&handler)) +// ListenIncomingRaftMessages registers a IncomingRaftMessageHandler to receive proxied messages. 
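+// Only one handler may be registered per StoreID at a time; a subsequent
+// call for the same store replaces the existing handler.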
+func (t *RaftTransport) ListenIncomingRaftMessages( + storeID roachpb.StoreID, handler IncomingRaftMessageHandler, +) { + t.incomingMessageHandlers.Store(int64(storeID), unsafe.Pointer(&handler)) } -// Stop unregisters a raftMessageHandler. -func (t *RaftTransport) Stop(storeID roachpb.StoreID) { - t.handlers.Delete(int64(storeID)) +// StopIncomingRaftMessages unregisters a IncomingRaftMessageHandler. +func (t *RaftTransport) StopIncomingRaftMessages(storeID roachpb.StoreID) { + t.incomingMessageHandlers.Delete(int64(storeID)) } // processQueue opens a Raft client stream and sends messages from the @@ -611,13 +615,13 @@ func (t *RaftTransport) processQueue( return err } t.metrics.ReverseRcvd.Inc(1) - handler, ok := t.getHandler(resp.ToReplica.StoreID) + incomingMessageHandler, ok := t.getIncomingRaftMessageHandler(resp.ToReplica.StoreID) if !ok { log.Warningf(ctx, "no handler found for store %s in response %s", resp.ToReplica.StoreID, resp) continue } - if err := handler.HandleRaftResponse(ctx, resp); err != nil { + if err := incomingMessageHandler.HandleRaftResponse(ctx, resp); err != nil { return err } } diff --git a/pkg/kv/kvserver/raft_transport_test.go b/pkg/kv/kvserver/raft_transport_test.go index fd9ed75045a7..0990099014b1 100644 --- a/pkg/kv/kvserver/raft_transport_test.go +++ b/pkg/kv/kvserver/raft_transport_test.go @@ -220,7 +220,7 @@ func (rttc *raftTransportTestContext) ListenStore( nodeID roachpb.NodeID, storeID roachpb.StoreID, ) channelServer { ch := newChannelServer(100, 10*time.Millisecond) - rttc.transports[nodeID].Listen(storeID, ch) + rttc.transports[nodeID].ListenIncomingRaftMessages(storeID, ch) return ch } @@ -530,7 +530,7 @@ func TestRaftTransportIndependentRanges(t *testing.T) { const numMessages = 50 channelServer := newChannelServer(numMessages*2, 10*time.Millisecond) channelServer.brokenRange = 13 - serverTransport.Listen(server.StoreID, channelServer) + serverTransport.ListenIncomingRaftMessages(server.StoreID, channelServer) for i := 0; i < numMessages; i++ { for _, rangeID := range []roachpb.RangeID{1, 13} { @@ -591,7 +591,7 @@ func TestReopenConnection(t *testing.T) { rttc.ListenStore(clientReplica.NodeID, clientReplica.StoreID) // Take down the old server and start a new one at the same address. - serverTransport.Stop(serverReplica.StoreID) + serverTransport.StopIncomingRaftMessages(serverReplica.StoreID) serverStopper.Stop(context.Background()) // With the old server down, nothing is listening no the address right now diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index 7bd1a9e3b344..0c85ec4016e2 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -814,9 +814,9 @@ func TestReplicaRangefeedErrors(t *testing.T) { }) // Partition the replica from the rest of its range. - partitionStore.Transport().Listen(partitionStore.Ident.StoreID, &unreliableRaftHandler{ - rangeID: rangeID, - RaftMessageHandler: partitionStore, + partitionStore.Transport().ListenIncomingRaftMessages(partitionStore.Ident.StoreID, &unreliableRaftHandler{ + rangeID: rangeID, + IncomingRaftMessageHandler: partitionStore, }) // Perform a write on the range. @@ -849,9 +849,9 @@ func TestReplicaRangefeedErrors(t *testing.T) { } // Remove the partition. Snapshot should follow. 
- partitionStore.Transport().Listen(partitionStore.Ident.StoreID, &unreliableRaftHandler{ - rangeID: rangeID, - RaftMessageHandler: partitionStore, + partitionStore.Transport().ListenIncomingRaftMessages(partitionStore.Ident.StoreID, &unreliableRaftHandler{ + rangeID: rangeID, + IncomingRaftMessageHandler: partitionStore, unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{ dropReq: func(req *kvserverpb.RaftMessageRequest) bool { // Make sure that even going forward no MsgApp for what we just truncated can diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go index 6b700632aa19..12d730781e75 100644 --- a/pkg/kv/kvserver/replicate_queue_test.go +++ b/pkg/kv/kvserver/replicate_queue_test.go @@ -1875,7 +1875,7 @@ func TestLargeUnsplittableRangeReplicate(t *testing.T) { } type delayingRaftMessageHandler struct { - kvserver.RaftMessageHandler + kvserver.IncomingRaftMessageHandler leaseHolderNodeID uint64 rangeID roachpb.RangeID } @@ -1891,11 +1891,11 @@ func (h delayingRaftMessageHandler) HandleRaftRequest( respStream kvserver.RaftMessageResponseStream, ) *kvpb.Error { if h.rangeID != req.RangeID { - return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) + return h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) } go func() { time.Sleep(raftDelay) - err := h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream) + err := h.IncomingRaftMessageHandler.HandleRaftRequest(ctx, req, respStream) if err != nil { log.Infof(ctx, "HandleRaftRequest returned err %s", err) } @@ -1972,7 +1972,7 @@ func TestTransferLeaseToLaggingNode(t *testing.T) { if err != nil { t.Fatal(err) } - remoteStore.Transport().Listen( + remoteStore.Transport().ListenIncomingRaftMessages( remoteStoreID, delayingRaftMessageHandler{remoteStore, leaseHolderNodeID, rangeID}, ) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 6c25a43159e9..e57863bcd430 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -585,18 +585,18 @@ A Replica should be thought of primarily as a State Machine applying commands from a replicated log (the log being replicated across the members of the Range). The Store's RaftTransport receives Raft messages from Replicas residing on other Stores and routes them to the appropriate Replicas via -Store.HandleRaftRequest (which is part of the RaftMessageHandler interface), -ultimately resulting in a call to Replica.handleRaftReadyRaftMuLocked, which -houses the integration with the etcd/raft library (raft.RawNode). This may -generate Raft messages to be sent to other Stores; these are handed to -Replica.sendRaftMessages which ultimately hands them to the Store's -RaftTransport.SendAsync method. Raft uses message passing (not -request-response), and outgoing messages will use a gRPC stream that differs -from that used for incoming messages (which makes asymmetric partitions more -likely in case of stream-specific problems). The steady state is relatively -straightforward but when Ranges are being reconfigured, an understanding the -Replica Lifecycle becomes important and upholding the Store's invariants becomes -more complex. +Store.HandleRaftRequest (which is part of the IncomingRaftMessageHandler +interface), ultimately resulting in a call to +Replica.handleRaftReadyRaftMuLocked, which houses the integration with the +etcd/raft library (raft.RawNode). 
This may generate Raft messages to be sent to +other Stores; these are handed to Replica.sendRaftMessages which ultimately +hands them to the Store's RaftTransport.SendAsync method. Raft uses message +passing (not request-response), and outgoing messages will use a gRPC stream +that differs from that used for incoming messages (which makes asymmetric +partitions more likely in case of stream-specific problems). The steady state is +relatively straightforward but when Ranges are being reconfigured, an +understanding the Replica Lifecycle becomes important and upholding the Store's +invariants becomes more complex. A first phenomenon to understand is that of uninitialized Replicas, which is the State Machine at applied index zero, i.e. has an empty state. In CockroachDB, an @@ -2059,7 +2059,7 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error { } // Start Raft processing goroutines. - s.cfg.Transport.Listen(s.StoreID(), s) + s.cfg.Transport.ListenIncomingRaftMessages(s.StoreID(), s) s.processRaft(ctx) // Register a callback to unquiesce any ranges with replicas on a diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 07d260473a3f..073beed90c60 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -269,9 +269,9 @@ func (s *Store) uncoalesceBeats( func (s *Store) HandleRaftRequest( ctx context.Context, req *kvserverpb.RaftMessageRequest, respStream RaftMessageResponseStream, ) *kvpb.Error { - // NB: unlike the other two RaftMessageHandler methods implemented by Store, - // this one doesn't need to directly run through a Stopper task because it - // delegates all work through a raftScheduler, whose workers' lifetimes are + // NB: unlike the other two IncomingRaftMessageHandler methods implemented by + // Store, this one doesn't need to directly run through a Stopper task because + // it delegates all work through a raftScheduler, whose workers' lifetimes are // already tied to the Store's Stopper. if len(req.Heartbeats)+len(req.HeartbeatResps) > 0 { if req.RangeID != 0 { @@ -476,10 +476,10 @@ func (s *Store) processRaftSnapshotRequest( }) } -// HandleRaftResponse implements the RaftMessageHandler interface. Per the -// interface specification, an error is returned if and only if the underlying -// Raft connection should be closed. -// It requires that s.mu is not held. +// HandleRaftResponse implements the IncomingRaftMessageHandler interface. Per +// the interface specification, an error is returned if and only if the +// underlying Raft connection should be closed. It requires that s.mu is not +// held. func (s *Store) HandleRaftResponse( ctx context.Context, resp *kvserverpb.RaftMessageResponse, ) error { @@ -720,7 +720,7 @@ func (s *Store) processRaft(ctx context.Context) { _ = s.stopper.RunAsyncTask(ctx, "sched-tick-loop", s.raftTickLoop) _ = s.stopper.RunAsyncTask(ctx, "coalesced-hb-loop", s.coalescedHeartbeatsLoop) s.stopper.AddCloser(stop.CloserFn(func() { - s.cfg.Transport.Stop(s.StoreID()) + s.cfg.Transport.StopIncomingRaftMessages(s.StoreID()) })) s.syncWaiter.Start(ctx, s.stopper)
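
For reference, a minimal sketch of the renamed registration API as the tests in
this patch exercise it. This is illustrative only: it mirrors the
dropRaftMessagesFrom helper in client_raft_helpers_test.go and assumes the
kvserver test harness (a testcluster.TestCluster named `tc`, the test-only
`unreliableRaftHandler` wrapper, and `require`); the choice of server index and
the `installPartition` name are hypothetical.

    func installPartition(
        t *testing.T, tc *testcluster.TestCluster, rangeID roachpb.RangeID,
    ) {
        srv := tc.Servers[2]
        store, err := srv.Stores().GetStore(srv.GetFirstStoreID())
        require.NoError(t, err)

        // Register a handler that delegates to the store itself (a *Store
        // implements IncomingRaftMessageHandler) but drops every inbound
        // Raft request for rangeID, simulating a partition.
        srv.RaftTransport().ListenIncomingRaftMessages(store.StoreID(), &unreliableRaftHandler{
            rangeID:                    rangeID,
            IncomingRaftMessageHandler: store,
            unreliableRaftHandlerFuncs: unreliableRaftHandlerFuncs{
                dropReq: func(*kvserverpb.RaftMessageRequest) bool {
                    return true // drop all inbound requests for this range
                },
            },
        })

        // Healing the partition is simply a second registration, which
        // replaces the wrapper with the store's own handler:
        //   srv.RaftTransport().ListenIncomingRaftMessages(store.StoreID(), store)
        // Unregistering entirely mirrors the closer in Store.processRaft:
        //   srv.RaftTransport().StopIncomingRaftMessages(store.StoreID())
    }

Registration is keyed by StoreID on the node's RaftTransport, so a later
ListenIncomingRaftMessages call for the same store swaps the handler in place;
the tests above rely on this to install and later remove unreliable wrappers.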