server: stop after client disconnect (#7939)
* jobspec, api: add stop_after_client_disconnect

* nomad/state/state_store: error message typo

* structs: alloc methods to support stop_after_client_disconnect

1. a global AllocStates field to track status changes with timestamps. We
   need this to record the time at which the alloc originally became lost.

2. ShouldClientStop() and WaitClientStop() to actually do the math

* scheduler/reconcile_util: delayByStopAfterClientDisconnect

* scheduler/reconcile: use delayByStopAfterClientDisconnect

* scheduler/util: updateNonTerminalAllocsToLost comments

This was set up to only update allocs to lost if the DesiredStatus had
already been set by the scheduler. It seems like the intention was to
update the status from any non-terminal state, since not all lost allocs
have been marked stop or evict by that point.

* scheduler/testing: AssertEvalStatus just use require

* scheduler/generic_sched: don't create a blocked eval if delayed

* scheduler/generic_sched_test: several scheduling cases
langmartin authored May 13, 2020
1 parent e3f9beb commit cd6d344
Showing 16 changed files with 411 additions and 37 deletions.
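
For orientation before the per-file diff: a minimal sketch (not part of this commit) of setting the new option through the Go api package; the job and group names here are made up.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Hypothetical job/group names; the only line specific to this commit is
	// the StopAfterClientDisconnect assignment on the task group.
	job := api.NewServiceJob("cache", "cache", "global", 50)
	tg := api.NewTaskGroup("redis", 1)

	// Stop (and allow rescheduling of) this group's allocations once their
	// client node has been disconnected for longer than this duration.
	stop := 120 * time.Second
	tg.StopAfterClientDisconnect = &stop

	job.AddTaskGroup(tg)
	fmt.Println(*job.TaskGroups[0].StopAfterClientDisconnect) // 2m0s
}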
35 changes: 18 additions & 17 deletions api/tasks.go
@@ -411,23 +411,24 @@ func (vm *VolumeMount) Canonicalize() {

// TaskGroup is the unit of scheduling.
type TaskGroup struct {
- Name *string
- Count *int
- Constraints []*Constraint
- Affinities []*Affinity
- Tasks []*Task
- Spreads []*Spread
- Volumes map[string]*VolumeRequest
- RestartPolicy *RestartPolicy
- ReschedulePolicy *ReschedulePolicy
- EphemeralDisk *EphemeralDisk
- Update *UpdateStrategy
- Migrate *MigrateStrategy
- Networks []*NetworkResource
- Meta map[string]string
- Services []*Service
- ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
- Scaling *ScalingPolicy
+ Name *string
+ Count *int
+ Constraints []*Constraint
+ Affinities []*Affinity
+ Tasks []*Task
+ Spreads []*Spread
+ Volumes map[string]*VolumeRequest
+ RestartPolicy *RestartPolicy
+ ReschedulePolicy *ReschedulePolicy
+ EphemeralDisk *EphemeralDisk
+ Update *UpdateStrategy
+ Migrate *MigrateStrategy
+ Networks []*NetworkResource
+ Meta map[string]string
+ Services []*Service
+ ShutdownDelay *time.Duration `mapstructure:"shutdown_delay"`
+ StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect"`
+ Scaling *ScalingPolicy
}

// NewTaskGroup creates a new TaskGroup.
4 changes: 3 additions & 1 deletion client/heartbeatstop.go
@@ -91,7 +91,7 @@ func (h *heartbeatStop) watch() {
select {
case allocID := <-stop:
if err := h.stopAlloc(allocID); err != nil {
h.logger.Warn("stopping alloc %s on heartbeat timeout failed: %v", allocID, err)
h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err)
continue
}
delete(h.allocInterval, allocID)
@@ -142,6 +142,8 @@ func (h *heartbeatStop) stopAlloc(allocID string) error {
return err
}

h.logger.Debug("stopping alloc for stop_after_client_disconnect", "alloc", allocID)

runner.Destroy()
return nil
}
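
For context on the stop channel read in watch() above: the alloc IDs come from a deadline check against the client's last successful heartbeat. A rough sketch of that producer side, with assumed names (checkExpired, lastOk) rather than the exact fields of heartbeatstop.go; the real watcher feeds its stop channel in a similar way.

package client

import "time"

// checkExpired is an illustrative sketch, not code from this commit: for each
// alloc that registered a stop_after_client_disconnect interval, push its ID
// onto the stop channel once the client's last good heartbeat is older than
// that interval. The receive side (shown in the diff above) then destroys the
// alloc runner.
func checkExpired(lastOk time.Time, allocInterval map[string]time.Duration, stop chan<- string) {
	for allocID, interval := range allocInterval {
		if time.Since(lastOk) > interval {
			stop <- allocID
		}
	}
}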
4 changes: 4 additions & 0 deletions command/agent/job_endpoint.go
@@ -793,6 +793,10 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
tg.ShutdownDelay = taskGroup.ShutdownDelay
}

if taskGroup.StopAfterClientDisconnect != nil {
tg.StopAfterClientDisconnect = taskGroup.StopAfterClientDisconnect
}

if taskGroup.ReschedulePolicy != nil {
tg.ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: *taskGroup.ReschedulePolicy.Attempts,
1 change: 1 addition & 0 deletions jobspec/parse_group.go
@@ -56,6 +56,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"service",
"volume",
"scaling",
"stop_after_client_disconnect",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
1 change: 1 addition & 0 deletions jobspec/parse_test.go
@@ -184,6 +184,7 @@ func TestParse(t *testing.T) {
},
},
},
StopAfterClientDisconnect: helper.TimeToPtr(120 * time.Second),
ReschedulePolicy: &api.ReschedulePolicy{
Interval: helper.TimeToPtr(12 * time.Hour),
Attempts: helper.IntToPtr(5),
2 changes: 2 additions & 0 deletions jobspec/test-fixtures/basic.hcl
@@ -152,6 +152,8 @@ job "binstore-storagelocker" {
}
}

stop_after_client_disconnect = "120s"

task "binstore" {
driver = "docker"
user = "bob"
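
A quick way to see the new attribute flow from HCL into the api struct (a sketch, not part of this commit; the file path is hypothetical and the group is assumed to set stop_after_client_disconnect):

package main

import (
	"fmt"
	"log"

	"github.com/hashicorp/nomad/jobspec"
)

func main() {
	// Hypothetical path; any jobspec whose first group sets
	// stop_after_client_disconnect will do.
	job, err := jobspec.ParseFile("example.nomad")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(*job.TaskGroups[0].StopAfterClientDisconnect) // e.g. 2m0s for "120s"
}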
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
@@ -4623,7 +4623,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
}
case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
default:
s.logger.Error("invalid old client status for allocatio",
s.logger.Error("invalid old client status for allocation",
"alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus)
}
summaryChanged = true
14 changes: 14 additions & 0 deletions nomad/structs/diff.go
@@ -238,6 +238,20 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
}
}

// StopAfterClientDisconnect diff
if oldPrimitiveFlat != nil && newPrimitiveFlat != nil {
if tg.StopAfterClientDisconnect == nil {
oldPrimitiveFlat["StopAfterClientDisconnect"] = ""
} else {
oldPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *tg.StopAfterClientDisconnect)
}
if other.StopAfterClientDisconnect == nil {
newPrimitiveFlat["StopAfterClientDisconnect"] = ""
} else {
newPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *other.StopAfterClientDisconnect)
}
}

// Diff the primitive fields.
diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, false)

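
One side effect of flattening the pointer with fmt.Sprintf("%d", ...) above is that the field diff renders the duration as a nanosecond count rather than a Duration string, as this small check illustrates:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the formatting used for StopAfterClientDisconnect in
	// TaskGroup.Diff: the int64 nanosecond count, not "2m0s".
	s := fmt.Sprintf("%d", 120*time.Second)
	fmt.Println(s) // 120000000000
}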
87 changes: 87 additions & 0 deletions nomad/structs/structs.go
@@ -3799,6 +3799,10 @@ func (j *Job) Validate() error {
mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value"))
}

if tg.StopAfterClientDisconnect != nil && *tg.StopAfterClientDisconnect < 0 {
mErr.Errors = append(mErr.Errors, errors.New("StopAfterClientDisconnect must be a positive value"))
}

if j.Type == "system" && tg.Count > 1 {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Job task group %s has count %d. Count cannot exceed 1 with system scheduler",
@@ -5265,6 +5269,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.ShutdownDelay = tg.ShutdownDelay
}

if tg.StopAfterClientDisconnect != nil {
ntg.StopAfterClientDisconnect = tg.StopAfterClientDisconnect
}

return ntg
}

@@ -6516,6 +6524,19 @@ func (t *Template) Warnings() error {
return mErr.ErrorOrNil()
}

// AllocState records a single event that changes the state of the whole allocation
type AllocStateField uint8

const (
AllocStateFieldClientStatus AllocStateField = iota
)

type AllocState struct {
Field AllocStateField
Value string
Time time.Time
}

// Set of possible states for a task.
const (
TaskStatePending = "pending" // The task is waiting to be run.
@@ -8152,6 +8173,9 @@ type Allocation struct {
// TaskStates stores the state of each task,
TaskStates map[string]*TaskState

// AllocStates track metadata associated with changes to the state of the whole allocation, like becoming lost
AllocStates []*AllocState

// PreviousAllocation is the allocation that this allocation is replacing
PreviousAllocation string

@@ -8420,6 +8444,49 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return nextRescheduleTime, rescheduleEligible
}

// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil ||
tg.StopAfterClientDisconnect == nil ||
*tg.StopAfterClientDisconnect == 0*time.Nanosecond {
return false
}
return true
}

// WaitClientStop uses the reschedule delay mechanism to block rescheduling until
// StopAfterClientDisconnect's block interval passes
func (a *Allocation) WaitClientStop() time.Time {
tg := a.Job.LookupTaskGroup(a.TaskGroup)

// An alloc can only be marked lost once, so use the first lost transition
var t time.Time
for _, s := range a.AllocStates {
if s.Field == AllocStateFieldClientStatus &&
s.Value == AllocClientStatusLost {
t = s.Time
break
}
}

// On the first pass, the alloc hasn't been marked lost yet, and so we start
// counting from now
if t.IsZero() {
t = time.Now().UTC()
}

// Find the max kill timeout
kill := DefaultKillTimeout
for _, t := range tg.Tasks {
if t.KillTimeout > kill {
kill = t.KillTimeout
}
}

return t.Add(*tg.StopAfterClientDisconnect + kill)
}

// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {
@@ -8476,6 +8543,24 @@ func (a *Allocation) Terminated() bool {
return false
}

// SetStop updates the allocation in place to a stop DesiredStatus, with the given ClientStatus and description
func (a *Allocation) SetStop(clientStatus, clientDesc string) {
a.DesiredStatus = AllocDesiredStatusStop
a.ClientStatus = clientStatus
a.ClientDescription = clientDesc
a.AppendState(AllocStateFieldClientStatus, clientStatus)
}

// AppendState creates and appends an AllocState entry recording the time of the state
// transition. Used to mark the transition to lost
func (a *Allocation) AppendState(field AllocStateField, value string) {
a.AllocStates = append(a.AllocStates, &AllocState{
Field: field,
Value: value,
Time: time.Now().UTC(),
})
}

// RanSuccessfully returns whether the client has ran the allocation and all
// tasks finished successfully. Critically this function returns whether the
// allocation has ran to completion and not just that the alloc has converged to
@@ -9384,6 +9469,8 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s
newAlloc.ClientStatus = clientStatus
}

newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)

node := alloc.NodeID
existing := p.NodeUpdate[node]
p.NodeUpdate[node] = append(existing, newAlloc)
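
The reschedule time computed by WaitClientStop above reduces to: the time the alloc was first marked lost (or now, on the first pass) plus stop_after_client_disconnect plus the group's largest kill timeout. A worked example with hypothetical numbers:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs, mirroring the arithmetic in Allocation.WaitClientStop.
	lostAt := time.Date(2020, 5, 13, 12, 0, 0, 0, time.UTC) // first AllocClientStatusLost transition
	stopAfter := 2 * time.Minute                            // group stop_after_client_disconnect
	maxKill := 30 * time.Second                             // largest task kill_timeout in the group

	rescheduleAt := lostAt.Add(stopAfter + maxKill)
	fmt.Println(rescheduleAt) // 2020-05-13 12:02:30 +0000 UTC
}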
69 changes: 68 additions & 1 deletion nomad/structs/structs_test.go
@@ -3595,13 +3595,21 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {

plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)

- appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
expectedAlloc := new(Allocation)
*expectedAlloc = *alloc
expectedAlloc.DesiredDescription = desiredDesc
expectedAlloc.DesiredStatus = AllocDesiredStatusStop
expectedAlloc.ClientStatus = AllocClientStatusLost
expectedAlloc.Job = nil
+ expectedAlloc.AllocStates = []*AllocState{{
+ Field: AllocStateFieldClientStatus,
+ Value: "lost",
+ }}
+
+ // This value is set to time.Now() in AppendStoppedAlloc, so clear it
+ appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
+ appendedAlloc.AllocStates[0].Time = time.Time{}

assert.Equal(t, expectedAlloc, appendedAlloc)
assert.Equal(t, alloc.Job, plan.Job)
}
@@ -4372,6 +4380,65 @@ func TestAllocation_NextDelay(t *testing.T) {

}

func TestAllocation_WaitClientStop(t *testing.T) {
type testCase struct {
desc string
stop time.Duration
status string
expectedShould bool
expectedRescheduleTime time.Time
}
now := time.Now().UTC()
testCases := []testCase{
{
desc: "running",
stop: 2 * time.Second,
status: AllocClientStatusRunning,
expectedShould: true,
},
{
desc: "no stop_after_client_disconnect",
status: AllocClientStatusLost,
expectedShould: false,
},
{
desc: "stop",
status: AllocClientStatusLost,
stop: 2 * time.Second,
expectedShould: true,
expectedRescheduleTime: now.Add((2 + 5) * time.Second),
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
j := testJob()
a := &Allocation{
ClientStatus: tc.status,
Job: j,
TaskStates: map[string]*TaskState{},
}

if tc.status == AllocClientStatusLost {
a.AppendState(AllocStateFieldClientStatus, AllocClientStatusLost)
}

j.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
a.TaskGroup = j.TaskGroups[0].Name

require.Equal(t, tc.expectedShould, a.ShouldClientStop())

if !tc.expectedShould || tc.status != AllocClientStatusLost {
return
}

// the reschedTime is close to the expectedRescheduleTime
reschedTime := a.WaitClientStop()
e := reschedTime.Unix() - tc.expectedRescheduleTime.Unix()
require.Less(t, e, int64(2))
})
}
}

func TestAllocation_Canonicalize_Old(t *testing.T) {
alloc := MockAlloc()
alloc.AllocatedResources = nil
8 changes: 5 additions & 3 deletions scheduler/generic_sched.go
@@ -259,8 +259,10 @@ func (s *GenericScheduler) process() (bool, error) {
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available. If the
// current evaluation is already a blocked eval, we reuse it by submitting
- // a new eval to the planner in createBlockedEval
- if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil {
+ // a new eval to the planner in createBlockedEval. If the current eval is
+ // pending with WaitUntil set, it's delayed rather than blocked.
+ if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
+ s.eval.WaitUntil.IsZero() {
if err := s.createBlockedEval(false); err != nil {
s.logger.Error("failed to make blocked eval", "error", err)
return false, err
@@ -338,7 +340,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}

// Update the allocations which are in pending/running state on tainted
- // nodes to lost
+ // nodes to lost, but only if the scheduler has already marked them
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

reconciler := NewAllocReconciler(s.logger,
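
The WaitUntil check in process() above assumes an earlier step produced a delayed rather than blocked eval. A hypothetical sketch of what such a follow-up eval could look like; delayedEvalForDisconnect is not part of the shown diff, only the structs.Evaluation fields are real:

package scheduler

import (
	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
)

// delayedEvalForDisconnect is a hypothetical helper, not code from this
// commit: instead of a blocked eval, emit a pending eval whose WaitUntil is
// the time returned by Allocation.WaitClientStop, so placement is retried
// only after the stop_after_client_disconnect window (plus kill timeout).
func delayedEvalForDisconnect(alloc *structs.Allocation, priority int) *structs.Evaluation {
	return &structs.Evaluation{
		ID:          uuid.Generate(),
		Namespace:   alloc.Namespace,
		Priority:    priority,
		Type:        alloc.Job.Type,
		TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
		JobID:       alloc.JobID,
		Status:      structs.EvalStatusPending,
		WaitUntil:   alloc.WaitClientStop(),
	}
}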