Skip to content

Commit

Permalink
core: implement system batch scheduler
Browse files Browse the repository at this point in the history
This PR implements a new "System Batch" scheduler type. Jobs can
make use of this new scheduler by setting their type to 'sysbatch'.

Like the name implies, sysbatch can be thought of as a hybrid between
system and batch jobs - it is for running short lived jobs intended to
run on every compatible node in the cluster.

As with batch jobs, sysbatch jobs can also be periodic and/or parameterized
dispatch jobs. A sysbatch job is considered complete when it has been run
on all compatible nodes until reaching a terminal state (success or failed
on retries).

Feasibility and preemption are governed the same as with system jobs. In
this PR, the update stanza is not yet supported. The update stanza is sill
limited in functionality for the underlying system scheduler, and is
not useful yet for sysbatch jobs. Further work in #4740 will improve
support for the update stanza and deployments.

Closes #2527
  • Loading branch information
shoenig committed Nov 5, 2020
1 parent 4daf5a4 commit 6eec337
Show file tree
Hide file tree
Showing 41 changed files with 2,800 additions and 615 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ FEATURES:
* **Event Stream**: Subscribe to change events as they occur in real time. [[GH-9013](https://github.com/hashicorp/nomad/issues/9013)]
* **Namespaces OSS**: Namespaces are now available in open source Nomad. [[GH-9135](https://github.com/hashicorp/nomad/issues/9135)]
* **Topology Visualization**: See all of the clients and allocations in a cluster at once. [[GH-9077](https://github.com/hashicorp/nomad/issues/9077)]
* **System Batch Scheduling**: New `sysbatch` scheduler type for running short lived jobs across all nodes. [[GH-9160](https://github.com/hashicorp/nomad/pull/9160)]

IMPROVEMENTS:
* core: Improved job deregistration error logging. [[GH-8745](https://github.com/hashicorp/nomad/issues/8745)]
Expand Down
7 changes: 4 additions & 3 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ const (

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
SystemSchedulerEnabled bool
SysBatchSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
Expand Down
18 changes: 11 additions & 7 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25

ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableErrror = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableError = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch
onSuccess := true

// Batch & SysBatch jobs should not restart if they exit successfully
if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
onSuccess = false
}

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
Expand Down Expand Up @@ -196,7 +200,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {
Expand Down
7 changes: 4 additions & 3 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,10 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
args.Config = structs.SchedulerConfiguration{
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
}

if err := args.Config.Validate(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {

// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
Expand Down Expand Up @@ -314,6 +315,8 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
}
Expand All @@ -324,6 +327,7 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled":true,
"BatchSchedulerEnabled":true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
Expand All @@ -346,7 +350,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)

// Create a CAS request, bad index
{
Expand Down Expand Up @@ -387,7 +393,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
}

Expand Down
3 changes: 2 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
_ "github.com/hashicorp/nomad/e2e/podman"
_ "github.com/hashicorp/nomad/e2e/quotas"
_ "github.com/hashicorp/nomad/e2e/rescheduling"
_ "github.com/hashicorp/nomad/e2e/scheduler_sysbatch"
_ "github.com/hashicorp/nomad/e2e/scheduler_system"
_ "github.com/hashicorp/nomad/e2e/spread"
_ "github.com/hashicorp/nomad/e2e/systemsched"
_ "github.com/hashicorp/nomad/e2e/taskevents"
_ "github.com/hashicorp/nomad/e2e/vaultsecrets"
_ "github.com/hashicorp/nomad/e2e/volumes"
Expand Down
24 changes: 24 additions & 0 deletions e2e/e2eutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,30 @@ func WaitForAllocStopped(t *testing.T, nomadClient *api.Client, allocID string)
})
}

func WaitForAllocStatus(t *testing.T, nomadClient *api.Client, allocID string, status string) {
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(time.Millisecond * 100)
alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
if err != nil {
return false, err
}
switch alloc.ClientStatus {
case status:
return true, nil
default:
return false, fmt.Errorf("expected %s alloc, but was: %s", status, alloc.ClientStatus)
}
}, func(err error) {
t.Fatalf("failed to wait on alloc: %v", err)
})
}

func WaitForAllocsStatus(t *testing.T, nomadClient *api.Client, allocIDs []string, status string) {
for _, allocID := range allocIDs {
WaitForAllocStatus(t, nomadClient, allocID, status)
}
}

func AllocIDsFromAllocationListStubs(allocs []*api.AllocationListStub) []string {
allocIDs := make([]string, 0, len(allocs))
for _, alloc := range allocs {
Expand Down
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_dispatch.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

parameterized {
payload = "forbidden"
meta_required = ["KEY"]
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 10 example.com"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_fast.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 10 example.com"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_slow.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 100000 example.com"]
}
}
}
}
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_periodic.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

periodic {
cron = "*/15 * * * * *"
prohibit_overlap = true
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "bash:5"

command = "bash"
args = ["-c", "ping -c 10 example.com"]
}
}
}
}
Loading

0 comments on commit 6eec337

Please sign in to comment.