Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: implement system batch scheduler #9160

Merged
merged 2 commits into from
Aug 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,10 @@ const (

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
type PreemptionConfig struct {
SystemSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
SystemSchedulerEnabled bool
SysBatchSchedulerEnabled bool
BatchSchedulerEnabled bool
ServiceSchedulerEnabled bool
}

// SchedulerGetConfiguration is used to query the current Scheduler configuration.
Expand Down
18 changes: 11 additions & 7 deletions client/allocrunner/taskrunner/restarts/restarts.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@ const (
// jitter is the percent of jitter added to restart delays.
jitter = 0.25

ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableErrror = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
ReasonNoRestartsAllowed = "Policy allows no restarts"
ReasonUnrecoverableError = "Error was unrecoverable"
ReasonWithinPolicy = "Restart within policy"
ReasonDelay = "Exceeded allowed attempts, applying a delay"
)

func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
// Batch jobs should not restart if they exit successfully
onSuccess := jobType != structs.JobTypeBatch
onSuccess := true

// Batch & SysBatch jobs should not restart if they exit successfully
if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
onSuccess = false
}

// Prestart sidecars should get restarted on success
if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
Expand Down Expand Up @@ -201,7 +205,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
if r.startErr != nil {
// If the error is not recoverable, do not restart.
if !structs.IsRecoverable(r.startErr) {
r.reason = ReasonUnrecoverableErrror
r.reason = ReasonUnrecoverableError
return structs.TaskNotRestarting, 0
}
} else if r.exitRes != nil {
Expand Down
7 changes: 4 additions & 3 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,10 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R
SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm),
MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled,
PreemptionConfig: structs.PreemptionConfig{
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled,
SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled,
BatchSchedulerEnabled: conf.PreemptionConfig.BatchSchedulerEnabled,
ServiceSchedulerEnabled: conf.PreemptionConfig.ServiceSchedulerEnabled},
}

if err := args.Config.Validate(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {

// Only system jobs can preempt other jobs by default.
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled)
Expand Down Expand Up @@ -319,6 +320,8 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply)
require.Nil(err)
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled)
})
Expand All @@ -330,6 +333,7 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
require := require.New(t)
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true,
"SysBatchSchedulerEnabled":true,
"BatchSchedulerEnabled":true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
Expand All @@ -352,7 +356,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.True(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)

// Create a CAS request, bad index
{
Expand Down Expand Up @@ -393,7 +399,9 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
t.Fatalf("err: %v", err)
}
require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled)
require.False(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled)
})
}

Expand Down
3 changes: 2 additions & 1 deletion e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ import (
_ "github.com/hashicorp/nomad/e2e/rescheduling"
_ "github.com/hashicorp/nomad/e2e/scaling"
_ "github.com/hashicorp/nomad/e2e/scalingpolicies"
_ "github.com/hashicorp/nomad/e2e/scheduler_sysbatch"
_ "github.com/hashicorp/nomad/e2e/scheduler_system"
_ "github.com/hashicorp/nomad/e2e/spread"
_ "github.com/hashicorp/nomad/e2e/systemsched"
_ "github.com/hashicorp/nomad/e2e/taskevents"
_ "github.com/hashicorp/nomad/e2e/vaultsecrets"
_ "github.com/hashicorp/nomad/e2e/volumes"
Expand Down
24 changes: 24 additions & 0 deletions e2e/e2eutil/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,30 @@ func WaitForAllocStopped(t *testing.T, nomadClient *api.Client, allocID string)
})
}

func WaitForAllocStatus(t *testing.T, nomadClient *api.Client, allocID string, status string) {
testutil.WaitForResultRetries(retries, func() (bool, error) {
time.Sleep(time.Millisecond * 100)
alloc, _, err := nomadClient.Allocations().Info(allocID, nil)
if err != nil {
return false, err
}
switch alloc.ClientStatus {
case status:
return true, nil
default:
return false, fmt.Errorf("expected %s alloc, but was: %s", status, alloc.ClientStatus)
}
}, func(err error) {
t.Fatalf("failed to wait on alloc: %v", err)
})
}

func WaitForAllocsStatus(t *testing.T, nomadClient *api.Client, allocIDs []string, status string) {
for _, allocID := range allocIDs {
WaitForAllocStatus(t, nomadClient, allocID, status)
}
}

func AllocIDsFromAllocationListStubs(allocs []*api.AllocationListStub) []string {
allocIDs := make([]string, 0, len(allocs))
for _, alloc := range allocs {
Expand Down
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_dispatch.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

parameterized {
payload = "forbidden"
meta_required = ["KEY"]
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_fast.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}
25 changes: 25 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_job_slow.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1000000"]
}
}
}
}
30 changes: 30 additions & 0 deletions e2e/scheduler_sysbatch/input/sysbatch_periodic.nomad
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
job "sysbatchjob" {
datacenters = ["dc1"]

type = "sysbatch"

constraint {
attribute = "${attr.kernel.name}"
value = "linux"
}

periodic {
cron = "*/15 * * * * *"
prohibit_overlap = true
}

group "sysbatch_job_group" {
count = 1

task "sysbatch_task" {
driver = "docker"

config {
image = "busybox:1"

command = "/bin/sh"
args = ["-c", "echo hi; sleep 1"]
}
}
}
}
Loading