Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix panic draining when alloc on non-existent node #4215

Merged
merged 3 commits into from
Apr 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ IMPROVEMENTS:
* ui: Stop job button added to job detail pages [[GH-4189](https://github.com/hashicorp/nomad/pull/4189)]

BUG FIXES:
* core: Fix panic when doing a node drain effecting a job that has an
allocation that was on a node that no longer exists
[[GH-4215](https://github.com/hashicorp/nomad/issues/4215)]
* driver/exec: Create process group for Windows process and send Ctrl-Break signal on Shutdown [[GH-4153](https://github.com/hashicorp/nomad/pull/4153)]
* ui: Alloc stats will continue to poll after a request errors or returns an invalid response [[GH-4195](https://github.com/hashicorp/nomad/pull/4195)]

Expand Down
5 changes: 5 additions & 0 deletions nomad/drainer/draining_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) {
n.l.RLock()
defer n.l.RUnlock()

// Should never happen
if n.node == nil || n.node.DrainStrategy == nil {
return nil, fmt.Errorf("node doesn't have a drain strategy set")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is returning an error much better than panicking because won't it cause leader flapping?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be safe how we use it: https://github.com/hashicorp/nomad/blob/master/nomad/drainer/watch_nodes.go#L51-L71

Update will only be called if the node is draining. So it is more going to help future uses not introduce bad logic.

}

// Retrieve the allocs on the node
allocs, err := n.state.AllocsByNode(nil, n.node.ID)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions nomad/drainer/watch_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,9 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou
return err
}

onDrainingNode = node.DrainStrategy != nil
drainingNodes[node.ID] = onDrainingNode
// Check if the node exists and whether it has a drain strategy
onDrainingNode = node != nil && node.DrainStrategy != nil
drainingNodes[alloc.NodeID] = onDrainingNode
}

// Check if the alloc should be considered migrated. A migrated
Expand Down
73 changes: 73 additions & 0 deletions nomad/drainer/watch_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -663,3 +664,75 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
require.Empty(res.migrated)
require.True(res.done)
}

// This test asserts that handle task group works when an allocation is on a
// garbage collected node
func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) {
t.Parallel()
require := require.New(t)

// Create a draining node
state := state.TestStateStore(t)
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, n))

job := mock.Job()
require.Nil(state.UpsertJob(101, job))

// Create 10 done allocs
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}

if i%2 == 0 {
a.DesiredStatus = structs.AllocDesiredStatusStop
} else {
a.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, a)
}

// Make the first one be on a GC'd node
allocs[0].NodeID = uuid.Generate()
require.Nil(state.UpsertAllocs(102, allocs))

snap, err := state.Snapshot()
require.Nil(err)

// Handle before and after indexes as both service and batch
res := newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 9)
require.True(res.done)

res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 9)
require.True(res.done)

res = newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)

res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
}
166 changes: 166 additions & 0 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -658,6 +659,171 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
})
}

func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create two nodes, registering the second later
n1, n2 := mock.Node(), mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

// Create a service job that runs on just one
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
var resp structs.JobRegisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
job.CreateIndex = resp.JobModifyIndex

// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
sysjob.CreateIndex = resp.JobModifyIndex

// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
bjob.CreateIndex = resp.JobModifyIndex

// Wait for the allocations to be placed
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Create some old terminal allocs for each job that point at a non-existent
// node to simulate it being on a GC'd node.
var badAllocs []*structs.Allocation
for _, job := range []*structs.Job{job, sysjob, bjob} {
alloc := mock.Alloc()
alloc.Namespace = job.Namespace
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
badAllocs = append(badAllocs, alloc)
}
require.NoError(state.UpsertAllocs(1, badAllocs))

// Create the second node
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 2 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))

// Wait for the allocs to be replaced
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)

// Wait for the allocs to be stopped
var finalAllocs []*structs.Allocation
testutil.WaitForResult(func() (bool, error) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}

var err error
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
for _, alloc := range finalAllocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Wait for the allocations to be placed on the other node
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

// Test that transistions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel()
Expand Down