diff --git a/api/tasks.go b/api/tasks.go index b22938e1ea3..d64bc7bb247 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -163,6 +163,7 @@ type Task struct { Vault *Vault Templates []*Template DispatchPayload *DispatchPayloadConfig + Leader *bool } // TaskArtifact is used to download artifacts before running a task. @@ -256,10 +257,10 @@ const ( TaskNotRestarting = "Not Restarting" TaskDownloadingArtifacts = "Downloading Artifacts" TaskArtifactDownloadFailed = "Failed Artifact Download" - TaskVaultRenewalFailed = "Vault token renewal failed" - TaskSiblingFailed = "Sibling task failed" + TaskSiblingFailed = "Sibling Task Failed" TaskSignaling = "Signaling" TaskRestartSignal = "Restart Signaled" + TaskLeaderDead = "Leader Task Dead" ) // TaskEvent is an event that effects the state of a task and contains meta-data diff --git a/client/alloc_runner.go b/client/alloc_runner.go index cc3db0a5d7e..ff2dd964297 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -383,19 +383,39 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv taskState.State = state if state == structs.TaskStateDead { + // Find all tasks that are not the one that is dead and check if the one + // that is dead is a leader + var otherTaskRunners []*TaskRunner + var otherTaskNames []string + leader := false + for task, tr := range r.tasks { + if task != taskName { + otherTaskRunners = append(otherTaskRunners, tr) + otherTaskNames = append(otherTaskNames, task) + } else if tr.task.Leader { + leader = true + } + } + // If the task failed, we should kill all the other tasks in the task group. if taskState.Failed { - var destroyingTasks []string - for task, tr := range r.tasks { - if task != taskName { - destroyingTasks = append(destroyingTasks, task) - tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName)) - } + for _, tr := range otherTaskRunners { + tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName)) } - if len(destroyingTasks) > 0 { - r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks) + if len(otherTaskRunners) > 0 { + r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, otherTaskNames) + } + } else if leader { + for _, tr := range otherTaskRunners { + tr.Destroy(structs.NewTaskEvent(structs.TaskLeaderDead)) + } + if len(otherTaskRunners) > 0 { + r.logger.Printf("[DEBUG] client: leader task %q is dead, destroying other tasks in task group: %v", taskName, otherTaskNames) } } + + // If the task was a leader task we should kill all the other + // tasks. } select { diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 6bc93b9ce29..ca055ef8b1b 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -663,6 +663,70 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { }) } +func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { + upd, ar := testAllocRunner(false) + + // Create two tasks in the task group + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.KillTimeout = 10 * time.Millisecond + task.Config = map[string]interface{}{ + "run_for": "10s", + } + + task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy() + task2.Name = "task 2" + task2.Driver = "mock_driver" + task2.Leader = true + task2.Config = map[string]interface{}{ + "run_for": "1s", + } + ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2) + ar.alloc.TaskResources[task2.Name] = task2.Resources + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete) + } + + // Task One should be killed + state1 := last.TaskStates[task.Name] + if state1.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead) + } + if len(state1.Events) < 2 { + // At least have a received and destroyed + return false, fmt.Errorf("Unexpected number of events") + } + + found := false + for _, e := range state1.Events { + if e.Type != structs.TaskLeaderDead { + found = true + } + } + + if !found { + return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead) + } + + // Task Two should be dead + state2 := last.TaskStates[task2.Name] + if state2.State != structs.TaskStateDead { + return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +} + func TestAllocRunner_MoveAllocDir(t *testing.T) { // Create an alloc runner alloc := mock.Alloc() diff --git a/command/alloc_status.go b/command/alloc_status.go index 45cc15b7ccd..d5ea53d17f0 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -349,12 +349,6 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { } else { desc = "Task exceeded restart policy" } - case api.TaskVaultRenewalFailed: - if event.VaultError != "" { - desc = event.VaultError - } else { - desc = "Task's Vault token failed to be renewed" - } case api.TaskSiblingFailed: if event.FailedSibling != "" { desc = fmt.Sprintf("Task's sibling %q failed", event.FailedSibling) @@ -382,6 +376,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { } case api.TaskDriverMessage: desc = event.DriverMessage + case api.TaskLeaderDead: + desc = "Leader Task in Group dead" } // Reverse order so we are sorted by time diff --git a/jobspec/parse.go b/jobspec/parse.go index e109854c4ca..628e177873b 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -568,6 +568,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l "driver", "env", "kill_timeout", + "leader", "logs", "meta", "resources", diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 1e3485b696a..068618d9657 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -181,6 +181,7 @@ func TestParse(t *testing.T) { Perms: "777", }, }, + Leader: true, }, &structs.Task{ Name: "storagelocker", diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index b70ad03dd07..72870620f9f 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -50,6 +50,7 @@ job "binstore-storagelocker" { task "binstore" { driver = "docker" user = "bob" + leader = true config { image = "hashicorp/binstore" diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go index f4ec05e6913..2e3751cfe43 100644 --- a/nomad/structs/diff.go +++ b/nomad/structs/diff.go @@ -403,7 +403,7 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) { diff.Objects = append(diff.Objects, vDiff) } - // Artifacts diff + // Template diff tmplDiffs := primitiveObjectSetDiff( interfaceSlice(t.Templates), interfaceSlice(other.Templates), diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 852e91db51e..5bf1c0a24d5 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -1777,6 +1777,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "", New: "0", }, + { + Type: DiffTypeAdded, + Name: "Leader", + Old: "", + New: "false", + }, }, }, { @@ -1811,6 +1817,12 @@ func TestTaskGroupDiff(t *testing.T) { Old: "0", New: "", }, + { + Type: DiffTypeDeleted, + Name: "Leader", + Old: "false", + New: "", + }, }, }, }, @@ -1880,6 +1892,7 @@ func TestTaskDiff(t *testing.T) { "foo": "bar", }, KillTimeout: 1 * time.Second, + Leader: true, }, New: &Task{ Name: "foo", @@ -1892,6 +1905,7 @@ func TestTaskDiff(t *testing.T) { "foo": "bar", }, KillTimeout: 1 * time.Second, + Leader: true, }, Expected: &TaskDiff{ Type: DiffTypeNone, @@ -1911,6 +1925,7 @@ func TestTaskDiff(t *testing.T) { "foo": "bar", }, KillTimeout: 1 * time.Second, + Leader: true, }, New: &Task{ Name: "foo", @@ -1923,6 +1938,7 @@ func TestTaskDiff(t *testing.T) { "foo": "baz", }, KillTimeout: 2 * time.Second, + Leader: false, }, Expected: &TaskDiff{ Type: DiffTypeEdited, @@ -1946,6 +1962,12 @@ func TestTaskDiff(t *testing.T) { Old: "1000000000", New: "2000000000", }, + { + Type: DiffTypeEdited, + Name: "Leader", + Old: "true", + New: "false", + }, { Type: DiffTypeEdited, Name: "Meta[foo]", diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1c24b087215..0b29d7f9a03 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1937,8 +1937,9 @@ func (tg *TaskGroup) Validate() error { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have an ephemeral disk object", tg.Name)) } - // Check for duplicate tasks + // Check for duplicate tasks and that there is only leader task if any tasks := make(map[string]int) + leaderTasks := 0 for idx, task := range tg.Tasks { if task.Name == "" { mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %d missing name", idx+1)) @@ -1947,6 +1948,14 @@ func (tg *TaskGroup) Validate() error { } else { tasks[task.Name] = idx } + + if task.Leader { + leaderTasks++ + } + } + + if leaderTasks > 1 { + mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader")) } // Validate the tasks @@ -2295,6 +2304,10 @@ type Task struct { // Artifacts is a list of artifacts to download and extract before running // the task. Artifacts []*TaskArtifact + + // Leader marks the task as the leader within the group. When the leader + // task exits, other tasks will be gracefully terminated. + Leader bool } func (t *Task) Copy() *Task { @@ -2781,12 +2794,15 @@ const ( // TaskSiblingFailed indicates that a sibling task in the task group has // failed. - TaskSiblingFailed = "Sibling task failed" + TaskSiblingFailed = "Sibling Task Failed" // TaskDriverMessage is an informational event message emitted by // drivers such as when they're performing a long running action like // downloading an image. TaskDriverMessage = "Driver" + + // TaskLeaderDead indicates that the leader task within the has finished. + TaskLeaderDead = "Leader Task Dead" ) // TaskEvent is an event that effects the state of a task and contains meta-data diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index e9bc0248923..9cbfd26a174 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -419,8 +419,8 @@ func TestTaskGroup_Validate(t *testing.T) { Name: "web", Count: 1, Tasks: []*Task{ - &Task{Name: "web"}, - &Task{Name: "web"}, + &Task{Name: "web", Leader: true}, + &Task{Name: "web", Leader: true}, &Task{}, }, RestartPolicy: &RestartPolicy{ @@ -442,7 +442,10 @@ func TestTaskGroup_Validate(t *testing.T) { if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") { t.Fatalf("err: %s", err) } - if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") { + if !strings.Contains(mErr.Errors[3].Error(), "Only one task may be marked as leader") { + t.Fatalf("err: %s", err) + } + if !strings.Contains(mErr.Errors[4].Error(), "Task web validation failed") { t.Fatalf("err: %s", err) } } diff --git a/website/source/docs/http/alloc.html.md b/website/source/docs/http/alloc.html.md index 106e85cbbd8..a3aeb3a8c5b 100644 --- a/website/source/docs/http/alloc.html.md +++ b/website/source/docs/http/alloc.html.md @@ -267,6 +267,8 @@ be specified using the `?region=` query parameter. * `Failed Artifact Download` - Artifact(s) specified in the task failed to download. * `Restart Signaled` - The task was signalled to be restarted. * `Signaling` - The task was is being sent a signal. - * `Sibling task failed` - A task in the same task group failed. + * `Sibling Task Failed` - A task in the same task group failed. + * `Leader Task Dead` - The group's leader task is dead. + * `Driver` - A message from the driver. Depending on the type the event will have applicable annotations. diff --git a/website/source/docs/http/json-jobs.html.md b/website/source/docs/http/json-jobs.html.md index 7d8cc3bbe92..48f600925b9 100644 --- a/website/source/docs/http/json-jobs.html.md +++ b/website/source/docs/http/json-jobs.html.md @@ -358,6 +358,10 @@ The `Task` object supports the following keys: sends `SIGTERM` if the task doesn't die after the `KillTimeout` duration has elapsed. The default `KillTimeout` is 5 seconds. +* `leader` - Specifies whether the task is the leader task of the task group. If + set to true, when the leader task completes, all other tasks within the task + group will be gracefully shutdown. + * `LogConfig` - This allows configuring log rotation for the `stdout` and `stderr` buffers of a Task. See the log rotation reference below for more details. @@ -681,6 +685,9 @@ README][ct]. "SIGUSR1" or "SIGINT". This option is required if the `ChangeMode` is `signal`. +* `perms` - Specifies the rendered template's permissions. File permissions are + given as octal of the unix file permissions rwxrwxrwx. + * `Splay` - Specifies a random amount of time to wait between 0ms and the given splay value before invoking the change mode. Should be specified in nanoseconds. diff --git a/website/source/docs/job-specification/task.html.md b/website/source/docs/job-specification/task.html.md index 0bc47c42cc5..76be98ca130 100644 --- a/website/source/docs/job-specification/task.html.md +++ b/website/source/docs/job-specification/task.html.md @@ -52,6 +52,10 @@ job "docs" { If the task does not exit before the configured timeout, `SIGKILL` is sent to the task. +- `leader` `(bool: false)` - Specifies whether the task is the leader task of + the task group. If set to true, when the leader task completes, all other + tasks within the task group will be gracefully shutdown. + - `logs` ([Logs][]: nil) - Specifies logging configuration for the `stdout` and `stderr` of the task. diff --git a/website/source/docs/operating-a-job/accessing-logs.html.md b/website/source/docs/operating-a-job/accessing-logs.html.md index e0daf7520ea..c006bada177 100644 --- a/website/source/docs/operating-a-job/accessing-logs.html.md +++ b/website/source/docs/operating-a-job/accessing-logs.html.md @@ -79,9 +79,9 @@ $ nomad logs -stderr 04d9627d While the logs command works well for quickly accessing application logs, it generally does not scale to large systems or systems that produce a lot of log -output, especially for the long-term storage of logs. Nomad only retains log -files for a configurable period of time, so chatty applications should use a -better log retention strategy. +output, especially for the long-term storage of logs. Nomad's retention of log +files is best effort, so chatty applications should use a better log retention +strategy. Since applications log to the `alloc/` directory, all tasks within the same task group have access to each others logs. Thus it is possible to have a task group @@ -91,6 +91,10 @@ as follows: group "my-group" { task "server" { # ... + + # Setting the server task as the leader of the task group allows us to + # signal the log shipper task to gracefully shutdown when the server exits. + leader = true } task "log-shipper" { @@ -103,3 +107,11 @@ In the above example, the `server` task is the application that should be run and will be producing the logs. The `log-shipper` reads those logs from the `alloc/logs/` directory and sends them to a longer-term storage solution such as Amazon S3 or an internal log aggregation system. + +When using the log shipper pattern, especially for batch jobs, the main task +should be marked as the [leader task](/docs/job-specification/task.html#leader). +By marking the main task as a leader, when the task completes all other tasks +within the group will be gracefully shutdown. This allows the log shipper to +finish sending any logs and then exiting itself. The log shipper should set a +high enough [`kill_timeout`](/docs/job-specification/task.html#kill_timeout) +such that it can ship any remaining logs before exiting.