Skip to content

Commit

Permalink
Merge pull request #2128 from hashicorp/f-dispatch
Browse files Browse the repository at this point in the history
Nomad Constructor Jobs and Dispatch
  • Loading branch information
dadgar authored Jan 5, 2017
2 parents 9a3447d + 6f2e752 commit 248c069
Show file tree
Hide file tree
Showing 51 changed files with 5,148 additions and 208 deletions.
58 changes: 56 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,21 @@ func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta,
return &resp, qm, nil
}

func (j *Jobs) Dispatch(jobID string, meta map[string]string,
payload []byte, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
var resp JobDispatchResponse
req := &JobDispatchRequest{
JobID: jobID,
Meta: meta,
Payload: payload,
}
wm, err := j.client.write("/v1/job/"+jobID+"/dispatch", req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, wm, nil
}

// periodicForceResponse is used to deserialize a force response
type periodicForceResponse struct {
EvalID string
Expand All @@ -198,6 +213,13 @@ type PeriodicConfig struct {
ProhibitOverlap bool
}

// ConstructorConfig is used to configure the constructor job
type ConstructorConfig struct {
Payload string
MetaRequired []string
MetaOptional []string
}

// Job is used to serialize a job.
type Job struct {
Region string
Expand All @@ -212,6 +234,8 @@ type Job struct {
TaskGroups []*TaskGroup
Update *UpdateStrategy
Periodic *PeriodicConfig
Constructor *ConstructorConfig
Payload []byte
Meta map[string]string
VaultToken string
Status string
Expand All @@ -223,14 +247,30 @@ type Job struct {

// JobSummary summarizes the state of the allocations of a job
type JobSummary struct {
JobID string
Summary map[string]TaskGroupSummary
JobID string
Summary map[string]TaskGroupSummary
Children *JobChildrenSummary

// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}

// JobChildrenSummary contains the summary of children job status
type JobChildrenSummary struct {
Pending int64
Running int64
Dead int64
}

func (jc *JobChildrenSummary) Sum() int {
if jc == nil {
return 0
}

return int(jc.Pending + jc.Running + jc.Dead)
}

// TaskGroup summarizes the state of all the allocations of a particular
// TaskGroup
type TaskGroupSummary struct {
Expand Down Expand Up @@ -413,3 +453,17 @@ type DesiredUpdates struct {
InPlaceUpdate uint64
DestructiveUpdate uint64
}

type JobDispatchRequest struct {
JobID string
Payload []byte
Meta map[string]string
}

type JobDispatchResponse struct {
DispatchedJobID string
EvalID string
EvalCreateIndex uint64
JobCreateIndex uint64
QueryMeta
}
34 changes: 20 additions & 14 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,22 +141,28 @@ type LogConfig struct {
MaxFileSizeMB int
}

// DispatchInputConfig configures how a task gets its input from a job dispatch
type DispatchInputConfig struct {
File string
}

// Task is a single process in a task group.
type Task struct {
Name string
Driver string
User string
Config map[string]interface{}
Constraints []*Constraint
Env map[string]string
Services []Service
Resources *Resources
Meta map[string]string
KillTimeout time.Duration
LogConfig *LogConfig
Artifacts []*TaskArtifact
Vault *Vault
Templates []*Template
Name string
Driver string
User string
Config map[string]interface{}
Constraints []*Constraint
Env map[string]string
Services []Service
Resources *Resources
Meta map[string]string
KillTimeout time.Duration
LogConfig *LogConfig
Artifacts []*TaskArtifact
Vault *Vault
Templates []*Template
DispatchInput *DispatchInputConfig
}

// TaskArtifact is used to download artifacts before running a task.
Expand Down
10 changes: 5 additions & 5 deletions client/allocdir/alloc_dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func (d *AllocDir) LogDir() string {

// List returns the list of files at a path relative to the alloc dir
func (d *AllocDir) List(path string) ([]*AllocFileInfo, error) {
if escapes, err := structs.PathEscapesAllocDir(path); err != nil {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
Expand All @@ -437,7 +437,7 @@ func (d *AllocDir) List(path string) ([]*AllocFileInfo, error) {

// Stat returns information about the file at a path relative to the alloc dir
func (d *AllocDir) Stat(path string) (*AllocFileInfo, error) {
if escapes, err := structs.PathEscapesAllocDir(path); err != nil {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
Expand All @@ -460,7 +460,7 @@ func (d *AllocDir) Stat(path string) (*AllocFileInfo, error) {

// ReadAt returns a reader for a file at the path relative to the alloc dir
func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
if escapes, err := structs.PathEscapesAllocDir(path); err != nil {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
Expand Down Expand Up @@ -489,7 +489,7 @@ func (d *AllocDir) ReadAt(path string, offset int64) (io.ReadCloser, error) {
// BlockUntilExists blocks until the passed file relative the allocation
// directory exists. The block can be cancelled with the passed tomb.
func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) (chan error, error) {
if escapes, err := structs.PathEscapesAllocDir(path); err != nil {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
Expand All @@ -510,7 +510,7 @@ func (d *AllocDir) BlockUntilExists(path string, t *tomb.Tomb) (chan error, erro
// allocation directory. The offset should be the last read offset. The tomb is
// used to clean up the watch.
func (d *AllocDir) ChangeEvents(path string, curOffset int64, t *tomb.Tomb) (*watch.FileChanges, error) {
if escapes, err := structs.PathEscapesAllocDir(path); err != nil {
if escapes, err := structs.PathEscapesAllocDir("", path); err != nil {
return nil, fmt.Errorf("Failed to check if path escapes alloc directory: %v", err)
} else if escapes {
return nil, fmt.Errorf("Path escapes the alloc directory")
Expand Down
5 changes: 1 addition & 4 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,8 @@ func GetTaskEnv(allocDir *allocdir.AllocDir, node *structs.Node,
task *structs.Task, alloc *structs.Allocation, conf *config.Config,
vaultToken string) (*env.TaskEnvironment, error) {

tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
env := env.NewTaskEnvironment(node).
SetTaskMeta(task.Meta).
SetTaskGroupMeta(tg.Meta).
SetJobMeta(alloc.Job.Meta).
SetTaskMeta(alloc.Job.CombinedTaskMeta(alloc.TaskGroup, task.Name)).
SetJobName(alloc.Job.Name).
SetEnvvars(task.Env).
SetTaskName(task.Name)
Expand Down
1 change: 1 addition & 0 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func TestDriver_GetTaskEnv(t *testing.T) {
}

alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Tasks[0] = task
alloc.Name = "Bar"
env, err := GetTaskEnv(nil, nil, task, alloc, testConfig(), "")
if err != nil {
Expand Down
30 changes: 3 additions & 27 deletions client/driver/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ const (
type TaskEnvironment struct {
Env map[string]string
TaskMeta map[string]string
TaskGroupMeta map[string]string
JobMeta map[string]string
AllocDir string
TaskDir string
SecretsDir string
Expand Down Expand Up @@ -139,11 +137,9 @@ func (t *TaskEnvironment) Build() *TaskEnvironment {
t.NodeValues = make(map[string]string)
t.TaskEnv = make(map[string]string)

// Build the meta with the following precedence: task, task group, job.
for _, meta := range []map[string]string{t.JobMeta, t.TaskGroupMeta, t.TaskMeta} {
for k, v := range meta {
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
}
// Build the meta
for k, v := range t.TaskMeta {
t.TaskEnv[fmt.Sprintf("%s%s", MetaPrefix, strings.ToUpper(k))] = v
}

// Build the ports
Expand Down Expand Up @@ -337,26 +333,6 @@ func (t *TaskEnvironment) ClearTaskMeta() *TaskEnvironment {
return t
}

func (t *TaskEnvironment) SetTaskGroupMeta(m map[string]string) *TaskEnvironment {
t.TaskGroupMeta = m
return t
}

func (t *TaskEnvironment) ClearTaskGroupMeta() *TaskEnvironment {
t.TaskGroupMeta = nil
return t
}

func (t *TaskEnvironment) SetJobMeta(m map[string]string) *TaskEnvironment {
t.JobMeta = m
return t
}

func (t *TaskEnvironment) ClearJobMeta() *TaskEnvironment {
t.JobMeta = nil
return t
}

func (t *TaskEnvironment) SetEnvvars(m map[string]string) *TaskEnvironment {
t.Env = m
return t
Expand Down
22 changes: 0 additions & 22 deletions client/driver/env/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ func TestEnvironment_AsList(t *testing.T) {
env := NewTaskEnvironment(n).
SetNetworks(networks).
SetPortMap(portMap).
SetTaskGroupMeta(map[string]string{"foo": "bar", "baz": "bam"}).
SetTaskMeta(map[string]string{"foo": "baz"}).Build()

act := env.EnvList()
Expand All @@ -154,7 +153,6 @@ func TestEnvironment_AsList(t *testing.T) {
"NOMAD_HOST_PORT_http=80",
"NOMAD_HOST_PORT_https=8080",
"NOMAD_META_FOO=baz",
"NOMAD_META_BAZ=bam",
}
sort.Strings(act)
sort.Strings(exp)
Expand Down Expand Up @@ -259,23 +257,3 @@ func TestEnvironment_AppendHostEnvVars(t *testing.T) {
t.Fatalf("Didn't filter environment variable %q", skip)
}
}

func TestEnvironment_MetaPrecedence(t *testing.T) {
n := mock.Node()
env := NewTaskEnvironment(n).
SetJobMeta(map[string]string{"foo": "job", "bar": "job", "baz": "job"}).
SetTaskGroupMeta(map[string]string{"foo": "tg", "bar": "tg"}).
SetTaskMeta(map[string]string{"foo": "task"}).Build()

act := env.EnvList()
exp := []string{
"NOMAD_META_FOO=task",
"NOMAD_META_BAR=tg",
"NOMAD_META_BAZ=job",
}
sort.Strings(act)
sort.Strings(exp)
if !reflect.DeepEqual(act, exp) {
t.Fatalf("env.List() returned %v; want %v", act, exp)
}
}
33 changes: 33 additions & 0 deletions client/task_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"time"

"github.com/armon/go-metrics"
"github.com/golang/snappy"
"github.com/hashicorp/consul-template/signals"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/getter"
Expand Down Expand Up @@ -85,6 +87,9 @@ type TaskRunner struct {
// downloaded
artifactsDownloaded bool

// payloadRendered tracks whether the payload has been rendered to disk
payloadRendered bool

// vaultFuture is the means to wait for and get a Vault token
vaultFuture *tokenFuture

Expand Down Expand Up @@ -129,6 +134,7 @@ type taskRunnerState struct {
Task *structs.Task
HandleID string
ArtifactDownloaded bool
PayloadRendered bool
}

// TaskStateUpdater is used to signal that tasks state has changed.
Expand Down Expand Up @@ -231,6 +237,7 @@ func (r *TaskRunner) RestoreState() error {
r.task = snap.Task
}
r.artifactsDownloaded = snap.ArtifactDownloaded
r.payloadRendered = snap.PayloadRendered

if err := r.setTaskEnv(); err != nil {
return fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v",
Expand Down Expand Up @@ -293,6 +300,7 @@ func (r *TaskRunner) SaveState() error {
Task: r.task,
Version: r.config.Version,
ArtifactDownloaded: r.artifactsDownloaded,
PayloadRendered: r.payloadRendered,
}
r.handleLock.Lock()
if r.handle != nil {
Expand Down Expand Up @@ -713,6 +721,31 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
return
}

// If the job is a dispatch job and there is a payload write it to disk
requirePayload := len(r.alloc.Job.Payload) != 0 &&
(r.task.DispatchInput != nil && r.task.DispatchInput.File != "")
if !r.payloadRendered && requirePayload {
renderTo := filepath.Join(r.taskDir, allocdir.TaskLocal, r.task.DispatchInput.File)
decoded, err := snappy.Decode(nil, r.alloc.Job.Payload)
if err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
resultCh <- false
return
}

if err := ioutil.WriteFile(renderTo, decoded, 0777); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
resultCh <- false
return
}

r.payloadRendered = true
}

for {
// Download the task's artifacts
if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
Expand Down
Loading

0 comments on commit 248c069

Please sign in to comment.