Skip to content

Commit

Permalink
Merge pull request #4284 from hashicorp/f-drain-event
Browse files Browse the repository at this point in the history
Emit Node Events for draining
  • Loading branch information
dadgar authored May 22, 2018
2 parents 656731d + 79c5a9c commit b9d9b44
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 32 deletions.
25 changes: 22 additions & 3 deletions nomad/drainer/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ const (
// NodeDeadlineCoalesceWindow is the duration in which deadlining nodes will
// be coalesced together
NodeDeadlineCoalesceWindow = 5 * time.Second

// NodeDrainEventComplete is used to indicate that the node drain is
// finished.
NodeDrainEventComplete = "Node drain complete"

// NodeDrainEventDetailDeadlined is the key to use when the drain is
// complete because a deadline. The acceptable values are "true" and "false"
NodeDrainEventDetailDeadlined = "deadline_reached"
)

// RaftApplier contains methods for applying the raft requests required by the
// NodeDrainer.
type RaftApplier interface {
AllocUpdateDesiredTransition(allocs map[string]*structs.DesiredTransition, evals []*structs.Evaluation) (uint64, error)
NodesDrainComplete(nodes []string) (uint64, error)
NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error)
}

// NodeTracker is the interface to notify an object that is tracking draining
Expand Down Expand Up @@ -254,10 +262,16 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
n.l.RUnlock()
n.batchDrainAllocs(forceStop)

// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete).
AddDetail(NodeDrainEventDetailDeadlined, "true")

// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
Expand Down Expand Up @@ -324,10 +338,15 @@ func (n *NodeDrainer) handleMigratedAllocs(allocs []*structs.Allocation) {
}
}

// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)

// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(defaultMaxIdsPerTxn, done) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
if _, err := n.raft.NodesDrainComplete(nodes, event); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
Expand Down
7 changes: 6 additions & 1 deletion nomad/drainer/watch_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,12 @@ func (n *NodeDrainer) Update(node *structs.Node) {
}
}

index, err := n.raft.NodesDrainComplete([]string{node.ID})
// Create the node event
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemDrain).
SetMessage(NodeDrainEventComplete)

index, err := n.raft.NodesDrainComplete([]string{node.ID}, event)
if err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for node %q: %v", node.ID, err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions nomad/drainer/watch_nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestNodeDrainWatcher_Remove(t *testing.T) {
require.Equal(n, tracked[n.ID])

// Change the node to be not draining and wait for it to be untracked
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, nil, false, nil))
testutil.WaitForResult(func() (bool, error) {
return len(m.Events) == 2, nil
}, func(err error) {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestNodeDrainWatcher_Update(t *testing.T) {
// Change the node to have a new spec
s2 := n.DrainStrategy.Copy()
s2.Deadline += time.Hour
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false))
require.Nil(state.UpdateNodeDrain(101, n.ID, s2, false, nil))

// Wait for it to be updated
testutil.WaitForResult(func() (bool, error) {
Expand Down
47 changes: 47 additions & 0 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/drainer"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -212,6 +213,12 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
Expand Down Expand Up @@ -300,6 +307,13 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

func TestDrainer_DrainEmptyNode(t *testing.T) {
Expand Down Expand Up @@ -343,6 +357,12 @@ func TestDrainer_DrainEmptyNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

func TestDrainer_AllTypes_Deadline(t *testing.T) {
Expand Down Expand Up @@ -500,6 +520,13 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) {
}
}
require.True(serviceMax < batchMax)

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

// Test that drain is unset when batch jobs naturally finish
Expand Down Expand Up @@ -659,6 +686,12 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
Expand Down Expand Up @@ -824,6 +857,13 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 3)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

// Test that transitions to force drain work.
Expand Down Expand Up @@ -962,6 +1002,13 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) {
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
require.Len(node.Events, 4)
require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message)
require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined)
})
}
}
6 changes: 5 additions & 1 deletion nomad/drainer_shims.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ type drainerShim struct {
s *Server
}

