Skip to content

Commit

Permalink
Merge pull request #4380 from hashicorp/b-drain-monitor
Browse files Browse the repository at this point in the history
Monitoring non-draining node exits
  • Loading branch information
dadgar authored Jun 7, 2018
2 parents 4aebd7f + 3402f25 commit 9fd25f1
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 3 deletions.
16 changes: 13 additions & 3 deletions api/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i
// Multiplex node and alloc chans onto outCh. This goroutine closes
// outCh when other chans have been closed or context canceled.
multiplexCtx, cancel := context.WithCancel(ctx)
go n.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh)
go n.monitorDrainMultiplex(multiplexCtx, cancel, outCh, nodeCh, allocCh)

// Monitor node for updates
go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh)
go n.monitorDrainNode(multiplexCtx, cancel, nodeID, index, nodeCh)

// Monitor allocs on node for updates
go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh)
Expand Down Expand Up @@ -177,6 +177,14 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),
select {
case outCh <- msg:
case <-ctx.Done():

// If we are exiting but we have a message, attempt to send it
// so we don't lose a message but do not block.
select {
case outCh <- msg:
default:
}

return
}

Expand All @@ -189,7 +197,8 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(),

// monitorDrainNode emits node updates on nodeCh and closes the channel when
// the node has finished draining.
func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh chan<- *MonitorMessage) {
func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(),
nodeID string, index uint64, nodeCh chan<- *MonitorMessage) {
defer close(nodeCh)

var lastStrategy *DrainStrategy
Expand All @@ -215,6 +224,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6
msg = Messagef(MonitorMsgLevelInfo, "Node %q has marked all allocations for migration", nodeID)
} else {
msg = Messagef(MonitorMsgLevelInfo, "No drain strategy set for node %s", nodeID)
defer cancel()
}
select {
case nodeCh <- msg:
Expand Down
43 changes: 43 additions & 0 deletions command/node_drain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,49 @@ func TestNodeDrainCommand_Monitor(t *testing.T) {
require.Contains(out, "No drain strategy set")
}

func TestNodeDrainCommand_Monitor_NoDrainStrategy(t *testing.T) {
t.Parallel()
require := require.New(t)
server, client, url := testServer(t, true, func(c *agent.Config) {
c.NodeName = "drain_monitor_node"
})
defer server.Shutdown()

// Wait for a node to appear
testutil.WaitForResult(func() (bool, error) {
nodes, _, err := client.Nodes().List(nil)
if err != nil {
return false, err
}
if len(nodes) == 0 {
return false, fmt.Errorf("missing node")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %s", err)
})

// Test -monitor flag
outBuf := bytes.NewBuffer(nil)
ui := &cli.BasicUi{
Reader: bytes.NewReader(nil),
Writer: outBuf,
ErrorWriter: outBuf,
}
cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}}
args := []string{"-address=" + url, "-self", "-monitor", "-ignore-system"}
t.Logf("Running: %v", args)
if code := cmd.Run(args); code != 0 {
t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String())
}

out := outBuf.String()
t.Logf("Output:\n%s", out)

require.Contains(out, "Monitoring node")
require.Contains(out, "No drain strategy set")
}

func TestNodeDrainCommand_Fails(t *testing.T) {
t.Parallel()
srv, _, url := testServer(t, false, nil)
Expand Down

0 comments on commit 9fd25f1

Please sign in to comment.