diff --git a/api/nodes.go b/api/nodes.go index 581f957be85..6184f6cd7aa 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -124,10 +124,10 @@ func (n *Nodes) MonitorDrain(ctx context.Context, nodeID string, index uint64, i // Multiplex node and alloc chans onto outCh. This goroutine closes // outCh when other chans have been closed or context canceled. multiplexCtx, cancel := context.WithCancel(ctx) - go n.monitorDrainMultiplex(ctx, cancel, outCh, nodeCh, allocCh) + go n.monitorDrainMultiplex(multiplexCtx, cancel, outCh, nodeCh, allocCh) // Monitor node for updates - go n.monitorDrainNode(multiplexCtx, nodeID, index, nodeCh) + go n.monitorDrainNode(multiplexCtx, cancel, nodeID, index, nodeCh) // Monitor allocs on node for updates go n.monitorDrainAllocs(multiplexCtx, nodeID, ignoreSys, allocCh) @@ -177,6 +177,14 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), select { case outCh <- msg: case <-ctx.Done(): + + // If we are exiting but we have a message, attempt to send it + // so we don't lose a message but do not block. + select { + case outCh <- msg: + default: + } + return } @@ -189,7 +197,8 @@ func (n *Nodes) monitorDrainMultiplex(ctx context.Context, cancel func(), // monitorDrainNode emits node updates on nodeCh and closes the channel when // the node has finished draining. -func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint64, nodeCh chan<- *MonitorMessage) { +func (n *Nodes) monitorDrainNode(ctx context.Context, cancel func(), + nodeID string, index uint64, nodeCh chan<- *MonitorMessage) { defer close(nodeCh) var lastStrategy *DrainStrategy @@ -215,6 +224,7 @@ func (n *Nodes) monitorDrainNode(ctx context.Context, nodeID string, index uint6 msg = Messagef(MonitorMsgLevelInfo, "Node %q has marked all allocations for migration", nodeID) } else { msg = Messagef(MonitorMsgLevelInfo, "No drain strategy set for node %s", nodeID) + defer cancel() } select { case nodeCh <- msg: diff --git a/command/node_drain_test.go b/command/node_drain_test.go index f0152125c8c..793cc47aa45 100644 --- a/command/node_drain_test.go +++ b/command/node_drain_test.go @@ -258,6 +258,49 @@ func TestNodeDrainCommand_Monitor(t *testing.T) { require.Contains(out, "No drain strategy set") } +func TestNodeDrainCommand_Monitor_NoDrainStrategy(t *testing.T) { + t.Parallel() + require := require.New(t) + server, client, url := testServer(t, true, func(c *agent.Config) { + c.NodeName = "drain_monitor_node" + }) + defer server.Shutdown() + + // Wait for a node to appear + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + if len(nodes) == 0 { + return false, fmt.Errorf("missing node") + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Test -monitor flag + outBuf := bytes.NewBuffer(nil) + ui := &cli.BasicUi{ + Reader: bytes.NewReader(nil), + Writer: outBuf, + ErrorWriter: outBuf, + } + cmd := &NodeDrainCommand{Meta: Meta{Ui: ui}} + args := []string{"-address=" + url, "-self", "-monitor", "-ignore-system"} + t.Logf("Running: %v", args) + if code := cmd.Run(args); code != 0 { + t.Fatalf("expected exit 0, got: %d\n%s", code, outBuf.String()) + } + + out := outBuf.String() + t.Logf("Output:\n%s", out) + + require.Contains(out, "Monitoring node") + require.Contains(out, "No drain strategy set") +} + func TestNodeDrainCommand_Fails(t *testing.T) { t.Parallel() srv, _, url := testServer(t, false, nil)