Leader task #2308

Merged: 6 commits, Feb 14, 2017

Changes from 3 commits
5 changes: 3 additions & 2 deletions api/tasks.go
@@ -163,6 +163,7 @@ type Task struct {
Vault *Vault
Templates []*Template
DispatchPayload *DispatchPayloadConfig
+	Leader bool

> Review comment (Contributor): Make this a pointer

}

// TaskArtifact is used to download artifacts before running a task.
@@ -256,10 +257,10 @@ const (
	TaskNotRestarting = "Not Restarting"
	TaskDownloadingArtifacts = "Downloading Artifacts"
	TaskArtifactDownloadFailed = "Failed Artifact Download"
-	TaskVaultRenewalFailed = "Vault token renewal failed"
-	TaskSiblingFailed = "Sibling task failed"
+	TaskSiblingFailed = "Sibling Task Failed"
	TaskSignaling = "Signaling"
	TaskRestartSignal = "Restart Signaled"
+	TaskLeaderDead = "Leader Task Dead"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
36 changes: 28 additions & 8 deletions client/alloc_runner.go
@@ -383,19 +383,39 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent)

	taskState.State = state
	if state == structs.TaskStateDead {
+		// Find all tasks that are not the one that is dead and check if the one
+		// that is dead is a leader
+		var otherTaskRunners []*TaskRunner
+		var otherTaskNames []string
+		leader := false
+		for task, tr := range r.tasks {
+			if task != taskName {
+				otherTaskRunners = append(otherTaskRunners, tr)
+				otherTaskNames = append(otherTaskNames, task)
+			} else if tr.task.Leader {
+				leader = true
+			}
+		}
+
		// If the task failed, we should kill all the other tasks in the task group.
		if taskState.Failed {
-			var destroyingTasks []string
-			for task, tr := range r.tasks {
-				if task != taskName {
-					destroyingTasks = append(destroyingTasks, task)
-					tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
-				}
+			for _, tr := range otherTaskRunners {
+				tr.Destroy(structs.NewTaskEvent(structs.TaskSiblingFailed).SetFailedSibling(taskName))
			}
-			if len(destroyingTasks) > 0 {
-				r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, destroyingTasks)
+			if len(otherTaskRunners) > 0 {
+				r.logger.Printf("[DEBUG] client: task %q failed, destroying other tasks in task group: %v", taskName, otherTaskNames)
			}
+		} else if leader {
+			for _, tr := range otherTaskRunners {
+				tr.Destroy(structs.NewTaskEvent(structs.TaskLeaderDead))
+			}
+			if len(otherTaskRunners) > 0 {
+				r.logger.Printf("[DEBUG] client: leader task %q is dead, destroying other tasks in task group: %v", taskName, otherTaskNames)
+			}
		}
+
+		// If the task was a leader task we should kill all the other
+		// tasks.
	}

select {
64 changes: 64 additions & 0 deletions client/alloc_runner_test.go
@@ -663,6 +663,70 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
})
}

func TestAllocRunner_TaskLeader_KillTG(t *testing.T) {
upd, ar := testAllocRunner(false)

// Create two tasks in the task group
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Driver = "mock_driver"
task.KillTimeout = 10 * time.Millisecond
task.Config = map[string]interface{}{
"run_for": "10s",
}

task2 := ar.alloc.Job.TaskGroups[0].Tasks[0].Copy()
task2.Name = "task 2"
task2.Driver = "mock_driver"
task2.Leader = true
task2.Config = map[string]interface{}{
"run_for": "1s",
}
ar.alloc.Job.TaskGroups[0].Tasks = append(ar.alloc.Job.TaskGroups[0].Tasks, task2)
ar.alloc.TaskResources[task2.Name] = task2.Resources
go ar.Run()

testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, fmt.Errorf("No updates")
}
last := upd.Allocs[upd.Count-1]
if last.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusComplete)
}

// Task One should be killed
state1 := last.TaskStates[task.Name]
if state1.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state1.State, structs.TaskStateDead)
}
if len(state1.Events) < 2 {
// At least have a received and destroyed
return false, fmt.Errorf("Unexpected number of events")
}

found := false
for _, e := range state1.Events {
if e.Type == structs.TaskLeaderDead {
found = true
}
}

if !found {
return false, fmt.Errorf("Did not find event %v", structs.TaskLeaderDead)
}

// Task Two should be dead
state2 := last.TaskStates[task2.Name]
if state2.State != structs.TaskStateDead {
return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
}

return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}

func TestAllocRunner_MoveAllocDir(t *testing.T) {
// Create an alloc runner
alloc := mock.Alloc()
2 changes: 2 additions & 0 deletions command/alloc_status.go
@@ -382,6 +382,8 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
}
case api.TaskDriverMessage:
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
}

// Reverse order so we are sorted by time
1 change: 1 addition & 0 deletions jobspec/parse.go
@@ -568,6 +568,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, list *ast.ObjectList) error {
"driver",
"env",
"kill_timeout",
"leader",
"logs",
"meta",
"resources",
1 change: 1 addition & 0 deletions jobspec/parse_test.go
@@ -181,6 +181,7 @@ func TestParse(t *testing.T) {
Perms: "777",
},
},
Leader: true,
},
&structs.Task{
Name: "storagelocker",
1 change: 1 addition & 0 deletions jobspec/test-fixtures/basic.hcl
@@ -50,6 +50,7 @@ job "binstore-storagelocker" {
task "binstore" {
driver = "docker"
user = "bob"
leader = true

config {
image = "hashicorp/binstore"
2 changes: 1 addition & 1 deletion nomad/structs/diff.go
@@ -403,7 +403,7 @@ func (t *Task) Diff(other *Task, contextual bool) (*TaskDiff, error) {
diff.Objects = append(diff.Objects, vDiff)
}

-	// Artifacts diff
+	// Template diff
tmplDiffs := primitiveObjectSetDiff(
interfaceSlice(t.Templates),
interfaceSlice(other.Templates),
10 changes: 10 additions & 0 deletions nomad/structs/diff_test.go
@@ -1880,6 +1880,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
New: &Task{
Name: "foo",
@@ -1892,6 +1893,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
Expected: &TaskDiff{
Type: DiffTypeNone,
@@ -1911,6 +1913,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "bar",
},
KillTimeout: 1 * time.Second,
Leader: true,
},
New: &Task{
Name: "foo",
@@ -1923,6 +1926,7 @@ func TestTaskDiff(t *testing.T) {
"foo": "baz",
},
KillTimeout: 2 * time.Second,
Leader: false,
},
Expected: &TaskDiff{
Type: DiffTypeEdited,
@@ -1946,6 +1950,12 @@ func TestTaskDiff(t *testing.T) {
Old: "1000000000",
New: "2000000000",
},
{
Type: DiffTypeEdited,
Name: "Leader",
Old: "true",
New: "false",
},
{
Type: DiffTypeEdited,
Name: "Meta[foo]",
20 changes: 18 additions & 2 deletions nomad/structs/structs.go
@@ -1931,8 +1931,9 @@ func (tg *TaskGroup) Validate() error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have an ephemeral disk object", tg.Name))
}

-	// Check for duplicate tasks
+	// Check for duplicate tasks and that there is only leader task if any
tasks := make(map[string]int)
leaderTasks := 0
for idx, task := range tg.Tasks {
if task.Name == "" {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task %d missing name", idx+1))
Expand All @@ -1941,6 +1942,14 @@ func (tg *TaskGroup) Validate() error {
} else {
tasks[task.Name] = idx
}

if task.Leader {
leaderTasks++
}
}

if leaderTasks > 1 {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Only one task may be marked as leader"))
}

// Validate the tasks
@@ -2289,6 +2298,10 @@ type Task struct {
// Artifacts is a list of artifacts to download and extract before running
// the task.
Artifacts []*TaskArtifact

// Leader marks the task as the leader within the group. When the leader
// task exits, other tasks will be gracefully terminated.
Leader bool
}

func (t *Task) Copy() *Task {
@@ -2775,12 +2788,15 @@ const (

// TaskSiblingFailed indicates that a sibling task in the task group has
// failed.
-	TaskSiblingFailed = "Sibling task failed"
+	TaskSiblingFailed = "Sibling Task Failed"

// TaskDriverMessage is an informational event message emitted by
// drivers such as when they're performing a long running action like
// downloading an image.
TaskDriverMessage = "Driver"

+	// TaskLeaderDead indicates that the leader task within the group has finished.
+	TaskLeaderDead = "Leader Task Dead"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
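The new Validate check above rejects task groups with more than one leader. As an illustration, a jobspec like the following sketch (hypothetical job, group, and task names) would now fail validation with "Only one task may be marked as leader":

```hcl
job "example" {
  group "cache" {
    task "redis" {
      driver = "docker"
      leader = true

      config {
        image = "redis:3.2"
      }
    }

    # Invalid: a second task in the same group is also marked as leader,
    # which trips the new leaderTasks > 1 check.
    task "sidecar" {
      driver = "docker"
      leader = true

      config {
        image = "busybox"
      }
    }
  }
}
```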
9 changes: 6 additions & 3 deletions nomad/structs/structs_test.go
@@ -419,8 +419,8 @@ func TestTaskGroup_Validate(t *testing.T) {
Name: "web",
Count: 1,
Tasks: []*Task{
-			&Task{Name: "web"},
-			&Task{Name: "web"},
+			&Task{Name: "web", Leader: true},
+			&Task{Name: "web", Leader: true},
&Task{},
},
RestartPolicy: &RestartPolicy{
@@ -442,7 +442,10 @@ func TestTaskGroup_Validate(t *testing.T) {
if !strings.Contains(mErr.Errors[2].Error(), "Task 3 missing name") {
t.Fatalf("err: %s", err)
}
-	if !strings.Contains(mErr.Errors[3].Error(), "Task web validation failed") {
+	if !strings.Contains(mErr.Errors[3].Error(), "Only one task may be marked as leader") {
+		t.Fatalf("err: %s", err)
+	}
+	if !strings.Contains(mErr.Errors[4].Error(), "Task web validation failed") {
t.Fatalf("err: %s", err)
}
}
4 changes: 3 additions & 1 deletion website/source/docs/http/alloc.html.md
@@ -267,6 +267,8 @@ be specified using the `?region=` query parameter.
* `Failed Artifact Download` - Artifact(s) specified in the task failed to download.
* `Restart Signaled` - The task was signalled to be restarted.
* `Signaling` - The task is being sent a signal.
-* `Sibling task failed` - A task in the same task group failed.
+* `Sibling Task Failed` - A task in the same task group failed.
+* `Leader Task Dead` - The group's leader task is dead.
* `Driver` - A message from the driver.

Depending on the type the event will have applicable annotations.
7 changes: 7 additions & 0 deletions website/source/docs/http/json-jobs.html.md
@@ -358,6 +358,10 @@ The `Task` object supports the following keys:
sends `SIGTERM` if the task doesn't die after the `KillTimeout` duration has
elapsed. The default `KillTimeout` is 5 seconds.

* `leader` - Specifies whether the task is the leader task of the task group. If
  set to true, when the leader task completes, all other tasks within the task
  group will be gracefully shut down.

> Review comment from @jippi (Contributor), Feb 13, 2017: so useful!

* `LogConfig` - This allows configuring log rotation for the `stdout` and `stderr`
buffers of a Task. See the log rotation reference below for more details.

Expand Down Expand Up @@ -681,6 +685,9 @@ README][ct].
"SIGUSR1" or "SIGINT". This option is required if the `ChangeMode` is
`signal`.

* `perms` - Specifies the rendered template's permissions. File permissions are
  given in octal notation (e.g. `"777"`) for the Unix permission bits rwxrwxrwx.

* `Splay` - Specifies a random amount of time to wait between 0ms and the given
splay value before invoking the change mode. Should be specified in
nanoseconds.
4 changes: 4 additions & 0 deletions website/source/docs/job-specification/task.html.md
@@ -52,6 +52,10 @@ job "docs" {
If the task does not exit before the configured timeout, `SIGKILL` is sent to
the task.

- `leader` `(bool: false)` - Specifies whether the task is the leader task of
  the task group. If set to true, when the leader task completes, all other
  tasks within the task group will be gracefully shut down.

- `logs` <code>([Logs][]: nil)</code> - Specifies logging configuration for the
`stdout` and `stderr` of the task.

18 changes: 15 additions & 3 deletions website/source/docs/operating-a-job/accessing-logs.html.md
@@ -79,9 +79,9 @@ $ nomad logs -stderr 04d9627d

While the logs command works well for quickly accessing application logs, it
generally does not scale to large systems or systems that produce a lot of log
-output, especially for the long-term storage of logs. Nomad only retains log
-files for a configurable period of time, so chatty applications should use a
-better log retention strategy.
+output, especially for the long-term storage of logs. Nomad's retention of log
+files is best effort, so chatty applications should use a better log retention
+strategy.

Since applications log to the `alloc/` directory, all tasks within the same task
group have access to each others logs. Thus it is possible to have a task group
@@ -91,6 +91,10 @@
as follows:
group "my-group" {
task "server" {
# ...

# Setting the server task as the leader of the task group allows us to
# signal the log shipper task to gracefully shutdown when the server exits.
leader = true
}

task "log-shipper" {
@@ -103,3 +107,11 @@
In the above example, the `server` task is the application that should be run
and will be producing the logs. The `log-shipper` reads those logs from the
`alloc/logs/` directory and sends them to a longer-term storage solution such as
Amazon S3 or an internal log aggregation system.

When using the log shipper pattern, especially for batch jobs, the main task
should be marked as the [leader task](/docs/job-specification/task.html#leader).
By marking the main task as a leader, when the task completes all other tasks
within the group will be gracefully shut down. This allows the log shipper to
finish sending any remaining logs and then exit itself. The log shipper should
set a high enough [`kill_timeout`](/docs/job-specification/task.html#kill_timeout)
that it can ship any remaining logs before exiting.
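Putting the pieces together, a minimal sketch of the pattern (the driver, image names, and timeout value here are hypothetical) might look like:

```hcl
job "docs" {
  group "my-group" {
    # The main application task is the group's leader. When it completes,
    # the log-shipper task below is gracefully shut down.
    task "server" {
      driver = "docker"
      leader = true

      config {
        image = "example/my-app" # hypothetical image
      }
    }

    # Reads logs from alloc/logs/ and ships them to longer-term storage.
    # The generous kill_timeout gives it time to flush remaining logs
    # after the leader exits.
    task "log-shipper" {
      driver       = "docker"
      kill_timeout = "30s" # hypothetical value; long enough to drain logs

      config {
        image = "example/log-shipper" # hypothetical image
      }
    }
  }
}
```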