From 050ace0edeca92873c2edd1e005c0c912dfbc011 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 20 Apr 2021 10:56:18 -0400 Subject: [PATCH 1/2] add a failing test --- nomad/drainer_int_test.go | 129 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 7cfea74abf6..739e2e14df9 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -880,6 +880,135 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } +// TestDrainer_MultipleNSes_ServiceOnly asserts that all allocs on a draining +// node are migrated, even when their jobs share an ID across namespaces +func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create two nodes + n1, n2 := mock.Node(), mock.Node() + nodeReg := &structs.NodeRegisterRequest{ + Node: n1, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeResp structs.NodeUpdateResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + + ns1, ns2 := mock.Namespace(), mock.Namespace() + nses := []*structs.Namespace{ns1, ns2} + nsReg := &structs.NamespaceUpsertRequest{ + Namespaces: nses, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nsResp structs.GenericResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp)) + + for _, ns := range nses { + // Create a job that runs on just one node + job := mock.Job() + job.ID = "example" + job.Name = "example" + job.Namespace = ns.Name + job.TaskGroups[0].Count = 1 + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch 
the response + var resp structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + require.NotZero(resp.Index) + } + + // Wait for the two allocations to be placed + state := s1.State() + testutil.WaitForResult(func() (bool, error) { + iter, err := state.Allocs(nil) + if err != nil { + return false, err + } + + count := 0 + for iter.Next() != nil { + count++ + } + if count != 2 { + return false, fmt.Errorf("expected %d allocs, found %d", 2, count) + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + // Create the second node + nodeReg = &structs.NodeRegisterRequest{ + Node: n2, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + + // Drain the first node + drainReq := &structs.NodeUpdateDrainRequest{ + NodeID: n1.ID, + DrainStrategy: &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 10 * time.Minute, + }, + }, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var drainResp structs.NodeDrainUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + + // Wait for the allocs to be replaced + errCh := make(chan error, 2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + + testutil.WaitForResult(func() (bool, error) { + allocs, err := state.AllocsByNode(nil, n2.ID) + if err != nil { + return false, err + } + return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) + }, func(err error) { + require.NoError(err) + }) + + // Check that the node drain is removed + testutil.WaitForResult(func() (bool, error) { + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } + node, err := state.NodeByID(nil, n1.ID) + if err != nil { + return false, err + } + return 
node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") + }, func(err error) { + require.NoError(err) + }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + // sometimes test gets a duplicate node drain complete event + require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) +} + // Test that transitions to force drain work. func TestDrainer_Batch_TransitionToForce(t *testing.T) { t.Parallel() From f9a46559814d755006a808e50df066df574055bf Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Tue, 20 Apr 2021 10:59:05 -0400 Subject: [PATCH 2/2] Migrate all allocs when draining a node This fixes a bug affecting draining nodes, where allocs may fail to be migrated if they belong to different namespaces but share the same job ID. The reason is that the helper function that creates the migration evals indexed the allocs by job ID without accounting for the namespaces. When job IDs clash, an eval is created for only one of the jobs and the rest of the allocs remain unmigrated. --- nomad/drainer/drainer.go | 8 ++++---- nomad/structs/structs.go | 4 ++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index c5d4d6a7cad..2e4e8528ffc 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -392,25 +392,25 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er // affected jobs. 
func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) { // Compute the effected jobs and make the transition map - jobs := make(map[string]*structs.Allocation, 4) + jobs := make(map[structs.NamespacedID]*structs.Allocation, 4) transitions := make(map[string]*structs.DesiredTransition, len(allocs)) for _, alloc := range allocs { transitions[alloc.ID] = &structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), } - jobs[alloc.JobID] = alloc + jobs[alloc.JobNamespacedID()] = alloc } evals := make([]*structs.Evaluation, 0, len(jobs)) now := time.Now().UTC().UnixNano() - for job, alloc := range jobs { + for _, alloc := range jobs { evals = append(evals, &structs.Evaluation{ ID: uuid.Generate(), Namespace: alloc.Namespace, Priority: alloc.Job.Priority, Type: alloc.Job.Type, TriggeredBy: structs.EvalTriggerNodeDrain, - JobID: job, + JobID: alloc.JobID, Status: structs.EvalStatusPending, CreateTime: now, ModifyTime: now, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6f1b110401a..0ea27cb24b9 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -9184,6 +9184,10 @@ func (a *Allocation) ConsulNamespace() string { return a.Job.LookupTaskGroup(a.TaskGroup).Consul.GetNamespace() } +func (a *Allocation) JobNamespacedID() NamespacedID { + return NewNamespacedID(a.JobID, a.Namespace) +} + // Index returns the index of the allocation. If the allocation is from a task // group with count greater than 1, there will be multiple allocations for it. func (a *Allocation) Index() uint {