Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move event messages logic into task_runner #3486

Merged
merged 9 commits into from
Nov 3, 2017
8 changes: 5 additions & 3 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package api
import (
"fmt"
"path"

"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -591,8 +590,11 @@ const (
// TaskEvent is an event that effects the state of a task and contains meta-data
// appropriate to the events type.
type TaskEvent struct {
Type string
Time int64
Type string
Time int64
DisplayMessage string
Details map[string]string
// DEPRECATION NOTICE: The following fields are all deprecated. see TaskEvent struct in structs.go for details.
FailsTask bool
RestartReason string
SetupError string
Expand Down
6 changes: 3 additions & 3 deletions client/alloc_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_BadStart(t *testing.T) {
assert.NotNil(state)
assert.NotEmpty(state.Events)
last := state.Events[len(state.Events)-1]
assert.Equal(allocHealthEventSource, last.GenericSource)
assert.Equal(allocHealthEventSource, last.Type)
assert.Contains(last.Message, "failed task")
}

Expand Down Expand Up @@ -202,7 +202,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Deadline(t *testing.T) {
assert.NotNil(state)
assert.NotEmpty(state.Events)
last := state.Events[len(state.Events)-1]
assert.Equal(allocHealthEventSource, last.GenericSource)
assert.Equal(allocHealthEventSource, last.Type)
assert.Contains(last.Message, "not running by deadline")
}

Expand Down Expand Up @@ -412,7 +412,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) {
assert.NotNil(state)
assert.NotEmpty(state.Events)
last := state.Events[len(state.Events)-1]
assert.Equal(allocHealthEventSource, last.GenericSource)
assert.Equal(allocHealthEventSource, last.Type)
assert.Contains(last.Message, "Services not healthy by deadline")
}

Expand Down
6 changes: 4 additions & 2 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,8 @@ func (r *TaskRunner) DestroyState() error {

// setState is used to update the state of the task runner
func (r *TaskRunner) setState(state string, event *structs.TaskEvent, lazySync bool) {
event.PopulateEventDisplayMessage()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you put a blank line between.


// Persist our state to disk.
if err := r.SaveState(); err != nil {
r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err)
Expand Down Expand Up @@ -1751,8 +1753,8 @@ func (r *TaskRunner) Kill(source, reason string, fail bool) {
}

func (r *TaskRunner) EmitEvent(source, message string) {
event := structs.NewTaskEvent(structs.TaskGenericMessage).
SetGenericSource(source).SetMessage(message)
event := structs.NewTaskEvent(source).
SetMessage(message)
r.setState("", event, false)
r.logger.Printf("[DEBUG] client: event from %q for task %q in alloc %q: %v",
source, r.task.Name, r.alloc.ID, message)
Expand Down
32 changes: 27 additions & 5 deletions client/task_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,21 +210,43 @@ func TestTaskRunner_SimpleRun(t *testing.T) {
t.Fatalf("TaskState %v; want %v", ctx.upd.state, structs.TaskStateDead)
}

if ctx.upd.events[0].Type != structs.TaskReceived {
event := ctx.upd.events[0]

if event.Type != structs.TaskReceived {
t.Fatalf("First Event was %v; want %v", ctx.upd.events[0].Type, structs.TaskReceived)
}

if ctx.upd.events[1].Type != structs.TaskSetup {
event = ctx.upd.events[1]
if event.Type != structs.TaskSetup {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[1].Type, structs.TaskSetup)
}
displayMsg := event.DisplayMessage

if ctx.upd.events[2].Type != structs.TaskStarted {
if displayMsg != "Building Task Directory" {
t.Fatalf("Bad display message:%v", displayMsg)
}

event = ctx.upd.events[2]
if event.Type != structs.TaskStarted {
t.Fatalf("Second Event was %v; want %v", ctx.upd.events[2].Type, structs.TaskStarted)
}
displayMsg = event.DisplayMessage
if displayMsg != "Task started by client" {
t.Fatalf("Bad display message:%v", displayMsg)
}

if ctx.upd.events[3].Type != structs.TaskTerminated {
t.Fatalf("Third Event was %v; want %v", ctx.upd.events[3].Type, structs.TaskTerminated)
event = ctx.upd.events[3]
if event.Type != structs.TaskTerminated {
t.Fatalf("Third Event was %v; want %v", event.Type, structs.TaskTerminated)
}
displayMsg = event.DisplayMessage
if displayMsg != "Exit Code: 0" {
t.Fatalf("Bad display message:%v", displayMsg)
}
if event.Details["exit_code"] != "0" {
t.Fatalf("Bad details map :%v", event.Details)
}

}

func TestTaskRunner_Run_RecoverableStartError(t *testing.T) {
Expand Down
225 changes: 116 additions & 109 deletions command/alloc_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,120 +325,127 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {

size := len(state.Events)
for i, event := range state.Events {
formatedTime := formatUnixNanoTime(event.Time)

// Build up the description based on the event type.
var desc string
switch event.Type {
case api.TaskSetup:
desc = event.Message
case api.TaskStarted:
desc = "Task started by client"
case api.TaskReceived:
desc = "Task received by client"
case api.TaskFailedValidation:
if event.ValidationError != "" {
desc = event.ValidationError
} else {
desc = "Validation of task failed"
}
case api.TaskSetupFailure:
if event.SetupError != "" {
desc = event.SetupError
} else {
desc = "Task setup failed"
}
case api.TaskDriverFailure:
if event.DriverError != "" {
desc = event.DriverError
} else {
desc = "Failed to start task"
}
case api.TaskDownloadingArtifacts:
desc = "Client is downloading artifacts"
case api.TaskArtifactDownloadFailed:
if event.DownloadError != "" {
desc = event.DownloadError
} else {
desc = "Failed to download artifacts"
}
case api.TaskKilling:
if event.KillReason != "" {
desc = fmt.Sprintf("Killing task: %v", event.KillReason)
} else if event.KillTimeout != 0 {
desc = fmt.Sprintf("Sent interrupt. Waiting %v before force killing", event.KillTimeout)
} else {
desc = "Sent interrupt"
}
case api.TaskKilled:
if event.KillError != "" {
desc = event.KillError
} else {
desc = "Task successfully killed"
}
case api.TaskTerminated:
var parts []string
parts = append(parts, fmt.Sprintf("Exit Code: %d", event.ExitCode))
msg := event.DisplayMessage
if msg == "" {
msg = buildDisplayMessage(event)
}
formattedTime := formatUnixNanoTime(event.Time)
events[size-i] = fmt.Sprintf("%s|%s|%s", formattedTime, event.Type, msg)
// Reverse order so we are sorted by time
}
c.Ui.Output(formatList(events))
}

if event.Signal != 0 {
parts = append(parts, fmt.Sprintf("Signal: %d", event.Signal))
}
func buildDisplayMessage(event *api.TaskEvent) string {
// Build up the description based on the event type.
var desc string
switch event.Type {
case api.TaskSetup:
desc = event.Message
case api.TaskStarted:
desc = "Task started by client"
case api.TaskReceived:
desc = "Task received by client"
case api.TaskFailedValidation:
if event.ValidationError != "" {
desc = event.ValidationError
} else {
desc = "Validation of task failed"
}
case api.TaskSetupFailure:
if event.SetupError != "" {
desc = event.SetupError
} else {
desc = "Task setup failed"
}
case api.TaskDriverFailure:
if event.DriverError != "" {
desc = event.DriverError
} else {
desc = "Failed to start task"
}
case api.TaskDownloadingArtifacts:
desc = "Client is downloading artifacts"
case api.TaskArtifactDownloadFailed:
if event.DownloadError != "" {
desc = event.DownloadError
} else {
desc = "Failed to download artifacts"
}
case api.TaskKilling:
if event.KillReason != "" {
desc = fmt.Sprintf("Killing task: %v", event.KillReason)
} else if event.KillTimeout != 0 {
desc = fmt.Sprintf("Sent interrupt. Waiting %v before force killing", event.KillTimeout)
} else {
desc = "Sent interrupt"
}
case api.TaskKilled:
if event.KillError != "" {
desc = event.KillError
} else {
desc = "Task successfully killed"
}
case api.TaskTerminated:
var parts []string
parts = append(parts, fmt.Sprintf("Exit Code: %d", event.ExitCode))

if event.Message != "" {
parts = append(parts, fmt.Sprintf("Exit Message: %q", event.Message))
}
desc = strings.Join(parts, ", ")
case api.TaskRestarting:
in := fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay))
if event.RestartReason != "" && event.RestartReason != client.ReasonWithinPolicy {
desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
} else {
desc = in
}
case api.TaskNotRestarting:
if event.RestartReason != "" {
desc = event.RestartReason
} else {
desc = "Task exceeded restart policy"
}
case api.TaskSiblingFailed:
if event.FailedSibling != "" {
desc = fmt.Sprintf("Task's sibling %q failed", event.FailedSibling)
} else {
desc = "Task's sibling failed"
}
case api.TaskSignaling:
sig := event.TaskSignal
reason := event.TaskSignalReason

if sig == "" && reason == "" {
desc = "Task being sent a signal"
} else if sig == "" {
desc = reason
} else if reason == "" {
desc = fmt.Sprintf("Task being sent signal %v", sig)
} else {
desc = fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
}
case api.TaskRestartSignal:
if event.RestartReason != "" {
desc = event.RestartReason
} else {
desc = "Task signaled to restart"
}
case api.TaskDriverMessage:
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskGenericMessage:
event.Type = event.GenericSource
desc = event.Message
if event.Signal != 0 {
parts = append(parts, fmt.Sprintf("Signal: %d", event.Signal))
}

// Reverse order so we are sorted by time
events[size-i] = fmt.Sprintf("%s|%s|%s", formatedTime, event.Type, desc)
if event.Message != "" {
parts = append(parts, fmt.Sprintf("Exit Message: %q", event.Message))
}
desc = strings.Join(parts, ", ")
case api.TaskRestarting:
in := fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay))
if event.RestartReason != "" && event.RestartReason != client.ReasonWithinPolicy {
desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
} else {
desc = in
}
case api.TaskNotRestarting:
if event.RestartReason != "" {
desc = event.RestartReason
} else {
desc = "Task exceeded restart policy"
}
case api.TaskSiblingFailed:
if event.FailedSibling != "" {
desc = fmt.Sprintf("Task's sibling %q failed", event.FailedSibling)
} else {
desc = "Task's sibling failed"
}
case api.TaskSignaling:
sig := event.TaskSignal
reason := event.TaskSignalReason

if sig == "" && reason == "" {
desc = "Task being sent a signal"
} else if sig == "" {
desc = reason
} else if reason == "" {
desc = fmt.Sprintf("Task being sent signal %v", sig)
} else {
desc = fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
}
case api.TaskRestartSignal:
if event.RestartReason != "" {
desc = event.RestartReason
} else {
desc = "Task signaled to restart"
}
case api.TaskDriverMessage:
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskGenericMessage:
event.Type = event.GenericSource
desc = event.Message
}
c.Ui.Output(formatList(events))

return desc
}

// outputTaskResources prints the task resources for the passed task and if
Expand Down
Loading