From cd6d34425fa35b9ebadabb8d64bc48a9ef512d73 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Wed, 13 May 2020 16:39:04 -0400 Subject: [PATCH] server: stop after client disconnect (#7939) * jobspec, api: add stop_after_client_disconnect * nomad/state/state_store: error message typo * structs: alloc methods to support stop_after_client_disconnect 1. a global AllocStates to track status changes with timestamps. We need this to track the time at which the alloc became lost originally. 2. ShouldClientStop() and WaitClientStop() to actually do the math * scheduler/reconcile_util: delayByStopAfterClientDisconnect * scheduler/reconcile: use delayByStopAfterClientDisconnect * scheduler/util: updateNonTerminalAllocsToLost comments This was setup to only update allocs to lost if the DesiredStatus had already been set by the scheduler. It seems like the intention was to update the status from any non-terminal state, and not all lost allocs have been marked stop or evict by now * scheduler/testing: AssertEvalStatus just use require * scheduler/generic_sched: don't create a blocked eval if delayed * scheduler/generic_sched_test: several scheduling cases --- api/tasks.go | 35 +++---- client/heartbeatstop.go | 4 +- command/agent/job_endpoint.go | 4 + jobspec/parse_group.go | 1 + jobspec/parse_test.go | 1 + jobspec/test-fixtures/basic.hcl | 2 + nomad/state/state_store.go | 2 +- nomad/structs/diff.go | 14 +++ nomad/structs/structs.go | 87 +++++++++++++++++ nomad/structs/structs_test.go | 69 +++++++++++++- scheduler/generic_sched.go | 8 +- scheduler/generic_sched_test.go | 162 ++++++++++++++++++++++++++++++++ scheduler/reconcile.go | 20 +++- scheduler/reconcile_util.go | 22 +++++ scheduler/testing.go | 10 +- scheduler/util.go | 7 +- 16 files changed, 411 insertions(+), 37 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index b25ac736852..3ab4523a302 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -411,23 +411,24 @@ func (vm *VolumeMount) Canonicalize() { // TaskGroup is the unit of scheduling. type TaskGroup struct { - Name *string - Count *int - Constraints []*Constraint - Affinities []*Affinity - Tasks []*Task - Spreads []*Spread - Volumes map[string]*VolumeRequest - RestartPolicy *RestartPolicy - ReschedulePolicy *ReschedulePolicy - EphemeralDisk *EphemeralDisk - Update *UpdateStrategy - Migrate *MigrateStrategy - Networks []*NetworkResource - Meta map[string]string - Services []*Service - ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"` - Scaling *ScalingPolicy + Name *string + Count *int + Constraints []*Constraint + Affinities []*Affinity + Tasks []*Task + Spreads []*Spread + Volumes map[string]*VolumeRequest + RestartPolicy *RestartPolicy + ReschedulePolicy *ReschedulePolicy + EphemeralDisk *EphemeralDisk + Update *UpdateStrategy + Migrate *MigrateStrategy + Networks []*NetworkResource + Meta map[string]string + Services []*Service + ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"` + StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect"` + Scaling *ScalingPolicy } // NewTaskGroup creates a new TaskGroup. diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index 05e9b1f5e4e..7d9570e7d07 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -91,7 +91,7 @@ func (h *heartbeatStop) watch() { select { case allocID := <-stop: if err := h.stopAlloc(allocID); err != nil { - h.logger.Warn("stopping alloc %s on heartbeat timeout failed: %v", allocID, err) + h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err) continue } delete(h.allocInterval, allocID) @@ -142,6 +142,8 @@ func (h *heartbeatStop) stopAlloc(allocID string) error { return err } + h.logger.Debug("stopping alloc for stop_after_client_disconnect", "alloc", allocID) + runner.Destroy() return nil } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 09bac74cc2e..5b3d36ee870 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -793,6 +793,10 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta tg.ShutdownDelay = taskGroup.ShutdownDelay } + if taskGroup.StopAfterClientDisconnect != nil { + tg.StopAfterClientDisconnect = taskGroup.StopAfterClientDisconnect + } + if taskGroup.ReschedulePolicy != nil { tg.ReschedulePolicy = &structs.ReschedulePolicy{ Attempts: *taskGroup.ReschedulePolicy.Attempts, diff --git a/jobspec/parse_group.go b/jobspec/parse_group.go index 731018c6ba4..db78ec538c6 100644 --- a/jobspec/parse_group.go +++ b/jobspec/parse_group.go @@ -56,6 +56,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { "service", "volume", "scaling", + "stop_after_client_disconnect", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 88fca2894c6..a676d8f5b19 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -184,6 +184,7 @@ func TestParse(t *testing.T) { }, }, }, + StopAfterClientDisconnect: helper.TimeToPtr(120 * time.Second), ReschedulePolicy: &api.ReschedulePolicy{ Interval: helper.TimeToPtr(12 * time.Hour), Attempts: helper.IntToPtr(5), diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index dfa63caf654..8b2f9ef74d6 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -152,6 +152,8 @@ job "binstore-storagelocker" { } } + stop_after_client_disconnect = "120s" + task "binstore" { driver = "docker" user = "bob" diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index fbb1e3e57c2..9b0eca9cc16 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4623,7 +4623,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat } case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: default: - s.logger.Error("invalid old client status for allocatio", + s.logger.Error("invalid old client status for allocation", "alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus) } summaryChanged = true diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index 34533c0584c..ebd927397ac 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -238,6 +238,20 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er } } + // StopAfterClientDisconnect diff + if oldPrimitiveFlat != nil && newPrimitiveFlat != nil { + if tg.StopAfterClientDisconnect == nil { + oldPrimitiveFlat["StopAfterClientDisconnect"] = "" + } else { + oldPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *tg.StopAfterClientDisconnect) + } + if other.StopAfterClientDisconnect == nil { + newPrimitiveFlat["StopAfterClientDisconnect"] = "" + } else { + newPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *other.StopAfterClientDisconnect) + } + } + // Diff the primitive fields. diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, false) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 680b6893897..f56fd8519cc 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3799,6 +3799,10 @@ func (j *Job) Validate() error { mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value")) } + if tg.StopAfterClientDisconnect != nil && *tg.StopAfterClientDisconnect < 0 { + mErr.Errors = append(mErr.Errors, errors.New("StopAfterClientDisconnect must be a positive value")) + } + if j.Type == "system" && tg.Count > 1 { mErr.Errors = append(mErr.Errors, fmt.Errorf("Job task group %s has count %d. Count cannot exceed 1 with system scheduler", @@ -5265,6 +5269,10 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.ShutdownDelay = tg.ShutdownDelay } + if tg.StopAfterClientDisconnect != nil { + ntg.StopAfterClientDisconnect = tg.StopAfterClientDisconnect + } + return ntg } @@ -6516,6 +6524,19 @@ func (t *Template) Warnings() error { return mErr.ErrorOrNil() } +// AllocState records a single event that changes the state of the whole allocation +type AllocStateField uint8 + +const ( + AllocStateFieldClientStatus AllocStateField = iota +) + +type AllocState struct { + Field AllocStateField + Value string + Time time.Time +} + // Set of possible states for a task. const ( TaskStatePending = "pending" // The task is waiting to be run. @@ -8152,6 +8173,9 @@ type Allocation struct { // TaskStates stores the state of each task, TaskStates map[string]*TaskState + // AllocStates track meta data associated with changes to the state of the whole allocation, like becoming lost + AllocStates []*AllocState + // PreviousAllocation is the allocation that this allocation is replacing PreviousAllocation string @@ -8420,6 +8444,49 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) { return nextRescheduleTime, rescheduleEligible } +// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration +func (a *Allocation) ShouldClientStop() bool { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + if tg == nil || + tg.StopAfterClientDisconnect == nil || + *tg.StopAfterClientDisconnect == 0*time.Nanosecond { + return false + } + return true +} + +// WaitClientStop uses the reschedule delay mechanism to block rescheduling until +// StopAfterClientDisconnect's block interval passes +func (a *Allocation) WaitClientStop() time.Time { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + + // An alloc can only be marked lost once, so use the first lost transition + var t time.Time + for _, s := range a.AllocStates { + if s.Field == AllocStateFieldClientStatus && + s.Value == AllocClientStatusLost { + t = s.Time + break + } + } + + // On the first pass, the alloc hasn't been marked lost yet, and so we start + // counting from now + if t.IsZero() { + t = time.Now().UTC() + } + + // Find the max kill timeout + kill := DefaultKillTimeout + for _, t := range tg.Tasks { + if t.KillTimeout > kill { + kill = t.KillTimeout + } + } + + return t.Add(*tg.StopAfterClientDisconnect + kill) +} + // NextDelay returns a duration after which the allocation can be rescheduled. // It is calculated according to the delay function and previous reschedule attempts. func (a *Allocation) NextDelay() time.Duration { @@ -8476,6 +8543,24 @@ func (a *Allocation) Terminated() bool { return false } +// SetStopped updates the allocation in place to a DesiredStatus stop, with the ClientStatus +func (a *Allocation) SetStop(clientStatus, clientDesc string) { + a.DesiredStatus = AllocDesiredStatusStop + a.ClientStatus = clientStatus + a.ClientDescription = clientDesc + a.AppendState(AllocStateFieldClientStatus, clientStatus) +} + +// AppendState creates and appends an AllocState entry recording the time of the state +// transition. Used to mark the transition to lost +func (a *Allocation) AppendState(field AllocStateField, value string) { + a.AllocStates = append(a.AllocStates, &AllocState{ + Field: field, + Value: value, + Time: time.Now().UTC(), + }) +} + // RanSuccessfully returns whether the client has ran the allocation and all // tasks finished successfully. Critically this function returns whether the // allocation has ran to completion and not just that the alloc has converged to @@ -9384,6 +9469,8 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s newAlloc.ClientStatus = clientStatus } + newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus) + node := alloc.NodeID existing := p.NodeUpdate[node] p.NodeUpdate[node] = append(existing, newAlloc) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 96529be7c9e..a160a3ec916 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -3595,13 +3595,21 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) { plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost) - appendedAlloc := plan.NodeUpdate[alloc.NodeID][0] expectedAlloc := new(Allocation) *expectedAlloc = *alloc expectedAlloc.DesiredDescription = desiredDesc expectedAlloc.DesiredStatus = AllocDesiredStatusStop expectedAlloc.ClientStatus = AllocClientStatusLost expectedAlloc.Job = nil + expectedAlloc.AllocStates = []*AllocState{{ + Field: AllocStateFieldClientStatus, + Value: "lost", + }} + + // This value is set to time.Now() in AppendStoppedAlloc, so clear it + appendedAlloc := plan.NodeUpdate[alloc.NodeID][0] + appendedAlloc.AllocStates[0].Time = time.Time{} + assert.Equal(t, expectedAlloc, appendedAlloc) assert.Equal(t, alloc.Job, plan.Job) } @@ -4372,6 +4380,65 @@ func TestAllocation_NextDelay(t *testing.T) { } +func TestAllocation_WaitClientStop(t *testing.T) { + type testCase struct { + desc string + stop time.Duration + status string + expectedShould bool + expectedRescheduleTime time.Time + } + now := time.Now().UTC() + testCases := []testCase{ + { + desc: "running", + stop: 2 * time.Second, + status: AllocClientStatusRunning, + expectedShould: true, + }, + { + desc: "no stop_after_client_disconnect", + status: AllocClientStatusLost, + expectedShould: false, + }, + { + desc: "stop", + status: AllocClientStatusLost, + stop: 2 * time.Second, + expectedShould: true, + expectedRescheduleTime: now.Add((2 + 5) * time.Second), + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + j := testJob() + a := &Allocation{ + ClientStatus: tc.status, + Job: j, + TaskStates: map[string]*TaskState{}, + } + + if tc.status == AllocClientStatusLost { + a.AppendState(AllocStateFieldClientStatus, AllocClientStatusLost) + } + + j.TaskGroups[0].StopAfterClientDisconnect = &tc.stop + a.TaskGroup = j.TaskGroups[0].Name + + require.Equal(t, tc.expectedShould, a.ShouldClientStop()) + + if !tc.expectedShould || tc.status != AllocClientStatusLost { + return + } + + // the reschedTime is close to the expectedRescheduleTime + reschedTime := a.WaitClientStop() + e := reschedTime.Unix() - tc.expectedRescheduleTime.Unix() + require.Less(t, e, int64(2)) + }) + } +} + func TestAllocation_Canonicalize_Old(t *testing.T) { alloc := MockAlloc() alloc.AllocatedResources = nil diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index f47e02500ff..6e42c297ac9 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -259,8 +259,10 @@ func (s *GenericScheduler) process() (bool, error) { // If there are failed allocations, we need to create a blocked evaluation // to place the failed allocations when resources become available. If the // current evaluation is already a blocked eval, we reuse it by submitting - // a new eval to the planner in createBlockedEval - if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil { + // a new eval to the planner in createBlockedEval. If the current eval is + // pending with WaitUntil set, it's delayed rather than blocked. + if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil && + s.eval.WaitUntil.IsZero() { if err := s.createBlockedEval(false); err != nil { s.logger.Error("failed to make blocked eval", "error", err) return false, err @@ -338,7 +340,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Update the allocations which are in pending/running state on tainted - // nodes to lost + // nodes to lost, but only if the scheduler has already marked them updateNonTerminalAllocsToLost(s.plan, tainted, allocs) reconciler := NewAllocReconciler(s.logger, diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index deb6ddecd60..779c3881918 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -2768,6 +2769,167 @@ func TestServiceSched_NodeDown(t *testing.T) { } } +func TestServiceSched_StopAfterClientDisconnect(t *testing.T) { + cases := []struct { + stop time.Duration + when time.Time + rescheduled bool + }{ + { + rescheduled: true, + }, + { + stop: 1 * time.Second, + rescheduled: false, + }, + { + stop: 1 * time.Second, + when: time.Now().UTC().Add(-10 * time.Second), + rescheduled: true, + }, + { + stop: 1 * time.Second, + when: time.Now().UTC().Add(10 * time.Minute), + rescheduled: false, + }, + } + + for i, tc := range cases { + t.Run(fmt.Sprintf(""), func(t *testing.T) { + h := NewHarness(t) + + // Node, which is down + node := mock.Node() + node.Status = structs.NodeStatusDown + require.NoError(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Job with allocations and stop_after_client_disconnect + job := mock.Job() + job.TaskGroups[0].Count = 1 + job.TaskGroups[0].StopAfterClientDisconnect = &tc.stop + require.NoError(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Alloc for the running group + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + alloc.DesiredStatus = structs.AllocDesiredStatusRun + alloc.ClientStatus = structs.AllocClientStatusRunning + if !tc.when.IsZero() { + alloc.AllocStates = []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusLost, + Time: tc.when, + }} + } + allocs := []*structs.Allocation{alloc} + require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deal with drain + evals := []*structs.Evaluation{{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeDrain, + JobID: job.ID, + NodeID: node.ID, + Status: structs.EvalStatusPending, + }} + eval := evals[0] + require.NoError(t, h.State.UpsertEvals(h.NextIndex(), evals)) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + require.NoError(t, err) + require.Equal(t, h.Evals[0].Status, structs.EvalStatusComplete) + require.Len(t, h.Plans, 1, "plan") + + // Followup eval created + require.True(t, len(h.CreateEvals) > 0) + e := h.CreateEvals[0] + require.Equal(t, eval.ID, e.PreviousEval) + + if tc.rescheduled { + require.Equal(t, "blocked", e.Status) + } else { + require.Equal(t, "pending", e.Status) + require.NotEmpty(t, e.WaitUntil) + } + + // This eval is still being inserted in the state store + ws := memdb.NewWatchSet() + testutil.WaitForResult(func() (bool, error) { + found, err := h.State.EvalByID(ws, e.ID) + if err != nil { + return false, err + } + if found == nil { + return false, nil + } + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + alloc, err = h.State.AllocByID(ws, alloc.ID) + require.NoError(t, err) + + // Allocations have been transitioned to lost + require.Equal(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus) + require.Equal(t, structs.AllocClientStatusLost, alloc.ClientStatus) + // At least 1, 2 if we manually set the tc.when + require.NotEmpty(t, alloc.AllocStates) + + if tc.rescheduled { + // Register a new node, leave it up, process the followup eval + node = mock.Node() + require.NoError(t, h.State.UpsertNode(h.NextIndex(), node)) + require.NoError(t, h.Process(NewServiceScheduler, eval)) + + as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + require.NoError(t, err) + + testutil.WaitForResult(func() (bool, error) { + as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + if err != nil { + return false, err + } + return len(as) == 2, nil + }, func(err error) { + require.NoError(t, err) + }) + + a2 := as[0] + if a2.ID == alloc.ID { + a2 = as[1] + } + + require.Equal(t, structs.AllocClientStatusPending, a2.ClientStatus) + require.Equal(t, structs.AllocDesiredStatusRun, a2.DesiredStatus) + require.Equal(t, node.ID, a2.NodeID) + + // No blocked evals + require.Empty(t, h.ReblockEvals) + require.Len(t, h.CreateEvals, 1) + require.Equal(t, h.CreateEvals[0].ID, e.ID) + } else { + // No new alloc was created + as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + require.NoError(t, err) + + require.Len(t, as, 1) + old := as[0] + + require.Equal(t, alloc.ID, old.ID) + require.Equal(t, structs.AllocClientStatusLost, old.ClientStatus) + require.Equal(t, structs.AllocDesiredStatusStop, old.DesiredStatus) + } + }) + } +} + func TestServiceSched_NodeUpdate(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index ccddfa07f85..a3f79180606 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -353,10 +353,20 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Determine what set of terminal allocations need to be rescheduled untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment) + // Find delays for any lost allocs that have stop_after_client_disconnect + lostLater := lost.delayByStopAfterClientDisconnect() + rescheduleLater = append(rescheduleLater, lostLater...) + // Create batched follow up evaluations for allocations that are // reschedulable later and mark the allocations for in place updating a.handleDelayedReschedules(rescheduleLater, all, tg.Name) + // Allocs that are lost and delayed have an attributeUpdate that correctly links to + // the eval, but incorrectly has the current (running) status + for _, d := range lostLater { + a.result.attributeUpdates[d.allocID].SetStop(structs.AllocClientStatusLost, structs.AllocClientStatusLost) + } + // Create a structure for choosing names. Seed with the taken names which is // the union of untainted and migrating nodes (includes canaries) nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow)) @@ -413,9 +423,13 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // * The deployment is not paused or failed // * Not placing any canaries // * If there are any canaries that they have been promoted - place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow) - if !existingDeployment { - dstate.DesiredTotal += len(place) + // * There is no delayed stop_after_client_disconnect alloc + var place []allocPlaceResult + if len(lostLater) == 0 { + place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow) + if !existingDeployment { + dstate.DesiredTotal += len(place) + } } // deploymentPlaceReady tracks whether the deployment is in a state where diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index ec6c264735f..e8da880dbf9 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -370,6 +370,28 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) { return } +// delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a +// stop_after_client_disconnect configured +func (as allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) { + now := time.Now().UTC() + for _, a := range as { + if !a.ShouldClientStop() { + continue + } + + t := a.WaitClientStop() + + if t.After(now) { + later = append(later, &delayedRescheduleInfo{ + allocID: a.ID, + alloc: a, + rescheduleTime: t, + }) + } + } + return later +} + // allocNameIndex is used to select allocation names for placement or removal // given an existing set of placed allocations. type allocNameIndex struct { diff --git a/scheduler/testing.go b/scheduler/testing.go index cb6059c5469..c1d8776b4ad 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -6,6 +6,7 @@ import ( "time" testing "github.com/mitchellh/go-testing-interface" + "github.com/stretchr/testify/require" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper/testlog" @@ -272,12 +273,7 @@ func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error { } func (h *Harness) AssertEvalStatus(t testing.T, state string) { - if len(h.Evals) != 1 { - t.Fatalf("bad: %#v", h.Evals) - } + require.Len(t, h.Evals, 1) update := h.Evals[0] - - if update.Status != state { - t.Fatalf("bad: %#v", update) - } + require.Equal(t, state, update.Status) } diff --git a/scheduler/util.go b/scheduler/util.go index ee18e30007f..0890f32dfca 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -812,8 +812,8 @@ func adjustQueuedAllocations(logger log.Logger, result *structs.PlanResult, queu } } -// updateNonTerminalAllocsToLost updates the allocations which are in pending/running state on tainted node -// to lost +// updateNonTerminalAllocsToLost updates the allocations which are in pending/running state +// on tainted node to lost, but only for allocs already DesiredStatus stop or evict func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*structs.Node, allocs []*structs.Allocation) { for _, alloc := range allocs { node, ok := tainted[alloc.NodeID] @@ -826,8 +826,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc continue } - // If the scheduler has marked it as stop or evict already but the alloc - // wasn't terminal on the client change the status to lost. + // If the alloc is already correctly marked lost, we're done if (alloc.DesiredStatus == structs.AllocDesiredStatusStop || alloc.DesiredStatus == structs.AllocDesiredStatusEvict) && (alloc.ClientStatus == structs.AllocClientStatusRunning ||