server: don't re-run node decommissioning callback
This commit fixes a bug introduced by #80993. Without this fix, nodes
would re-run the callback that enqueues a decommissioning node's ranges
into their replicate queues every time they received a gossip update
from that decommissioning node that was perceived to be newer.
Re-running this callback on every newer gossip update from a
decommissioning node is too expensive for nodes with a lot of replicas.

Release note: None
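
For illustration, the guard this fix relies on can be reduced to a small, self-contained Go sketch. The names below (nodeSet, onNodeDecommissioning, integer node IDs) are hypothetical stand-ins rather than the actual CockroachDB types: the callback records each decommissioning node ID in a mutex-protected set, so the expensive per-store enqueue work runs at most once per node no matter how many newer gossip updates arrive.

package main

import (
	"fmt"
	"sync"
)

// nodeSet is an illustrative stand-in for the server's decommissioningNodeMap:
// a mutex-protected set of node IDs whose replicas have already been enqueued.
type nodeSet struct {
	mu    sync.Mutex
	nodes map[int]struct{}
}

// onNodeDecommissioning mirrors the shape of the fixed callback: it may fire on
// every (possibly newer) gossip update about a decommissioning node, but only
// the first invocation per node performs the expensive work.
func (s *nodeSet) onNodeDecommissioning(nodeID int, enqueueRanges func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.nodes[nodeID]; ok {
		// Already handled this node; nothing more to do.
		return
	}
	// Record the node before doing the work so later updates short-circuit.
	s.nodes[nodeID] = struct{}{}
	enqueueRanges()
}

func main() {
	s := &nodeSet{nodes: make(map[int]struct{})}
	work := func() { fmt.Println("enqueuing replicas into the replicate queues (expensive)") }

	// Simulate three gossip updates from the same decommissioning node (n7):
	// the expensive work runs exactly once.
	for i := 0; i < 3; i++ {
		s.onNodeDecommissioning(7, work)
	}
}

In the diff below, the corresponding change is the single line t.nodes[decommissioningNodeID] = struct{}{} added immediately after the early-return membership check in makeOnNodeDecommissioningCallback.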
aayushshah15 committed Jun 9, 2022
1 parent 4d5966e commit 30795be
Showing 4 changed files with 32 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/server/admin_test.go
@@ -2444,6 +2444,19 @@ func TestDecommissionEnqueueReplicas(t *testing.T) {

		// Ensure that the scratch range's replica was proactively enqueued.
		require.Equal(t, <-enqueuedRangeIDs, tc.LookupRangeOrFatal(t, scratchKey).RangeID)

		// Check that the node was marked as decommissioning in each of the nodes'
		// decommissioningNodeMap. This needs to be wrapped in a SucceedsSoon to
		// deal with gossip propagation delays.
		testutils.SucceedsSoon(t, func() error {
			for i := 0; i < tc.NumServers(); i++ {
				srv := tc.Server(i)
				if _, exists := srv.DecommissioningNodeMap()[decommissioningSrv.NodeID()]; !exists {
					return errors.Newf("node %d not detected to be decommissioning", decommissioningSrv.NodeID())
				}
			}
			return nil
		})
	}

	decommissionAndCheck(2 /* decommissioningSrvIdx */)
13 changes: 13 additions & 0 deletions pkg/server/decommission.go
@@ -54,6 +54,7 @@ func (t *decommissioningNodeMap) makeOnNodeDecommissioningCallback(
			// Nothing more to do.
			return
		}
		t.nodes[decommissioningNodeID] = struct{}{}

		logLimiter := log.Every(5 * time.Second) // avoid log spam
		if err := stores.VisitStores(func(store *kvserver.Store) error {
@@ -216,3 +217,15 @@ func (s *Server) Decommission(
	}
	return nil
}

// DecommissioningNodeMap returns the set of node IDs that are decommissioning
// from the perspective of the server.
func (s *Server) DecommissioningNodeMap() map[roachpb.NodeID]interface{} {
	s.decomNodeMap.RLock()
	defer s.decomNodeMap.RUnlock()
	nodes := make(map[roachpb.NodeID]interface{})
	for key, val := range s.decomNodeMap.nodes {
		nodes[key] = val
	}
	return nodes
}
2 changes: 2 additions & 0 deletions pkg/server/server.go
@@ -123,6 +123,7 @@ type Server struct {
	admin           *adminServer
	status          *statusServer
	drain           *drainServer
	decomNodeMap    *decommissioningNodeMap
	authentication  *authenticationServer
	migrationServer *migrationServer
	tsDB            *ts.DB
@@ -844,6 +845,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
		admin:          sAdmin,
		status:         sStatus,
		drain:          drain,
		decomNodeMap:   decomNodeMap,
		authentication: sAuth,
		tsDB:           tsDB,
		tsServer:       &sTS,
4 changes: 4 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
@@ -151,6 +151,10 @@ type TestServerInterface interface {
	// Decommission idempotently sets the decommissioning flag for specified nodes.
	Decommission(ctx context.Context, targetStatus livenesspb.MembershipStatus, nodeIDs []roachpb.NodeID) error

	// DecommissioningNodeMap returns a map of nodeIDs that are known to the
	// server to be decommissioning.
	DecommissioningNodeMap() map[roachpb.NodeID]interface{}

	// SplitRange splits the range containing splitKey.
	SplitRange(splitKey roachpb.Key) (left roachpb.RangeDescriptor, right roachpb.RangeDescriptor, err error)

