diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go index c5d4d6a7cad..2e4e8528ffc 100644 --- a/nomad/drainer/drainer.go +++ b/nomad/drainer/drainer.go @@ -392,25 +392,25 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er // affected jobs. func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) { // Compute the effected jobs and make the transition map - jobs := make(map[string]*structs.Allocation, 4) + jobs := make(map[structs.NamespacedID]*structs.Allocation, 4) transitions := make(map[string]*structs.DesiredTransition, len(allocs)) for _, alloc := range allocs { transitions[alloc.ID] = &structs.DesiredTransition{ Migrate: helper.BoolToPtr(true), } - jobs[alloc.JobID] = alloc + jobs[alloc.JobNamespacedID()] = alloc } evals := make([]*structs.Evaluation, 0, len(jobs)) now := time.Now().UTC().UnixNano() - for job, alloc := range jobs { + for _, alloc := range jobs { evals = append(evals, &structs.Evaluation{ ID: uuid.Generate(), Namespace: alloc.Namespace, Priority: alloc.Job.Priority, Type: alloc.Job.Type, TriggeredBy: structs.EvalTriggerNodeDrain, - JobID: job, + JobID: alloc.JobID, Status: structs.EvalStatusPending, CreateTime: now, ModifyTime: now, diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index 7cfea74abf6..739e2e14df9 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -880,6 +880,135 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) } +// TestDrainer_MultipleNSes_ServiceOnly asserts that all jobs on an alloc, even +// when they belong to different namespaces and share the same ID +func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create two nodes + n1, n2 := mock.Node(), mock.Node() + nodeReg := &structs.NodeRegisterRequest{ + Node: n1, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nodeResp structs.NodeUpdateResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + + ns1, ns2 := mock.Namespace(), mock.Namespace() + nses := []*structs.Namespace{ns1, ns2} + nsReg := &structs.NamespaceUpsertRequest{ + Namespaces: nses, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var nsResp structs.GenericResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp)) + + for _, ns := range nses { + // Create a job that runs on just one + job := mock.Job() + job.ID = "example" + job.Name = "example" + job.Namespace = ns.Name + job.TaskGroups[0].Count = 1 + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + require.NotZero(resp.Index) + } + + // Wait for the two allocations to be placed + state := s1.State() + testutil.WaitForResult(func() (bool, error) { + iter, err := state.Allocs(nil) + if err != nil { + return false, err + } + + count := 0 + for iter.Next() != nil { + count++ + } + if count != 2 { + return false, fmt.Errorf("expected %d allocs, found %d", 2, count) + } + return true, nil + }, func(err error) { + require.NoError(err) + }) + + // Create the second node + nodeReg = &structs.NodeRegisterRequest{ + Node: n2, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + + // Drain the first node + drainReq := &structs.NodeUpdateDrainRequest{ + NodeID: n1.ID, + DrainStrategy: &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 10 * time.Minute, + }, + }, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var drainResp structs.NodeDrainUpdateResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + + // Wait for the allocs to be replaced + errCh := make(chan error, 2) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) + go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + + testutil.WaitForResult(func() (bool, error) { + allocs, err := state.AllocsByNode(nil, n2.ID) + if err != nil { + return false, err + } + return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) + }, func(err error) { + require.NoError(err) + }) + + // Check that the node drain is removed + testutil.WaitForResult(func() (bool, error) { + if err := checkAllocPromoter(errCh); err != nil { + return false, err + } + node, err := state.NodeByID(nil, n1.ID) + if err != nil { + return false, err + } + return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") + }, func(err error) { + require.NoError(err) + }) + + // Check we got the right events + node, err := state.NodeByID(nil, n1.ID) + require.NoError(err) + // sometimes test gets a duplicate node drain complete event + require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) + require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) +} + // Test that transitions to force drain work. func TestDrainer_Batch_TransitionToForce(t *testing.T) { t.Parallel() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8e574cee8a8..6761ca87ad8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -9088,6 +9088,10 @@ type Allocation struct { ModifyTime int64 } +func (a *Allocation) JobNamespacedID() NamespacedID { + return NewNamespacedID(a.JobID, a.Namespace) +} + // Index returns the index of the allocation. If the allocation is from a task // group with count greater than 1, there will be multiple allocations for it. func (a *Allocation) Index() uint {