Skip to content

Commit

Permalink
core: implement system batch scheduler
Browse files Browse the repository at this point in the history
This PR implements a new "System Batch" scheduler type. Jobs can
make use of this new scheduler by setting their type to 'sysbatch'.

Like the name implies, sysbatch can be thought of as a hybrid between
system and batch jobs - it is for running short lived jobs intended to
run on every compatible node in the cluster.

As with batch jobs, sysbatch jobs can also be periodic and/or parameterized
dispatch jobs. A sysbatch job is considered complete when it has been run
on all compatible nodes until reaching a terminal state (success or failed
on retries).

Feasibility and preemption are governed the same as with system jobs. In
this PR, the update stanza is not yet supported. The update stanza is sill
limited in functionality for the underlying system scheduler, and is
not useful yet for sysbatch jobs. Further work in #4740 will improve
support for the update stanza and deployments.

Closes #2527
  • Loading branch information
shoenig authored and Mahmood Ali committed Aug 3, 2021
1 parent 52c37e1 commit 61ee443
Show file tree
Hide file tree
Showing 36 changed files with 2,757 additions and 603 deletions.
7 changes: 4 additions & 3 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ const (

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
SystemSchedulerEnabled bool
SysBatchSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
Expand Down
18 changes: 11 additions & 7 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25

ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableErrror = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableError = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch
onSuccess := true

// Batch & SysBatch jobs should not restart if they exit successfully
if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
onSuccess = false
}

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
Expand Down Expand Up @@ -201,7 +205,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {
Expand Down
7 changes: 4 additions & 3 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
}

if err := args.Config.Validate(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {

// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled)
Expand Down Expand Up @@ -319,6 +320,8 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled)
})
Expand All @@ -330,6 +333,7 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled":true,
"BatchSchedulerEnabled":true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
Expand All @@ -352,7 +356,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)

// Create a CAS request, bad index
{
Expand Down Expand Up @@ -393,7 +399,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
}

Expand Down
3 changes: 2 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
_ "github.com/hashicorp/nomad/e2e/rescheduling"
_ "github.com/hashicorp/nomad/e2e/scaling"
_ "github.com/hashicorp/nomad/e2e/scalingpolicies"
_ "github.com/hashicorp/nomad/e2e/scheduler_sysbatch"
_ "github.com/hashicorp/nomad/e2e/scheduler_system"
_ "github.com/hashicorp/nomad/e2e/spread"
_ "github.com/hashicorp/nomad/e2e/systemsched"
_ "github.com/hashicorp/nomad/e2e/taskevents"
_ "github.com/hashicorp/nomad/e2e/vaultsecrets"
_ "github.com/hashicorp/nomad/e2e/volumes"
Expand Down
24 changes: 24 additions & 0 deletions e2e/e2eutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,30 @@ func WaitForAllocStopped(t *testing.T, nomadClient *api.Client, allocID string)
})
}

func WaitForAllocStatus(t *testing.T, nomadClient *api.Client, allocID string, status string) {
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(time.Millisecond * 100)
alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
if err != nil {
return false, err
}
switch alloc.ClientStatus {
case status:
return true, nil
default:
return false, fmt.Errorf("expected %s alloc, but was: %s", status, alloc.ClientStatus)
}
}, func(err error) {
t.Fatalf("failed to wait on alloc: %v", err)
})
}

func WaitForAllocsStatus(t *testing.T, nomadClient *api.Client, allocIDs []string, status string) {
for _, allocID := range allocIDs {
WaitForAllocStatus(t, nomadClient, allocID, status)
}
}

func AllocIDsFromAllocationListStubs(allocs []*api.AllocationListStub) []string {
allocIDs := make([]string, 0, len(allocs))
for _, alloc := range allocs {
Expand Down
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_dispatch.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

parameterized {
payload = "forbidden"
meta_required = ["KEY"]
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 10 example.com"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_fast.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 10 example.com"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_slow.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 100000 example.com"]
}
}
}
}
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_periodic.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

periodic {
cron = "*/15 * * * * *"
prohibit_overlap = true
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 10 example.com"]
}
}
}
}
Loading

0 comments on commit 61ee443

Please sign in to comment.