GenericScheduler: upsert unknown and queue reconnecting
DerekStrickland committed Feb 14, 2022
1 parent 1e013be commit 6cd52b0
Showing 2 changed files with 163 additions and 1 deletion.
20 changes: 19 additions & 1 deletion scheduler/generic_sched.go
@@ -33,6 +33,9 @@ const (
// allocLost is the status used when an allocation is lost
allocLost = "alloc is lost since its node is down"

// allocUnknown is the status used when an allocation is unknown
allocUnknown = "alloc is unknown since its node is disconnected"

// allocInPlace is the status used when speculating on an in-place update
allocInPlace = "alloc updating in-place"

@@ -55,6 +58,11 @@ const (
// up evals for delayed rescheduling
reschedulingFollowupEvalDesc = "created for delayed rescheduling"

// disconnectTimeoutFollowupEvalDesc is the description used when creating follow
// up evals for allocations that should be stopped after their disconnect
// timeout has passed.
disconnectTimeoutFollowupEvalDesc = "created for delayed disconnect timeout"

// maxPastRescheduleEvents is the maximum number of past reschedule events
// that we track when unlimited rescheduling is enabled
maxPastRescheduleEvents = 5
@@ -148,7 +156,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) {
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption,
structs.EvalTriggerScaling:
structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
@@ -392,6 +400,11 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID)
}

// Handle disconnect updates
for _, update := range results.disconnectUpdates {
s.plan.AppendUnknownAlloc(update)
}

// Handle the in-place updates
for _, update := range results.inplaceUpdate {
if update.DeploymentID != s.deployment.GetID() {
@@ -406,6 +419,11 @@
s.ctx.Plan().AppendAlloc(update, nil)
}

// Handle reconnect updates
for _, update := range results.reconnectUpdates {
s.ctx.Plan().AppendAlloc(update, nil)
}

// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise
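Note: AppendUnknownAlloc is a Plan helper introduced alongside this change; its body is not part of this diff. A rough sketch of what it could look like, inferred only from the assertions in the test below (the alloc must land in Plan.NodeAllocation keyed by node ID with an unknown client status) — the actual implementation in the structs package may differ:

// Sketch, not the committed implementation: append an allocation whose node
// has disconnected to the plan under that node, marking it unknown while
// keeping it desired-to-run so the client can reconcile it on reconnect.
func (p *Plan) AppendUnknownAlloc(alloc *Allocation) {
	alloc.ClientStatus = AllocClientStatusUnknown
	alloc.DesiredStatus = AllocDesiredStatusRun

	existing := p.NodeAllocation[alloc.NodeID]
	p.NodeAllocation[alloc.NodeID] = append(existing, alloc)
}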
144 changes: 144 additions & 0 deletions scheduler/generic_sched_test.go
@@ -6379,3 +6379,147 @@ func TestPropagateTaskState(t *testing.T) {
})
}
}

// Tests that a client disconnect generates attribute updates and follow-up evals.
func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T) {
cases := []struct {
maxClientDisconnect time.Duration
rescheduled bool
count int
}{
{
maxClientDisconnect: 10 * time.Minute,
rescheduled: true,
count: 1,
},
}

for _, tc := range cases {
t.Run(fmt.Sprintf(""), func(t *testing.T) {
h := NewHarness(t)

disconnectedNode, job, unknownAllocs := initNodeAndAllocs(t, h, tc.count, tc.maxClientDisconnect,
structs.NodeStatusReady, structs.AllocClientStatusRunning)

// Now disconnect the node
disconnectedNode.Status = structs.NodeStatusDisconnected
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode))

// Create an evaluation triggered by the disconnect
evals := []*structs.Evaluation{{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: disconnectedNode.ID,
Status: structs.EvalStatusPending,
}}
nodeStatusUpdateEval := evals[0]
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

// Process the evaluation
err := h.Process(NewServiceScheduler, nodeStatusUpdateEval)
require.NoError(t, err)
require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status)
require.Len(t, h.Plans, 1, "plan")

// One followup delayed eval created
require.Len(t, h.CreateEvals, 1)
followUpEval := h.CreateEvals[0]
require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval)
require.Equal(t, "pending", followUpEval.Status)
require.NotEmpty(t, followUpEval.WaitUntil)

// Wait for the follow-up eval to be persisted to the state store
ws := memdb.NewWatchSet()
testutil.WaitForResult(func() (bool, error) {
found, err := h.State.EvalByID(ws, followUpEval.ID)
if err != nil {
return false, err
}
if found == nil {
return false, nil
}

require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval)
require.Equal(t, "pending", found.Status)
require.NotEmpty(t, found.WaitUntil)

return true, nil
}, func(err error) {
require.NoError(t, err)
})

// Validate that the ClientStatus updates are part of the plan.
require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], tc.count)
// Pending update should have unknown status.
for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] {
require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown)
}

// Simulate that NodeAllocation got processed.
testutil.WaitForResult(func() (bool, error) {
err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), h.Plans[0].NodeAllocation[disconnectedNode.ID])
require.NoError(t, err, "plan.NodeUpdate")
return true, nil
}, func(err error) {
require.NoError(t, err)
})

// Validate that the StateStore Upsert applied the ClientStatus we specified.
for _, alloc := range unknownAllocs {
alloc, err = h.State.AllocByID(ws, alloc.ID)
require.NoError(t, err)
require.Equal(t, alloc.ClientStatus, structs.AllocClientStatusUnknown)

// Allocations have been transitioned to unknown
require.Equal(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus)
require.Equal(t, structs.AllocClientStatusUnknown, alloc.ClientStatus)
}

// If the reschedule policy indicates they should get rescheduled,
// test that they do.
if tc.rescheduled {

}
})
}
}

func initNodeAndAllocs(t *testing.T, h *Harness, allocCount int,
maxClientDisconnect time.Duration, nodeStatus, clientStatus string) (*structs.Node, *structs.Job, []*structs.Allocation) {
// Node, which is ready
node := mock.Node()
node.Status = nodeStatus
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

// Job with allocations and max_client_disconnect
job := mock.Job()
job.TaskGroups[0].Count = allocCount
job.TaskGroups[0].MaxClientDisconnect = &maxClientDisconnect
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

allocs := make([]*structs.Allocation, allocCount)
for i := 0; i < allocCount; i++ {
// Alloc for the running group
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = clientStatus
//if !tc.when.IsZero() {
// alloc.AllocStates = []*structs.AllocState{{
// Field: structs.AllocStateFieldClientStatus,
// Value: structs.AllocClientStatusLost,
// Time: tc.when,
// }}
//}
allocs[i] = alloc
}

require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
return node, job, allocs
}
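For context, the delayed follow-up evaluation that the test above waits for is produced by the reconciler rather than by code in this diff. Going by the disconnectTimeoutFollowupEvalDesc constant and the EvalTriggerMaxDisconnectTimeout trigger added in generic_sched.go, it would plausibly be built along these lines; the construction site, the exact field set, and the timeout variable (standing in for the group's max_client_disconnect) are assumptions:

// Sketch only: a delayed eval that fires once the disconnect timeout elapses,
// letting the scheduler stop unknown allocs that never reconnected.
followupEval := &structs.Evaluation{
	ID:                uuid.Generate(),
	Namespace:         job.Namespace,
	Priority:          job.Priority,
	Type:              job.Type,
	TriggeredBy:       structs.EvalTriggerMaxDisconnectTimeout,
	JobID:             job.ID,
	Status:            structs.EvalStatusPending,
	StatusDescription: disconnectTimeoutFollowupEvalDesc,
	WaitUntil:         time.Now().UTC().Add(timeout),
}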