func (d drainerShim) NodesDrainComplete(nodes []string) (uint64, error) {
func (d drainerShim) NodesDrainComplete(nodes []string, event *structs.NodeEvent) (uint64, error) {
args := &structs.BatchNodeUpdateDrainRequest{
Updates: make(map[string]*structs.DrainUpdate, len(nodes)),
NodeEvents: make(map[string]*structs.NodeEvent, len(nodes)),
WriteRequest: structs.WriteRequest{Region: d.s.config.Region},
}

update := &structs.DrainUpdate{}
for _, node := range nodes {
args.Updates[node] = update
if event != nil {
args.NodeEvents[node] = event
}
}

resp, index, err := d.s.raftApply(structs.BatchNodeUpdateDrainRequestType, args)
Expand Down
4 changes: 2 additions & 2 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
}
}

if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible); err != nil {
if err := n.state.UpdateNodeDrain(index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.NodeEvent); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpdateNodeDrain failed: %v", err)
return err
}
Expand All @@ -363,7 +363,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.state.BatchUpdateNodeDrain(index, req.Updates); err != nil {
if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
n.logger.Printf("[ERR] nomad.fsm: BatchUpdateNodeDrain failed: %v", err)
return err
}
Expand Down
15 changes: 15 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,20 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
Deadline: 10 * time.Second,
},
}
event := &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
}
req2 := structs.BatchNodeUpdateDrainRequest{
Updates: map[string]*structs.DrainUpdate{
node.ID: {
DrainStrategy: strategy,
},
},
NodeEvents: map[string]*structs.NodeEvent{
node.ID: event,
},
}
buf, err = structs.Encode(structs.BatchNodeUpdateDrainRequestType, req2)
require.Nil(err)
Expand All @@ -346,6 +354,7 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) {
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}

func TestFSM_UpdateNodeDrain(t *testing.T) {
Expand All @@ -371,6 +380,11 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
req2 := structs.NodeUpdateDrainRequest{
NodeID: node.ID,
DrainStrategy: strategy,
NodeEvent: &structs.NodeEvent{
Message: "Drain strategy enabled",
Subsystem: structs.NodeEventSubsystemDrain,
Timestamp: time.Now(),
},
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
require.Nil(err)
Expand All @@ -384,6 +398,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
require.Nil(err)
require.True(node.Drain)
require.Equal(node.DrainStrategy, strategy)
require.Len(node.Events, 2)
}

func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions nomad/node_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ const (
// maxParallelRequestsPerDerive is the maximum number of parallel Vault
// create token requests that may be outstanding per derive request
maxParallelRequestsPerDerive = 16

// NodeDrainEvents are the various drain messages
NodeDrainEventDrainSet = "Node drain strategy set"
NodeDrainEventDrainDisabled = "Node drain disabled"
NodeDrainEventDrainUpdated = "Node drain stategy updated"
)

// Node endpoint is used for client interactions
Expand Down Expand Up @@ -439,6 +444,9 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
if args.NodeID == "" {
return fmt.Errorf("missing node ID for drain update")
}
if args.NodeEvent != nil {
return fmt.Errorf("node event must not be set")
}

// Look for the node
snap, err := n.srv.fsm.State().Snapshot()
Expand Down Expand Up @@ -468,6 +476,18 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
args.DrainStrategy.ForceDeadline = time.Now().Add(args.DrainStrategy.Deadline)
}

// Construct the node event
args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemDrain)
if node.DrainStrategy == nil && args.DrainStrategy == nil {
return nil // Nothing to do
} else if node.DrainStrategy == nil && args.DrainStrategy != nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainSet)
} else if node.DrainStrategy != nil && args.DrainStrategy != nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainUpdated)
} else if node.DrainStrategy != nil && args.DrainStrategy == nil {
args.NodeEvent.SetMessage(NodeDrainEventDrainDisabled)
}

// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.NodeUpdateDrainRequestType, args)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion nomad/node_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) {
require.Nil(err)
require.True(out.Drain)
require.Equal(strategy.Deadline, out.DrainStrategy.Deadline)
require.Len(out.Events, 2)
require.Equal(NodeDrainEventDrainSet, out.Events[1].Message)

// before+deadline should be before the forced deadline
require.True(beforeUpdate.Add(strategy.Deadline).Before(out.DrainStrategy.ForceDeadline))
Expand Down Expand Up @@ -2587,7 +2589,7 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
Deadline: 10 * time.Second,
},
}
errCh <- state.UpdateNodeDrain(3, node.ID, s, false)
errCh <- state.UpdateNodeDrain(3, node.ID, s, false, nil)
})

req.MinQueryIndex = 2
Expand Down
Loading

0 comments on commit b9d9b44

Please sign in to comment.