Skip to content

Commit

Permalink
Merge pull request #4086 from hashicorp/b-force
Browse files Browse the repository at this point in the history
Fix issue where force deadlines weren't respected
  • Loading branch information
dadgar authored Mar 30, 2018
2 parents 97d39b1 + aa7df6a commit 1188529
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 7 deletions.
12 changes: 5 additions & 7 deletions nomad/drainer/drain_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,14 @@ func (d *deadlineHeap) watch() {
case <-timer.C:
default:
}

var nextDeadline time.Time
defer timer.Stop()

var nextDeadline time.Time
for {
select {
case <-d.ctx.Done():
return
case <-timer.C:
if nextDeadline.IsZero() {
continue
}

var batch []string

d.mu.Lock()
Expand Down Expand Up @@ -96,7 +91,10 @@ func (d *deadlineHeap) watch() {
continue
}

if !deadline.Equal(nextDeadline) {
// If the deadline is zero, it is a force drain. Otherwise if the
// deadline is in the future, see if we already have a timer setup to
// handle it. If we don't create the timer.
if deadline.IsZero() || !deadline.Equal(nextDeadline) {
timer.Reset(deadline.Sub(time.Now()))
nextDeadline = deadline
}
Expand Down
31 changes: 31 additions & 0 deletions nomad/drainer/drain_heap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,34 @@ func TestDeadlineHeap_WatchCoalesce(t *testing.T) {
case <-time.After(100 * time.Millisecond):
}
}

func TestDeadlineHeap_MultipleForce(t *testing.T) {
	t.Parallel()
	require := require.New(t)
	h := NewDeadlineHeap(context.Background(), 1*time.Second)

	// A zero deadline marks a force drain: every node watched with it
	// should be emitted promptly, each in its own batch, rather than
	// waiting on a timer that was set up for an earlier node.
	forceDeadline := time.Time{}

	// expectForceBatch watches the given node with the force (zero)
	// deadline and asserts it comes back as a single-element batch.
	expectForceBatch := func(nodeID string) {
		h.Watch(nodeID, forceDeadline)

		var out []string
		select {
		case out = <-h.NextBatch():
		case <-time.After(10 * time.Millisecond):
			t.Fatal("timeout")
		}

		require.Len(out, 1)
		require.Equal(nodeID, out[0])
	}

	expectForceBatch("1")
	expectForceBatch("2")
}
135 changes: 135 additions & 0 deletions nomad/drainer_int_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,3 +633,138 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
t.Fatalf("err: %v", err)
})
}

// TestDrainer_Batch_TransistionToForce tests that an in-progress node drain
// (whether it started with an infinite or a finite deadline) can be
// transitioned to a force drain, which immediately migrates the remaining
// batch allocations and then clears the node's drain strategy.
func TestDrainer_Batch_TransistionToForce(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Run the scenario twice: once starting from an infinite deadline and
	// once from a finite (10s) deadline.
	for _, inf := range []bool{true, false} {
		name := "Infinite"
		if !inf {
			name = "Deadline"
		}
		t.Run(name, func(t *testing.T) {
			s1 := TestServer(t, nil)
			defer s1.Shutdown()
			codec := rpcClient(t, s1)
			testutil.WaitForLeader(t, s1.RPC)

			// Create a node
			n1 := mock.Node()
			nodeReg := &structs.NodeRegisterRequest{
				Node:         n1,
				WriteRequest: structs.WriteRequest{Region: "global"},
			}
			var nodeResp structs.NodeUpdateResponse
			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

			// Create a batch job with two allocations
			bjob := mock.BatchJob()
			bjob.TaskGroups[0].Count = 2
			req := &structs.JobRegisterRequest{
				Job: bjob,
				WriteRequest: structs.WriteRequest{
					Region:    "global",
					Namespace: bjob.Namespace,
				},
			}

			// Fetch the response
			var resp structs.JobRegisterResponse
			require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
			require.NotZero(resp.Index)

			// Wait for both allocations to be placed on the node
			state := s1.State()
			testutil.WaitForResult(func() (bool, error) {
				allocs, err := state.AllocsByNode(nil, n1.ID)
				if err != nil {
					return false, err
				}
				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
			}, func(err error) {
				t.Fatalf("err: %v", err)
			})

			// Pick the deadline: zero means an infinite (no-deadline) drain.
			deadline := 0 * time.Second
			if !inf {
				deadline = 10 * time.Second
			}

			// Drain the node with the non-force deadline
			drainReq := &structs.NodeUpdateDrainRequest{
				NodeID: n1.ID,
				DrainStrategy: &structs.DrainStrategy{
					DrainSpec: structs.DrainSpec{
						Deadline: deadline,
					},
				},
				WriteRequest: structs.WriteRequest{Region: "global"},
			}
			var drainResp structs.NodeDrainUpdateResponse
			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))

			// Promote replacement allocs in the background so the drainer can
			// make progress while we wait for the allocs to be replaced.
			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)

			// Make sure the batch job isn't affected while only the
			// non-force drain is in place: both allocs should stay running.
			testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
				allocs, err := state.AllocsByNode(nil, n1.ID)
				if err != nil {
					return false, err
				}
				for _, alloc := range allocs {
					if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
						return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
					}
				}
				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
			}, func(err error) {
				t.Fatalf("err: %v", err)
			})

			// Force drain the node
			drainReq = &structs.NodeUpdateDrainRequest{
				NodeID: n1.ID,
				DrainStrategy: &structs.DrainStrategy{
					DrainSpec: structs.DrainSpec{
						Deadline: -1 * time.Second, // Negative deadline signals a force drain
					},
				},
				WriteRequest: structs.WriteRequest{Region: "global"},
			}
			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))

			// Make sure the batch job is migrated: both allocs should now be
			// marked for stopping.
			testutil.WaitForResult(func() (bool, error) {
				allocs, err := state.AllocsByNode(nil, n1.ID)
				if err != nil {
					return false, err
				}
				for _, alloc := range allocs {
					if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
						return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
					}
				}
				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
			}, func(err error) {
				t.Fatalf("err: %v", err)
			})

			// Check that the node drain is removed once the drain completes
			testutil.WaitForResult(func() (bool, error) {
				node, err := state.NodeByID(nil, n1.ID)
				if err != nil {
					return false, err
				}
				return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
			}, func(err error) {
				t.Fatalf("err: %v", err)
			})
		})
	}
}

0 comments on commit 1188529

Please sign in to comment.