From 9fd5847433fcfd9ae86ec5a30de63dd231f36fb0 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 25 Apr 2018 13:21:36 -0700 Subject: [PATCH 1/3] Fix detecting drain strategy on GC'd node --- nomad/drainer/watch_jobs.go | 5 +- nomad/drainer/watch_jobs_test.go | 73 ++++++++++++++ nomad/drainer_int_test.go | 166 +++++++++++++++++++++++++++++++ 3 files changed, 242 insertions(+), 2 deletions(-) diff --git a/nomad/drainer/watch_jobs.go b/nomad/drainer/watch_jobs.go index b5173c82aae..fec7743e6a0 100644 --- a/nomad/drainer/watch_jobs.go +++ b/nomad/drainer/watch_jobs.go @@ -352,8 +352,9 @@ func handleTaskGroup(snap *state.StateSnapshot, batch bool, tg *structs.TaskGrou return err } - onDrainingNode = node.DrainStrategy != nil - drainingNodes[node.ID] = onDrainingNode + // Check if the node exists and whether it has a drain strategy + onDrainingNode = node != nil && node.DrainStrategy != nil + drainingNodes[alloc.NodeID] = onDrainingNode } // Check if the alloc should be considered migrated. A migrated diff --git a/nomad/drainer/watch_jobs_test.go b/nomad/drainer/watch_jobs_test.go index b44303a6c35..ad0926c3c33 100644 --- a/nomad/drainer/watch_jobs_test.go +++ b/nomad/drainer/watch_jobs_test.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -663,3 +664,75 @@ func TestHandleTaskGroup_Migrations(t *testing.T) { require.Empty(res.migrated) require.True(res.done) } + +// This test asserts that handle task group works when an allocation is on a +// garbage collected node +func TestHandleTaskGroup_GarbageCollectedNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Create a draining node + state := state.TestStateStore(t) + n := mock.Node() + n.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 5 * time.Minute, + }, + ForceDeadline: time.Now().Add(1 * time.Minute), + } + require.Nil(state.UpsertNode(100, n)) + + job := mock.Job() + require.Nil(state.UpsertJob(101, job)) + + // Create 10 done allocs + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + a := mock.Alloc() + a.Job = job + a.TaskGroup = job.TaskGroups[0].Name + a.NodeID = n.ID + a.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(false), + } + + if i%2 == 0 { + a.DesiredStatus = structs.AllocDesiredStatusStop + } else { + a.ClientStatus = structs.AllocClientStatusFailed + } + allocs = append(allocs, a) + } + + // Make the first one be on a GC'd node + allocs[0].NodeID = uuid.Generate() + require.Nil(state.UpsertAllocs(102, allocs)) + + snap, err := state.Snapshot() + require.Nil(err) + + // Handle before and after indexes as both service and batch + res := newJobResult() + require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 101, res)) + require.Empty(res.drain) + require.Len(res.migrated, 9) + require.True(res.done) + + res = newJobResult() + require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 101, res)) + require.Empty(res.drain) + require.Len(res.migrated, 9) + require.True(res.done) + + res = newJobResult() + require.Nil(handleTaskGroup(snap, false, job.TaskGroups[0], allocs, 103, res)) + require.Empty(res.drain) + require.Empty(res.migrated) + require.True(res.done) + + res = newJobResult() + require.Nil(handleTaskGroup(snap, true, job.TaskGroups[0], allocs, 103, res)) + require.Empty(res.drain) + require.Empty(res.migrated) + require.True(res.done) +} diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 315a538f7da..61a6c15870b 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -11,6 +11,7 @@ import ( memdb "github.com/hashicorp/go-memdb" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -658,6 +659,171 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { }) } +func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create two nodes, registering the second later + n1, n2 := mock.Node(), mock.Node() + nodeReg := &structs.NodeRegisterRequest{ + Node: n1, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeResp structs.NodeUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + + // Create a service job that runs on just one + job := mock.Job() + job.TaskGroups[0].Count = 2 + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + require.NotZero(resp.Index) + job.CreateIndex = resp.JobModifyIndex + + // Create a system job + sysjob := mock.SystemJob() + req = &structs.JobRegisterRequest{ + Job: sysjob, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + require.NotZero(resp.Index) + sysjob.CreateIndex = resp.JobModifyIndex + + // Create a batch job + bjob := mock.BatchJob() + bjob.TaskGroups[0].Count = 2 + req = &structs.JobRegisterRequest{ + Job: bjob, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + require.NotZero(resp.Index) + bjob.CreateIndex = resp.JobModifyIndex + + // Wait for the allocations to be placed + state := s1.State() + testutil.WaitForResult(func() (bool, error) { + allocs, err := state.AllocsByNode(nil, n1.ID) + if err != nil { + return false, err + } + return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Create some old terminal allocs for each job that point at a non-existent + // node to simulate it being on a GC'd node. + var badAllocs []*structs.Allocation + for _, job := range []*structs.Job{job, sysjob, bjob} { + alloc := mock.Alloc() + alloc.Namespace = job.Namespace + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.TaskGroup = job.TaskGroups[0].Name + alloc.DesiredStatus = structs.AllocDesiredStatusStop + alloc.ClientStatus = structs.AllocClientStatusComplete + badAllocs = append(badAllocs, alloc) + } + require.NoError(state.UpsertAllocs(1, badAllocs)) + + // Create the second node + nodeReg = &structs.NodeRegisterRequest{ + Node: n2, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + + // Drain the node + drainReq := &structs.NodeUpdateDrainRequest{ + NodeID: n1.ID, + DrainStrategy: &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 2 * time.Second, + }, + }, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var drainResp structs.NodeDrainUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + + // Wait for the allocs to be replaced + errCh := make(chan error, 2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + + // Wait for the allocs to be stopped + var finalAllocs []*structs.Allocation + testutil.WaitForResult(func() (bool, error) { + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } + + var err error + finalAllocs, err = state.AllocsByNode(nil, n1.ID) + if err != nil { + return false, err + } + for _, alloc := range finalAllocs { + if alloc.DesiredStatus != structs.AllocDesiredStatusStop { + return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) + } + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Check that the node drain is removed + testutil.WaitForResult(func() (bool, error) { + node, err := state.NodeByID(nil, n1.ID) + if err != nil { + return false, err + } + return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Wait for the allocations to be placed on the other node + testutil.WaitForResult(func() (bool, error) { + allocs, err := state.AllocsByNode(nil, n2.ID) + if err != nil { + return false, err + } + return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + // Test that transistions to force drain work. func TestDrainer_Batch_TransitionToForce(t *testing.T) { t.Parallel() From 913a4d30c6f26897735c59b8c4a1f92579f7ca64 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 25 Apr 2018 13:36:03 -0700 Subject: [PATCH 2/3] Safety guard --- nomad/drainer/draining_node.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/nomad/drainer/draining_node.go b/nomad/drainer/draining_node.go index 21e1f254dab..5a9ee1c1598 100644 --- a/nomad/drainer/draining_node.go +++ b/nomad/drainer/draining_node.go @@ -125,6 +125,11 @@ func (n *drainingNode) DrainingJobs() ([]structs.NamespacedID, error) { n.l.RLock() defer n.l.RUnlock() + // Should never happen + if n.node == nil || n.node.DrainStrategy == nil { + return nil, fmt.Errorf("node doesn't have a drain strategy set") + } + // Retrieve the allocs on the node allocs, err := n.state.AllocsByNode(nil, n.node.ID) if err != nil { From 7bdbe43b164f99d2a9cdd43ecaf94d94d9230918 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 25 Apr 2018 16:01:17 -0700 Subject: [PATCH 3/3] Changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fee8c1f559..e3c9b0b6d00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ IMPROVEMENTS: * ui: Stop job button added to job detail pages [[GH-4189](https://github.com/hashicorp/nomad/pull/4189)] BUG FIXES: + * core: Fix panic when doing a node drain effecting a job that has an + allocation that was on a node that no longer exists + [[GH-4215](https://github.com/hashicorp/nomad/issues/4215)] * driver/exec: Create process group for Windows process and send Ctrl-Break signal on Shutdown [[GH-4153](https://github.com/hashicorp/nomad/pull/4153)] * ui: Alloc stats will continue to poll after a request errors or returns an invalid response [[GH-4195](https://github.com/hashicorp/nomad/pull/4195)]