Monitor drains from CLI by default but allow detaching #3948

Merged · 4 commits · Mar 16, 2018
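
In short: with this change, "nomad node drain -enable" blocks by default and monitors the drain, streaming node and allocation updates until the node finishes draining; the new -detach flag returns immediately after the drain specification is set, restoring the previous fire-and-forget behavior.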
5 changes: 5 additions & 0 deletions api/allocations.go
@@ -213,3 +213,8 @@ type DesiredTransition struct {
// migrated to another node.
Migrate *bool
}

// ShouldMigrate returns whether the transition object dictates a migration.
func (d DesiredTransition) ShouldMigrate() bool {
return d.Migrate != nil && *d.Migrate
}
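
Migrate is a *bool rather than a bool, so an unset field (nil) stays distinguishable from an explicit false; ShouldMigrate collapses both into "don't migrate". A minimal external-caller sketch (helper is github.com/hashicorp/nomad/helper, as used in the test below):

    api.DesiredTransition{}.ShouldMigrate()                                 // false: Migrate unset (nil)
    api.DesiredTransition{Migrate: helper.BoolToPtr(false)}.ShouldMigrate() // false: explicitly disabled
    api.DesiredTransition{Migrate: helper.BoolToPtr(true)}.ShouldMigrate()  // true: marked for migration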
7 changes: 7 additions & 0 deletions api/allocations_test.go
@@ -239,3 +239,10 @@ func TestAllocations_RescheduleInfo(t *testing.T) {
}

}

func TestAllocations_ShouldMigrate(t *testing.T) {
t.Parallel()
require.True(t, DesiredTransition{Migrate: helper.BoolToPtr(true)}.ShouldMigrate())
require.False(t, DesiredTransition{}.ShouldMigrate())
require.False(t, DesiredTransition{Migrate: helper.BoolToPtr(false)}.ShouldMigrate())
}
165 changes: 163 additions & 2 deletions command/node_drain.go
@@ -7,6 +7,7 @@ import (

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/posener/complete"
)

@@ -45,6 +46,9 @@ Node Drain Options:
Remaining allocations after the deadline are force removed from the node.
If unspecified, a default deadline of one hour is applied.

-detach
Return immediately instead of entering monitor mode.

-force
Force remove allocations off the node immediately.

@@ -80,6 +84,7 @@ func (c *NodeDrainCommand) AutocompleteFlags() complete.Flags {
"-disable": complete.PredictNothing,
"-enable": complete.PredictNothing,
"-deadline": complete.PredictAnything,
"-detach": complete.PredictNothing,
"-force": complete.PredictNothing,
"-no-deadline": complete.PredictNothing,
"-ignore-system": complete.PredictNothing,
@@ -105,7 +110,7 @@ func (c *NodeDrainCommand) AutocompleteArgs() complete.Predictor {
}

func (c *NodeDrainCommand) Run(args []string) int {
-var enable, disable, force,
+var enable, disable, detach, force,
noDeadline, ignoreSystem, keepIneligible, self, autoYes bool
var deadline string

@@ -114,6 +119,7 @@ func (c *NodeDrainCommand) Run(args []string) int {
flags.BoolVar(&enable, "enable", false, "Enable drain mode")
flags.BoolVar(&disable, "disable", false, "Disable drain mode")
flags.StringVar(&deadline, "deadline", "", "Deadline after which allocations are force stopped")
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&force, "force", false, "Force immediate drain")
flags.BoolVar(&noDeadline, "no-deadline", false, "Drain node with no deadline")
flags.BoolVar(&ignoreSystem, "ignore-system", false, "Do not drain system job allocations from the node")
@@ -259,11 +265,166 @@
}

// Toggle node draining
-if _, err := client.Nodes().UpdateDrain(node.ID, spec, !keepIneligible, nil); err != nil {
+meta, err := client.Nodes().UpdateDrain(node.ID, spec, !keepIneligible, nil)
+if err != nil {
c.Ui.Error(fmt.Sprintf("Error updating drain specification: %s", err))
return 1
}

c.Ui.Output(fmt.Sprintf("Node %q drain strategy set", node.ID))

if enable && !detach {
if err := monitorDrain(c.Ui.Output, client.Nodes(), node.ID, meta.LastIndex); err != nil {
c.Ui.Error(fmt.Sprintf("Error monitoring drain: %v", err))
return 1
}

c.Ui.Output(fmt.Sprintf("Node %q drain complete", nodeID))
}

return 0
}

// monitorDrain monitors the node being drained and exits when the node has
// finished draining.
func monitorDrain(output func(string), nodeClient *api.Nodes, nodeID string, index uint64) error {
doneCh := make(chan struct{})
defer close(doneCh)

// Errors from either goroutine are sent here
errCh := make(chan error, 1)

// Monitor node changes and close chan when drain is complete
nodeCh := make(chan struct{})
go func() {
for {
q := api.QueryOptions{
AllowStale: true,
WaitIndex: index,
}
node, meta, err := nodeClient.Info(nodeID, &q)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}

if node.DrainStrategy == nil {
close(nodeCh)
return
}

// Drain still ongoing
index = meta.LastIndex
}
}()

// Monitor alloc changes
allocCh := make(chan string, 1)
go func() {
allocs, meta, err := nodeClient.Allocations(nodeID, nil)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}

initial := make(map[string]*api.Allocation, len(allocs))
for _, a := range allocs {
initial[a.ID] = a
}

for {
q := api.QueryOptions{
AllowStale: true,
WaitIndex: meta.LastIndex,
}

allocs, meta, err = nodeClient.Allocations(nodeID, &q)
if err != nil {
select {
case errCh <- err:
case <-doneCh:
}
return
}

for _, a := range allocs {
// Get previous version of alloc
orig, ok := initial[a.ID]

// Update local alloc state
initial[a.ID] = a

migrating := a.DesiredTransition.ShouldMigrate()

msg := ""
switch {
case !ok:
// Should only be possible if response
// from initial Allocations call was
// stale. No need to output

case orig.ClientStatus != a.ClientStatus:
// Alloc status has changed; output
msg = fmt.Sprintf("status %s -> %s", orig.ClientStatus, a.ClientStatus)

[Review comment, Contributor] Can you add a case for NextAllocation != "" and then make the message, "replaced by allocation %q".

case migrating && !orig.DesiredTransition.ShouldMigrate():
// Alloc was marked for migration
msg = "marked for migration"
case migrating && (orig.DesiredStatus != a.DesiredStatus) && a.DesiredStatus == structs.AllocDesiredStatusStop:
// Alloc has already been marked for migration and is now being stopped
msg = "draining"
case a.NextAllocation != "" && orig.NextAllocation == "":
// Alloc has been replaced by another allocation
msg = fmt.Sprintf("replaced by allocation %q", a.NextAllocation)
}

if msg != "" {
select {
case allocCh <- fmt.Sprintf("Alloc %q %s", a.ID, msg):
case <-doneCh:
return
}
}
}
}
}()

done := false
for !done {
select {
case err := <-errCh:
return err
case <-nodeCh:
done = true
case msg := <-allocCh:
output(msg)
}
}

// Loop on alloc messages for a bit longer as we may have gotten the
// "node done" first (since the watchers run concurrently the events
// may be received out of order)
deadline := 250 * time.Millisecond
timer := time.NewTimer(deadline)
for {
select {
case err := <-errCh:
return err
case msg := <-allocCh:
output(msg)
if !timer.Stop() {
<-timer.C
}
timer.Reset(deadline)
case <-timer.C:
// No events within deadline, exit
return nil
}
}
}
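
For reference, the node-watching goroutine above uses Nomad's blocking-query pattern: each Info call holds server-side until the node's modify index moves past WaitIndex, so the loop wakes on change rather than polling on a timer. A minimal standalone sketch against the same api package (client config and node ID are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/api"
)

// waitForDrain blocks until the node's drain strategy is cleared,
// mirroring the node-watching goroutine in monitorDrain.
func waitForDrain(nodes *api.Nodes, nodeID string, index uint64) error {
	for {
		q := api.QueryOptions{
			AllowStale: true,  // any server may answer; indexes still advance
			WaitIndex:  index, // block until the node changes past this index
		}
		node, meta, err := nodes.Info(nodeID, &q)
		if err != nil {
			return err
		}
		if node.DrainStrategy == nil {
			return nil // drain strategy cleared: node is done draining
		}
		index = meta.LastIndex // resume the watch from the newest index
	}
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}
	// "4d2ba53b" is an illustrative node ID.
	if err := waitForDrain(client.Nodes(), "4d2ba53b", 0); err != nil {
		log.Fatal(err)
	}
	fmt.Println("node drain complete")
}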