diff --git a/nomad/drainer/drain_heap.go b/nomad/drainer/drain_heap.go
index 2d0a1506e05..3d51513a9c2 100644
--- a/nomad/drainer/drain_heap.go
+++ b/nomad/drainer/drain_heap.go
@@ -54,19 +54,14 @@ func (d *deadlineHeap) watch() {
 	case <-timer.C:
 	default:
 	}
-
-	var nextDeadline time.Time
 	defer timer.Stop()
 
+	var nextDeadline time.Time
 	for {
 		select {
 		case <-d.ctx.Done():
 			return
 		case <-timer.C:
-			if nextDeadline.IsZero() {
-				continue
-			}
-
 			var batch []string
 
 			d.mu.Lock()
@@ -96,7 +91,10 @@ func (d *deadlineHeap) watch() {
 				continue
 			}
 
-			if !deadline.Equal(nextDeadline) {
+			// If the deadline is zero, it is a force drain. Otherwise, if the
+			// deadline is in the future, see if we already have a timer set up
+			// to handle it. If we don't, create the timer.
+			if deadline.IsZero() || !deadline.Equal(nextDeadline) {
 				timer.Reset(deadline.Sub(time.Now()))
 				nextDeadline = deadline
 			}
diff --git a/nomad/drainer/drain_heap_test.go b/nomad/drainer/drain_heap_test.go
index 02108e1dfa0..464c413a2c3 100644
--- a/nomad/drainer/drain_heap_test.go
+++ b/nomad/drainer/drain_heap_test.go
@@ -147,3 +147,38 @@ func TestDeadlineHeap_WatchCoalesce(t *testing.T) {
 	case <-time.After(100 * time.Millisecond):
 	}
 }
+
+func TestDeadlineHeap_MultipleForce(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	h := NewDeadlineHeap(context.Background(), 1*time.Second)
+
+	// A zero deadline marks a force drain, so the node should be batched
+	// immediately instead of waiting on the timer.
+	nodeID := "1"
+	deadline := time.Time{}
+	h.Watch(nodeID, deadline)
+
+	var batch []string
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(10 * time.Millisecond):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, 1)
+	require.Equal(nodeID, batch[0])
+
+	// A second force drain must also fire immediately rather than being
+	// swallowed by the stale timer state from the first batch.
+	nodeID = "2"
+	h.Watch(nodeID, deadline)
+	select {
+	case batch = <-h.NextBatch():
+	case <-time.After(10 * time.Millisecond):
+		t.Fatal("timeout")
+	}
+
+	require.Len(batch, 1)
+	require.Equal(nodeID, batch[0])
+}
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index d8c1b0a6590..b6cb8895005 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -633,3 +633,138 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	})
 }
+
+// Test that transitions to force drain work.
+func TestDrainer_Batch_TransitionToForce(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	for _, inf := range []bool{true, false} {
+		name := "Infinite"
+		if !inf {
+			name = "Deadline"
+		}
+		t.Run(name, func(t *testing.T) {
+			s1 := TestServer(t, nil)
+			defer s1.Shutdown()
+			codec := rpcClient(t, s1)
+			testutil.WaitForLeader(t, s1.RPC)
+
+			// Create a node
+			n1 := mock.Node()
+			nodeReg := &structs.NodeRegisterRequest{
+				Node:         n1,
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			var nodeResp structs.NodeUpdateResponse
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+			// Create a batch job
+			bjob := mock.BatchJob()
+			bjob.TaskGroups[0].Count = 2
+			req := &structs.JobRegisterRequest{
+				Job: bjob,
+				WriteRequest: structs.WriteRequest{
+					Region:    "global",
+					Namespace: bjob.Namespace,
+				},
+			}
+
+			// Fetch the response
+			var resp structs.JobRegisterResponse
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+			require.NotZero(resp.Index)
+
+			// Wait for the allocations to be placed
+			state := s1.State()
+			testutil.WaitForResult(func() (bool, error) {
+				allocs, err := state.AllocsByNode(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+
+			// Pick the deadline
+			deadline := 0 * time.Second
+			if !inf {
+				deadline = 10 * time.Second
+			}
+
+			// Drain the node
+			drainReq := &structs.NodeUpdateDrainRequest{
+				NodeID: n1.ID,
+				DrainStrategy: &structs.DrainStrategy{
+					DrainSpec: structs.DrainSpec{
+						Deadline: deadline,
+					},
+				},
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			var drainResp structs.NodeDrainUpdateResponse
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+			// Wait for the allocs to be replaced
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
+
+			// Make sure the batch job isn't affected
+			testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
+				allocs, err := state.AllocsByNode(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				for _, alloc := range allocs {
+					if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
+						return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
+					}
+				}
+				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+
+			// Force drain the node
+			drainReq = &structs.NodeUpdateDrainRequest{
+				NodeID: n1.ID,
+				DrainStrategy: &structs.DrainStrategy{
+					DrainSpec: structs.DrainSpec{
+						Deadline: -1 * time.Second, // Force
+					},
+				},
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+			// Make sure the batch job is migrated
+			testutil.WaitForResult(func() (bool, error) {
+				allocs, err := state.AllocsByNode(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				for _, alloc := range allocs {
+					if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
+						return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
+					}
+				}
+				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+
+			// Check that the node drain is removed
+			testutil.WaitForResult(func() (bool, error) {
+				node, err := state.NodeByID(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+		})
+	}
+}