Skip to content

Commit

Permalink
Merge pull request #4215 from hashicorp/b-drain
Browse files Browse the repository at this point in the history
Fix panic draining when alloc on non-existent node
  • Loading branch information
dadgar authored Apr 25, 2018
2 parents 267fded + 7bdbe43 commit f595e9f
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ IMPROVEMENTS:
* ui: Stop job button added to job detail pages [[GH-4189](https://github.com/hashicorp/nomad/pull/4189)]

BUG FIXES:
* core: Fix panic when doing a node drain effecting a job that has an
allocation that was on a node that no longer exists
[[GH-4215](https://github.com/hashicorp/nomad/issues/4215)]
* driver/exec: Create process group for Windows process and send Ctrl-Break signal on Shutdown [[GH-4153](https://github.com/hashicorp/nomad/pull/4153)]
* ui: Alloc stats will continue to poll after a request errors or returns an invalid response [[GH-4195](https://github.com/hashicorp/nomad/pull/4195)]

Expand Down
5 changes: 5 additions & 0 deletions nomad/drainer/draining_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) {
n.l.RLock()
defer n.l.RUnlock()

// Should never happen
if n.node == nil || n.node.DrainStrategy == nil {
return nil, fmt.Errorf("node doesn't have a drain strategy set")
}

// Retrieve the allocs on the node
allocs, err := n.state.AllocsByNode(nil, n.node.ID)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions nomad/drainer/watch_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,9 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou
return err
}

onDrainingNode = node.DrainStrategy != nil
drainingNodes[node.ID] = onDrainingNode
// Check if the node exists and whether it has a drain strategy
onDrainingNode = node != nil && node.DrainStrategy != nil
drainingNodes[alloc.NodeID] = onDrainingNode
}

// Check if the alloc should be considered migrated. A migrated
Expand Down
73 changes: 73 additions & 0 deletions nomad/drainer/watch_jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -663,3 +664,75 @@ func TestHandleTaskGroup_Migrations(t *testing.T) {
require.Empty(res.migrated)
require.True(res.done)
}

// This test asserts that handle task group works when an allocation is on a
// garbage collected node
func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) {
t.Parallel()
require := require.New(t)

// Create a draining node
state := state.TestStateStore(t)
n := mock.Node()
n.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 5 * time.Minute,
},
ForceDeadline: time.Now().Add(1 * time.Minute),
}
require.Nil(state.UpsertNode(100, n))

job := mock.Job()
require.Nil(state.UpsertJob(101, job))

// Create 10 done allocs
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
a := mock.Alloc()
a.Job = job
a.TaskGroup = job.TaskGroups[0].Name
a.NodeID = n.ID
a.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(false),
}

if i%2 == 0 {
a.DesiredStatus = structs.AllocDesiredStatusStop
} else {
a.ClientStatus = structs.AllocClientStatusFailed
}
allocs = append(allocs, a)
}

// Make the first one be on a GC'd node
allocs[0].NodeID = uuid.Generate()
require.Nil(state.UpsertAllocs(102, allocs))

snap, err := state.Snapshot()
require.Nil(err)

// Handle before and after indexes as both service and batch
res := newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 9)
require.True(res.done)

res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res))
require.Empty(res.drain)
require.Len(res.migrated, 9)
require.True(res.done)

res = newJobResult()
require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)

res = newJobResult()
require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res))
require.Empty(res.drain)
require.Empty(res.migrated)
require.True(res.done)
}
166 changes: 166 additions & 0 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
memdb "github.com/hashicorp/go-memdb"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -658,6 +659,171 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
})
}

func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create two nodes, registering the second later
n1, n2 := mock.Node(), mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

// Create a service job that runs on just one
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
var resp structs.JobRegisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
job.CreateIndex = resp.JobModifyIndex

// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
sysjob.CreateIndex = resp.JobModifyIndex

// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
bjob.CreateIndex = resp.JobModifyIndex

// Wait for the allocations to be placed
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Create some old terminal allocs for each job that point at a non-existent
// node to simulate it being on a GC'd node.
var badAllocs []*structs.Allocation
for _, job := range []*structs.Job{job, sysjob, bjob} {
alloc := mock.Alloc()
alloc.Namespace = job.Namespace
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DesiredStatus = structs.AllocDesiredStatusStop
alloc.ClientStatus = structs.AllocClientStatusComplete
badAllocs = append(badAllocs, alloc)
}
require.NoError(state.UpsertAllocs(1, badAllocs))

// Create the second node
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 2 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))

// Wait for the allocs to be replaced
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)

// Wait for the allocs to be stopped
var finalAllocs []*structs.Allocation
testutil.WaitForResult(func() (bool, error) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}

var err error
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
for _, alloc := range finalAllocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
}, func(err error) {
t.Fatalf("err: %v", err)
})

// Wait for the allocations to be placed on the other node
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

// Test that transistions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel()
Expand Down

0 comments on commit f595e9f

Please sign in to comment.