From 750e7b1ee204828cfb4c201ebc112ca7c74a9a69 Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Wed, 21 Apr 2021 12:11:14 -0400
Subject: [PATCH] Migrate all allocs when draining a node (#10411)

This fixes a bug affecting node draining, where allocs may fail to be
migrated if they belong to different namespaces but share the same job
ID.

The cause is that the helper function that creates the migration evals
indexed the allocs by job ID without accounting for the namespace. When
job IDs clash, an eval is created for only one of the jobs, and the
allocs belonging to the others are never migrated off the draining
node.

Fixes #10172
---
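Reviewer note, not part of the patch: below is a minimal standalone
sketch of the keying bug. The Allocation and NamespacedID types are
simplified stand-ins for the Nomad structs, trimmed to just the fields
needed to show why indexing by bare job ID drops allocs across
namespaces:

package main

import "fmt"

// Simplified stand-ins for structs.NamespacedID and structs.Allocation;
// the real types carry many more fields.
type NamespacedID struct {
	ID        string
	Namespace string
}

type Allocation struct {
	ID        string
	JobID     string
	Namespace string
}

func main() {
	// Two allocs whose jobs share the ID "example" but live in
	// different namespaces, as in the new drainer test below.
	allocs := []*Allocation{
		{ID: "alloc1", JobID: "example", Namespace: "ns1"},
		{ID: "alloc2", JobID: "example", Namespace: "ns2"},
	}

	// Before the fix: keyed by bare job ID, the second alloc
	// overwrites the first, so only one job gets a migration eval.
	byJobID := make(map[string]*Allocation)
	for _, alloc := range allocs {
		byJobID[alloc.JobID] = alloc
	}
	fmt.Println(len(byJobID)) // 1

	// After the fix: keyed by (job ID, namespace), both jobs are
	// kept and each gets its own eval.
	byNSID := make(map[NamespacedID]*Allocation)
	for _, alloc := range allocs {
		byNSID[NamespacedID{ID: alloc.JobID, Namespace: alloc.Namespace}] = alloc
	}
	fmt.Println(len(byNSID)) // 2
}

Keying the map by the (job ID, namespace) pair, as drainAllocs now does
via the new JobNamespacedID helper, keeps one entry per job per
namespace, so each affected job gets its own migration eval.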
 nomad/drainer/drainer.go  |   8 +--
 nomad/drainer_int_test.go | 129 ++++++++++++++++++++++++++++++++++++++
 nomad/structs/structs.go  |   4 ++
 3 files changed, 137 insertions(+), 4 deletions(-)

diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go
index c5d4d6a7cad..2e4e8528ffc 100644
--- a/nomad/drainer/drainer.go
+++ b/nomad/drainer/drainer.go
@@ -392,25 +392,25 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er
 // affected jobs.
 func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) {
 	// Compute the affected jobs and make the transition map
-	jobs := make(map[string]*structs.Allocation, 4)
+	jobs := make(map[structs.NamespacedID]*structs.Allocation, 4)
 	transitions := make(map[string]*structs.DesiredTransition, len(allocs))
 	for _, alloc := range allocs {
 		transitions[alloc.ID] = &structs.DesiredTransition{
 			Migrate: helper.BoolToPtr(true),
 		}
-		jobs[alloc.JobID] = alloc
+		jobs[alloc.JobNamespacedID()] = alloc
 	}
 
 	evals := make([]*structs.Evaluation, 0, len(jobs))
 	now := time.Now().UTC().UnixNano()
-	for job, alloc := range jobs {
+	for _, alloc := range jobs {
 		evals = append(evals, &structs.Evaluation{
 			ID:          uuid.Generate(),
 			Namespace:   alloc.Namespace,
 			Priority:    alloc.Job.Priority,
 			Type:        alloc.Job.Type,
 			TriggeredBy: structs.EvalTriggerNodeDrain,
-			JobID:       job,
+			JobID:       alloc.JobID,
 			Status:      structs.EvalStatusPending,
 			CreateTime:  now,
 			ModifyTime:  now,
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index 7cfea74abf6..739e2e14df9 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -880,6 +880,135 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
 	require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
 }
 
+// TestDrainer_MultipleNSes_ServiceOnly asserts that all allocs are migrated
+// even when their jobs belong to different namespaces and share the same ID.
+func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	s1, cleanupS1 := TestServer(t, nil)
+	defer cleanupS1()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create two nodes
+	n1, n2 := mock.Node(), mock.Node()
+	nodeReg := &structs.NodeRegisterRequest{
+		Node:         n1,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var nodeResp structs.NodeUpdateResponse
+	require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	ns1, ns2 := mock.Namespace(), mock.Namespace()
+	nses := []*structs.Namespace{ns1, ns2}
+	nsReg := &structs.NamespaceUpsertRequest{
+		Namespaces:   nses,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var nsResp structs.GenericResponse
+	require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp))
+
+	for _, ns := range nses {
+		// Create a service job that runs on just one node
+		job := mock.Job()
+		job.ID = "example"
+		job.Name = "example"
+		job.Namespace = ns.Name
+		job.TaskGroups[0].Count = 1
+		req := &structs.JobRegisterRequest{
+			Job: job,
+			WriteRequest: structs.WriteRequest{
+				Region:    "global",
+				Namespace: job.Namespace,
+			},
+		}
+
+		// Fetch the response
+		var resp structs.JobRegisterResponse
+		require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+		require.NotZero(resp.Index)
+	}
+
+	// Wait for the two allocations to be placed
+	state := s1.State()
+	testutil.WaitForResult(func() (bool, error) {
+		iter, err := state.Allocs(nil)
+		if err != nil {
+			return false, err
+		}
+
+		count := 0
+		for iter.Next() != nil {
+			count++
+		}
+		if count != 2 {
+			return false, fmt.Errorf("expected %d allocs, found %d", 2, count)
+		}
+		return true, nil
+	}, func(err error) {
+		require.NoError(err)
+	})
+
+	// Create the second node
+	nodeReg = &structs.NodeRegisterRequest{
+		Node:         n2,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Drain the first node
+	drainReq := &structs.NodeUpdateDrainRequest{
+		NodeID: n1.ID,
+		DrainStrategy: &structs.DrainStrategy{
+			DrainSpec: structs.DrainSpec{
+				Deadline: 10 * time.Minute,
+			},
+		},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var drainResp structs.NodeDrainUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+	// Wait for the allocs to be replaced
+	errCh := make(chan error, 2)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
+	go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)
+
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n2.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		require.NoError(err)
+	})
+
+	// Check that the node drain is removed
+	testutil.WaitForResult(func() (bool, error) {
+		if err := checkAllocPromoter(errCh); err != nil {
+			return false, err
+		}
+		node, err := state.NodeByID(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
+	}, func(err error) {
+		require.NoError(err)
+	})
+
+	// Check that we got the right events
+	node, err := state.NodeByID(nil, n1.ID)
+	require.NoError(err)
+	// Sometimes the test gets a duplicate node drain complete event
+	require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events)
+	require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
+}
+
 // Test that transitions to force drain work.
 func TestDrainer_Batch_TransitionToForce(t *testing.T) {
 	t.Parallel()
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 8e574cee8a8..6761ca87ad8 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -9088,6 +9088,10 @@ type Allocation struct {
 	ModifyTime int64
 }
 
+func (a *Allocation) JobNamespacedID() NamespacedID {
+	return NewNamespacedID(a.JobID, a.Namespace)
+}
+
 // Index returns the index of the allocation. If the allocation is from a task
 // group with count greater than 1, there will be multiple allocations for it.
 func (a *Allocation) Index() uint {