Skip to content

Commit

Permalink
Migrate all allocs when draining a node (#10411)
Browse files Browse the repository at this point in the history
This fixes a bug affecting drain nodes, where allocs may fail to be
migrated if they belong to different namespaces but share the same job
name.

The reason is that the helper function that creates the migration evals
indexed the allocs by job ID without accounting for the namespaces.
When job ids clash, only an eval is created for one and the rest of the
allocs remain intact.

Fixes #10172
  • Loading branch information
Mahmood Ali authored and schmichael committed May 14, 2021
1 parent 7387bd5 commit 750e7b1
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 4 deletions.
8 changes: 4 additions & 4 deletions nomad/drainer/drainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,25 +392,25 @@ func (n *NodeDrainer) batchDrainAllocs(allocs []*structs.Allocation) (uint64, er
// affected jobs.
func (n *NodeDrainer) drainAllocs(future *structs.BatchFuture, allocs []*structs.Allocation) {
// Compute the effected jobs and make the transition map
jobs := make(map[string]*structs.Allocation, 4)
jobs := make(map[structs.NamespacedID]*structs.Allocation, 4)
transitions := make(map[string]*structs.DesiredTransition, len(allocs))
for _, alloc := range allocs {
transitions[alloc.ID] = &structs.DesiredTransition{
Migrate: helper.BoolToPtr(true),
}
jobs[alloc.JobID] = alloc
jobs[alloc.JobNamespacedID()] = alloc
}

evals := make([]*structs.Evaluation, 0, len(jobs))
now := time.Now().UTC().UnixNano()
for job, alloc := range jobs {
for _, alloc := range jobs {
evals = append(evals, &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
Priority: alloc.Job.Priority,
Type: alloc.Job.Type,
TriggeredBy: structs.EvalTriggerNodeDrain,
JobID: job,
JobID: alloc.JobID,
Status: structs.EvalStatusPending,
CreateTime: now,
ModifyTime: now,
Expand Down
129 changes: 129 additions & 0 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,135 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) {
require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined)
}

// TestDrainer_MultipleNSes_ServiceOnly asserts that all jobs on an alloc, even
// when they belong to different namespaces and share the same ID
func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) {
t.Parallel()
require := require.New(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)

// Create two nodes
n1, n2 := mock.Node(), mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

ns1, ns2 := mock.Namespace(), mock.Namespace()
nses := []*structs.Namespace{ns1, ns2}
nsReg := &structs.NamespaceUpsertRequest{
Namespaces: nses,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nsResp structs.GenericResponse
require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp))

for _, ns := range nses {
// Create a job that runs on just one
job := mock.Job()
job.ID = "example"
job.Name = "example"
job.Namespace = ns.Name
job.TaskGroups[0].Count = 1
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}

// Fetch the response
var resp structs.JobRegisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
}

// Wait for the two allocations to be placed
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
iter, err := state.Allocs(nil)
if err != nil {
return false, err
}

count := 0
for iter.Next() != nil {
count++
}
if count != 2 {
return false, fmt.Errorf("expected %d allocs, found %d", 2, count)
}
return true, nil
}, func(err error) {
require.NoError(err)
})

// Create the second node
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

// Drain the first node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 10 * time.Minute,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))

// Wait for the allocs to be replaced
errCh := make(chan error, 2)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger)

testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
require.NoError(err)
})

// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
if err := checkAllocPromoter(errCh); err != nil {
return false, err
}
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
}, func(err error) {
require.NoError(err)
})

// Check we got the right events
node, err := state.NodeByID(nil, n1.ID)
require.NoError(err)
// sometimes test gets a duplicate node drain complete event
require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events)
require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message)
}

// Test that transitions to force drain work.
func TestDrainer_Batch_TransitionToForce(t *testing.T) {
t.Parallel()
Expand Down
4 changes: 4 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9088,6 +9088,10 @@ type Allocation struct {
ModifyTime int64
}

func (a *Allocation) JobNamespacedID() NamespacedID {
return NewNamespacedID(a.JobID, a.Namespace)
}

// Index returns the index of the allocation. If the allocation is from a task
// group with count greater than 1, there will be multiple allocations for it.
func (a *Allocation) Index() uint {
Expand Down

0 comments on commit 750e7b1

Please sign in to comment.