Skip to content

Commit

Permalink
tr: implement dispatch payload hook
Browse files Browse the repository at this point in the history
Now passing the TaskDir struct to prestart hooks instead of just the
root task dir itself as dispatch needs local/.
  • Loading branch information
schmichael committed Oct 16, 2018
1 parent f13a094 commit a44e82f
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 8 deletions.
5 changes: 3 additions & 2 deletions client/allocrunner/interfaces/task_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package interfaces
import (
"context"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/structs"
Expand Down Expand Up @@ -49,8 +50,8 @@ type TaskPrestartRequest struct {
// Vault token may optionally be set if a Vault token is available
VaultToken string

// TaskDir is the task's directory on the host
TaskDir string
// TaskDir contains the task's directory tree on the host
TaskDir *allocdir.TaskDir

// TaskEnv is the task's environment
TaskEnv *env.TaskEnv
Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/artifact_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (h *artifactHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar

for _, artifact := range req.Task.Artifacts {
//XXX add ctx to GetArtifact to allow cancelling long downloads
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir); err != nil {
if err := getter.GetArtifact(req.TaskEnv, artifact, req.TaskDir.Dir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
h.logger.Debug(wrapped.Error())
h.eventEmitter.EmitEvent(structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))
Expand Down
71 changes: 71 additions & 0 deletions client/allocrunner/taskrunner/dispatch_hook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package taskrunner

import (
"context"
"io/ioutil"
"os"
"path/filepath"

"github.com/golang/snappy"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
)

// dispatchHook writes a dispatch payload to the task dir
type dispatchHook struct {
payload []byte

logger hclog.Logger
}

func newDispatchHook(alloc *structs.Allocation, logger hclog.Logger) *dispatchHook {
h := &dispatchHook{
payload: alloc.Job.Payload,
}
h.logger = logger.Named(h.Name())
return h
}

func (*dispatchHook) Name() string {
return "dispatch_payload"
}

func (h *dispatchHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
if len(h.payload) == 0 || req.Task.DispatchPayload == nil || req.Task.DispatchPayload.File == "" {
// No dispatch payload
resp.Done = true
return nil
}

err := writeDispatchPayload(req.TaskDir.LocalDir, req.Task.DispatchPayload.File, h.payload)
if err != nil {
return err
}

h.logger.Trace("dispatch payload written",
"path", req.TaskDir.LocalDir,
"filename", req.Task.DispatchPayload.File,
"bytes", len(h.payload),
)

// Dispatch payload written successfully; mark as done
resp.Done = true
return nil
}

// writeDispatchPayload writes the payload to the given file or returns an
// error.
func writeDispatchPayload(base, filename string, payload []byte) error {
renderTo := filepath.Join(base, filename)
decoded, err := snappy.Decode(nil, payload)
if err != nil {
return err
}

if err := os.MkdirAll(filepath.Dir(renderTo), 0777); err != nil {
return err
}

return ioutil.WriteFile(renderTo, decoded, 0777)
}
144 changes: 144 additions & 0 deletions client/allocrunner/taskrunner/dispatch_hook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package taskrunner

import (
"context"
"io/ioutil"
"path/filepath"
"testing"

"github.com/golang/snappy"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)

// Statically assert the stats hook implements the expected interfaces
var _ interfaces.TaskPrestartHook = (*dispatchHook)(nil)

// TestTaskRunner_DispatchHook_NoPayload asserts that the hook is a noop and is
// marked as done if there is no dispatch payload.
func TestTaskRunner_DispatchHook_NoPayload(t *testing.T) {
t.Parallel()

require := require.New(t)
ctx := context.Background()
logger := testlog.HCLogger(t)
allocDir := allocdir.NewAllocDir(logger, "nomadtest_nopayload")
defer allocDir.Destroy()

// Default mock alloc/job is not a dispatch job
alloc := mock.BatchAlloc()
task := alloc.Job.TaskGroups[0].Tasks[0]
taskDir := allocDir.NewTaskDir(task.Name)
require.NoError(taskDir.Build(false, nil, cstructs.FSIsolationNone))

h := newDispatchHook(alloc, logger)

req := interfaces.TaskPrestartRequest{
Task: task,
TaskDir: taskDir,
}
resp := interfaces.TaskPrestartResponse{}

// Assert no error and Done=true as this job has no payload
require.NoError(h.Prestart(ctx, &req, &resp))
require.True(resp.Done)

// Assert payload directory is empty
files, err := ioutil.ReadDir(req.TaskDir.LocalDir)
require.NoError(err)
require.Empty(files)
}

// TestTaskRunner_DispatchHook_Ok asserts that dispatch payloads are written to
// a file in the task dir.
func TestTaskRunner_DispatchHook_Ok(t *testing.T) {
t.Parallel()

require := require.New(t)
ctx := context.Background()
logger := testlog.HCLogger(t)
allocDir := allocdir.NewAllocDir(logger, "nomadtest_dispatchok")
defer allocDir.Destroy()

// Default mock alloc/job is not a dispatch job; update it
alloc := mock.BatchAlloc()
alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: structs.DispatchPayloadRequired,
}
expected := []byte("hello world")
alloc.Job.Payload = snappy.Encode(nil, expected)

// Set the filename and create the task dir
task := alloc.Job.TaskGroups[0].Tasks[0]
task.DispatchPayload = &structs.DispatchPayloadConfig{
File: "out",
}
taskDir := allocDir.NewTaskDir(task.Name)
require.NoError(taskDir.Build(false, nil, cstructs.FSIsolationNone))

h := newDispatchHook(alloc, logger)

req := interfaces.TaskPrestartRequest{
Task: task,
TaskDir: taskDir,
}
resp := interfaces.TaskPrestartResponse{}
require.NoError(h.Prestart(ctx, &req, &resp))
require.True(resp.Done)

filename := filepath.Join(req.TaskDir.LocalDir, task.DispatchPayload.File)
result, err := ioutil.ReadFile(filename)
require.NoError(err)
require.Equal(expected, result)
}

// TestTaskRunner_DispatchHook_Error asserts that on an error dispatch payloads
// are not written and Done=false.
func TestTaskRunner_DispatchHook_Error(t *testing.T) {
t.Parallel()

require := require.New(t)
ctx := context.Background()
logger := testlog.HCLogger(t)
allocDir := allocdir.NewAllocDir(logger, "nomadtest_dispatcherr")
defer allocDir.Destroy()

// Default mock alloc/job is not a dispatch job; update it
alloc := mock.BatchAlloc()
alloc.Job.ParameterizedJob = &structs.ParameterizedJobConfig{
Payload: structs.DispatchPayloadRequired,
}

// Cause an error by not snappy encoding the payload
alloc.Job.Payload = []byte("hello world")

// Set the filename and create the task dir
task := alloc.Job.TaskGroups[0].Tasks[0]
task.DispatchPayload = &structs.DispatchPayloadConfig{
File: "out",
}
taskDir := allocDir.NewTaskDir(task.Name)
require.NoError(taskDir.Build(false, nil, cstructs.FSIsolationNone))

h := newDispatchHook(alloc, logger)

req := interfaces.TaskPrestartRequest{
Task: task,
TaskDir: taskDir,
}
resp := interfaces.TaskPrestartResponse{}

// Assert an error was returned and Done=false
require.Error(h.Prestart(ctx, &req, &resp))
require.False(resp.Done)

// Assert payload directory is empty
files, err := ioutil.ReadDir(req.TaskDir.LocalDir)
require.NoError(err)
require.Empty(files)
}
5 changes: 3 additions & 2 deletions client/allocrunner/taskrunner/task_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func (tr *TaskRunner) initHooks() {
newValidateHook(tr.clientConfig, hookLogger),
newTaskDirHook(tr, hookLogger),
newLogMonHook(tr.logmonHookConfig, hookLogger),
newDispatchHook(tr.Alloc(), hookLogger),
newArtifactHook(tr, hookLogger),
newShutdownDelayHook(task.ShutdownDelay, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
Expand Down Expand Up @@ -97,7 +98,7 @@ func (tr *TaskRunner) prestart() error {
// Build the request
req := interfaces.TaskPrestartRequest{
Task: tr.Task(),
TaskDir: tr.taskDir.Dir,
TaskDir: tr.taskDir,
TaskEnv: tr.envBuilder.Build(),
}

Expand Down Expand Up @@ -150,7 +151,7 @@ func (tr *TaskRunner) prestart() error {

if tr.logger.IsTrace() {
end := time.Now()
tr.logger.Trace("finished prestart hooks", "name", name, "end", end, "duration", end.Sub(start))
tr.logger.Trace("finished prestart hook", "name", name, "end", end, "duration", end.Sub(start))
}
}

Expand Down
2 changes: 1 addition & 1 deletion client/allocrunner/taskrunner/template_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (h *templateHook) Prestart(ctx context.Context, req *interfaces.TaskPrestar
}

// Store the current Vault token and the task directory
h.taskDir = req.TaskDir
h.taskDir = req.TaskDir.Dir
h.vaultToken = req.VaultToken
unblockCh, err := h.newManager()
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions client/allocrunner/taskrunner/vault_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/hashicorp/consul-template/signals"
log "github.com/hashicorp/go-hclog"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/vaultclient"
Expand Down Expand Up @@ -130,7 +129,7 @@ func (h *vaultHook) Prestart(ctx context.Context, req *interfaces.TaskPrestartRe
// Try to recover a token if it was previously written in the secrets
// directory
recoveredToken := ""
h.tokenPath = filepath.Join(req.TaskDir, allocdir.TaskSecrets, vaultTokenFile)
h.tokenPath = filepath.Join(req.TaskDir.SecretsDir, vaultTokenFile)
data, err := ioutil.ReadFile(h.tokenPath)
if err != nil {
if !os.IsNotExist(err) {
Expand Down

0 comments on commit a44e82f

Please sign in to comment.