diff --git a/api/tasks.go b/api/tasks.go index 5450dece120..d09339f34e2 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -92,6 +92,13 @@ type Task struct { Meta map[string]string KillTimeout time.Duration LogConfig *LogConfig + Artifacts []*TaskArtifact +} + +// TaskArtifact is used to download artifacts before running a task. +type TaskArtifact struct { + GetterSource string + GetterOptions map[string]string } // NewTask creates and initializes a new Task. @@ -147,24 +154,27 @@ type TaskState struct { } const ( - TaskDriverFailure = "Driver Failure" - TaskReceived = "Received" - TaskStarted = "Started" - TaskTerminated = "Terminated" - TaskKilled = "Killed" - TaskRestarting = "Restarting" - TaskNotRestarting = "Restarts Exceeded" + TaskDriverFailure = "Driver Failure" + TaskReceived = "Received" + TaskStarted = "Started" + TaskTerminated = "Terminated" + TaskKilled = "Killed" + TaskRestarting = "Restarting" + TaskNotRestarting = "Restarts Exceeded" + TaskDownloadingArtifacts = "Downloading Artifacts" + TaskArtifactDownloadFailed = "Failed Artifact Download" ) // TaskEvent is an event that effects the state of a task and contains meta-data // appropriate to the events type. type TaskEvent struct { - Type string - Time int64 - DriverError string - ExitCode int - Signal int - Message string - KillError string - StartDelay int64 + Type string + Time int64 + DriverError string + ExitCode int + Signal int + Message string + KillError string + StartDelay int64 + DownloadError string } diff --git a/client/driver/driver_test.go b/client/driver/driver_test.go index d1ce7963c0c..ced93cbb593 100644 --- a/client/driver/driver_test.go +++ b/client/driver/driver_test.go @@ -1,6 +1,7 @@ package driver import ( + "io" "log" "math/rand" "os" @@ -38,6 +39,31 @@ func TestMain(m *testing.M) { } } +// copyFile moves an existing file to the destination +func copyFile(src, dst string, t *testing.T) { + in, err := os.Open(src) + if err != nil { + t.Fatalf("copying %v -> %v failed: %v", src, dst, err) + } + defer in.Close() + out, err := os.Create(dst) + if err != nil { + t.Fatalf("copying %v -> %v failed: %v", src, dst, err) + } + defer func() { + if err := out.Close(); err != nil { + t.Fatalf("copying %v -> %v failed: %v", src, dst, err) + } + }() + if _, err = io.Copy(out, in); err != nil { + t.Fatalf("copying %v -> %v failed: %v", src, dst, err) + } + if err := out.Sync(); err != nil { + t.Fatalf("copying %v -> %v failed: %v", src, dst, err) + } + return +} + func testLogger() *log.Logger { return log.New(os.Stderr, "", log.LstdFlags) } diff --git a/client/driver/exec.go b/client/driver/exec.go index b134cc3decd..c832ab9de72 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -15,7 +15,6 @@ import ( "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" - "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" @@ -28,10 +27,8 @@ type ExecDriver struct { } type ExecDriverConfig struct { - ArtifactSource string `mapstructure:"artifact_source"` - Checksum string `mapstructure:"checksum"` - Command string `mapstructure:"command"` - Args []string `mapstructure:"args"` + Command string `mapstructure:"command"` + Args []string `mapstructure:"args"` } // execHandle is returned from Start/Open as a handle to the PID @@ -89,21 +86,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } - // Check if an artificat is specified and attempt to download it - source, ok := task.Config["artifact_source"] - if ok && source != "" { - // Proceed to download an artifact to be executed. - _, err := getter.GetArtifact( - taskDir, - driverConfig.ArtifactSource, - driverConfig.Checksum, - d.logger, - ) - if err != nil { - return nil, err - } - } - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) diff --git a/client/driver/exec_test.go b/client/driver/exec_test.go index 43855b10c03..0eb3d7f971f 100644 --- a/client/driver/exec_test.go +++ b/client/driver/exec_test.go @@ -188,54 +188,6 @@ func TestExecDriver_Start_Wait(t *testing.T) { } } -func TestExecDriver_Start_Artifact_basic(t *testing.T) { - t.Parallel() - ctestutils.ExecCompatible(t) - file := "hi_linux_amd64" - checksum := "sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c" - - task := &structs.Task{ - Name: "sleep", - Config: map[string]interface{}{ - "artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum), - "command": file, - }, - LogConfig: &structs.LogConfig{ - MaxFiles: 10, - MaxFileSizeMB: 10, - }, - Resources: basicResources, - } - - driverCtx, execCtx := testDriverContexts(task) - defer execCtx.AllocDir.Destroy() - d := NewExecDriver(driverCtx) - - handle, err := d.Start(execCtx, task) - if err != nil { - t.Fatalf("err: %v", err) - } - if handle == nil { - t.Fatalf("missing handle") - } - - // Update should be a no-op - err = handle.Update(task) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Task should terminate quickly - select { - case res := <-handle.WaitCh(): - if !res.Successful() { - t.Fatalf("err: %v", res) - } - case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second): - t.Fatalf("timeout") - } -} - func TestExecDriver_Start_Wait_AllocDir(t *testing.T) { t.Parallel() ctestutils.ExecCompatible(t) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index dbed461c6b4..3c0596bdbb7 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -5,7 +5,6 @@ import ( "log" "os" "os/exec" - "path/filepath" "runtime" "strings" "sync" @@ -151,11 +150,10 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext e.cmd.Env = ctx.TaskEnv.EnvList() e.cmd.Path = ctx.TaskEnv.ReplaceEnv(command.Cmd) e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...) - if filepath.Base(command.Cmd) == command.Cmd { - if lp, err := exec.LookPath(command.Cmd); err != nil { - } else { - e.cmd.Path = lp - } + + // Ensure that the binary being started is executable. + if err := e.makeExecutable(e.cmd.Path); err != nil { + return nil, err } // starting the process @@ -280,3 +278,24 @@ func (e *UniversalExecutor) configureTaskDir() error { e.cmd.Dir = taskDir return nil } + +// makeExecutablePosix makes the given file executable for root,group,others. +func (e *UniversalExecutor) makeExecutablePosix(binPath string) error { + fi, err := os.Stat(binPath) + if err != nil { + if os.IsNotExist(err) { + return fmt.Errorf("binary %q does not exist", binPath) + } + return fmt.Errorf("specified binary is invalid: %v", err) + } + + // If it is not executable, make it so. + perm := fi.Mode().Perm() + req := os.FileMode(0555) + if perm&req != req { + if err := os.Chmod(binPath, perm|req); err != nil { + return fmt.Errorf("error making %q executable: %s", binPath, err) + } + } + return nil +} diff --git a/client/driver/executor/executor_basic.go b/client/driver/executor/executor_basic.go index 9e531a547a7..00480e9dcbd 100644 --- a/client/driver/executor/executor_basic.go +++ b/client/driver/executor/executor_basic.go @@ -3,9 +3,25 @@ package executor import ( + "path/filepath" + "runtime" + cgroupConfig "github.com/opencontainers/runc/libcontainer/configs" ) +func (e *UniversalExecutor) makeExecutable(binPath string) error { + if runtime.GOOS == "windows" { + return nil + } + + path := binPath + if !filepath.IsAbs(binPath) { + // The path must be relative the allocations directory. + path = filepath.Join(e.taskDir, binPath) + } + return e.makeExecutablePosix(path) +} + func (e *UniversalExecutor) configureChroot() error { return nil } diff --git a/client/driver/executor/executor_linux.go b/client/driver/executor/executor_linux.go index da3a89a3f89..eb188c32ed1 100644 --- a/client/driver/executor/executor_linux.go +++ b/client/driver/executor/executor_linux.go @@ -36,6 +36,18 @@ var ( } ) +func (e *UniversalExecutor) makeExecutable(binPath string) error { + path := binPath + if e.ctx.FSIsolation { + // The path must be relative the chroot + path = filepath.Join(e.taskDir, binPath) + } else if !filepath.IsAbs(binPath) { + // The path must be relative the allocations directory. + path = filepath.Join(e.taskDir, binPath) + } + return e.makeExecutablePosix(path) +} + // configureIsolation configures chroot and creates cgroups func (e *UniversalExecutor) configureIsolation() error { if e.ctx.FSIsolation { diff --git a/client/driver/java.go b/client/driver/java.go index e662052678b..c8849f95127 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -21,7 +21,6 @@ import ( "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" - "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" ) @@ -34,10 +33,9 @@ type JavaDriver struct { } type JavaDriverConfig struct { - JvmOpts []string `mapstructure:"jvm_options"` - ArtifactSource string `mapstructure:"artifact_source"` - Checksum string `mapstructure:"checksum"` - Args []string `mapstructure:"args"` + JarPath string `mapstructure:"jar_path"` + JvmOpts []string `mapstructure:"jvm_options"` + Args []string `mapstructure:"args"` } // javaHandle is returned from Start/Open as a handle to the PID @@ -124,19 +122,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } - // Proceed to download an artifact to be executed. - path, err := getter.GetArtifact( - taskDir, - driverConfig.ArtifactSource, - driverConfig.Checksum, - d.logger, - ) - if err != nil { - return nil, err + if driverConfig.JarPath == "" { + return nil, fmt.Errorf("jar_path must be specified") } - jarName := filepath.Base(path) - args := []string{} // Look for jvm options if len(driverConfig.JvmOpts) != 0 { @@ -145,7 +134,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } // Build the argument list. - args = append(args, "-jar", jarName) + args = append(args, "-jar", driverConfig.JarPath) if len(driverConfig.Args) != 0 { args = append(args, driverConfig.Args...) } @@ -160,7 +149,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, Cmd: exec.Command(bin, "executor", pluginLogFile), } - exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + execIntf, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } @@ -175,7 +164,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, ResourceLimits: true, } - ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx) + absPath, err := GetAbsolutePath("java") + if err != nil { + return nil, err + } + + ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx) if err != nil { pluginClient.Kill() return nil, fmt.Errorf("error starting process via the plugin: %v", err) @@ -186,7 +180,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, maxKill := d.DriverContext.config.MaxKillTimeout h := &javaHandle{ pluginClient: pluginClient, - executor: exec, + executor: execIntf, userPid: ps.Pid, isolationConfig: ps.IsolationConfig, taskDir: taskDir, diff --git a/client/driver/java_test.go b/client/driver/java_test.go index e49bb095b41..eeca35b41f5 100644 --- a/client/driver/java_test.go +++ b/client/driver/java_test.go @@ -58,9 +58,8 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { task := &structs.Task{ Name: "demo-app", Config: map[string]interface{}{ - "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", - "jvm_options": []string{"-Xmx64m", "-Xms32m"}, - "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", + "jar_path": "demoapp.jar", + "jvm_options": []string{"-Xmx64m", "-Xms32m"}, }, LogConfig: &structs.LogConfig{ MaxFiles: 10, @@ -73,6 +72,10 @@ func TestJavaDriver_StartOpen_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewJavaDriver(driverCtx) + // Copy the test jar into the task's directory + dst, _ := execCtx.AllocDir.TaskDirs[task.Name] + copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -108,8 +111,7 @@ func TestJavaDriver_Start_Wait(t *testing.T) { task := &structs.Task{ Name: "demo-app", Config: map[string]interface{}{ - "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", - "checksum": "sha256:58d6e8130308d32e197c5108edd4f56ddf1417408f743097c2e662df0f0b17c8", + "jar_path": "demoapp.jar", }, LogConfig: &structs.LogConfig{ MaxFiles: 10, @@ -119,9 +121,12 @@ func TestJavaDriver_Start_Wait(t *testing.T) { } driverCtx, execCtx := testDriverContexts(task) - defer execCtx.AllocDir.Destroy() d := NewJavaDriver(driverCtx) + // Copy the test jar into the task's directory + dst, _ := execCtx.AllocDir.TaskDirs[task.Name] + copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -168,7 +173,7 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { task := &structs.Task{ Name: "demo-app", Config: map[string]interface{}{ - "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/demoapp.jar", + "jar_path": "demoapp.jar", }, LogConfig: &structs.LogConfig{ MaxFiles: 10, @@ -181,6 +186,10 @@ func TestJavaDriver_Start_Kill_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewJavaDriver(driverCtx) + // Copy the test jar into the task's directory + dst, _ := execCtx.AllocDir.TaskDirs[task.Name] + copyFile("./test-resources/java/demoapp.jar", filepath.Join(dst, "demoapp.jar"), t) + handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) diff --git a/client/driver/qemu.go b/client/driver/qemu.go index e4efe585888..1c8d89cf59f 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" - "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" @@ -36,10 +35,9 @@ type QemuDriver struct { } type QemuDriverConfig struct { - ArtifactSource string `mapstructure:"artifact_source"` - Checksum string `mapstructure:"checksum"` - Accelerator string `mapstructure:"accelerator"` - PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports. + ImagePath string `mapstructure:"image_path"` + Accelerator string `mapstructure:"accelerator"` + PortMap []map[string]int `mapstructure:"port_map"` // A map of host port labels and to guest ports. } // qemuHandle is returned from Start/Open as a handle to the PID @@ -98,16 +96,11 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } // Get the image source - source, ok := task.Config["artifact_source"] - if !ok || source == "" { - return nil, fmt.Errorf("Missing source image Qemu driver") - } - - // Qemu defaults to 128M of RAM for a given VM. Instead, we force users to - // supply a memory size in the tasks resources - if task.Resources == nil || task.Resources.MemoryMB == 0 { - return nil, fmt.Errorf("Missing required Task Resource: Memory") + vmPath := driverConfig.ImagePath + if vmPath == "" { + return nil, fmt.Errorf("image_path must be set") } + vmID := filepath.Base(vmPath) // Get the tasks local directory. taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName] @@ -115,19 +108,6 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName) } - // Proceed to download an artifact to be executed. - vmPath, err := getter.GetArtifact( - taskDir, - driverConfig.ArtifactSource, - driverConfig.Checksum, - d.logger, - ) - if err != nil { - return nil, err - } - - vmID := filepath.Base(vmPath) - // Parse configuration arguments // Create the base arguments accelerator := "tcg" @@ -137,8 +117,13 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, // TODO: Check a lower bounds, e.g. the default 128 of Qemu mem := fmt.Sprintf("%dM", task.Resources.MemoryMB) + absPath, err := GetAbsolutePath("qemu-system-x86_64") + if err != nil { + return nil, err + } + args := []string{ - "qemu-system-x86_64", + absPath, "-machine", "type=pc,accel=" + accelerator, "-name", vmID, "-m", mem, diff --git a/client/driver/qemu_test.go b/client/driver/qemu_test.go index ff163f9752f..c5cd0b31d0c 100644 --- a/client/driver/qemu_test.go +++ b/client/driver/qemu_test.go @@ -2,6 +2,7 @@ package driver import ( "fmt" + "path/filepath" "testing" "github.com/hashicorp/nomad/client/config" @@ -37,13 +38,11 @@ func TestQemuDriver_Fingerprint(t *testing.T) { func TestQemuDriver_StartOpen_Wait(t *testing.T) { t.Parallel() ctestutils.QemuCompatible(t) - // TODO: use test server to load from a fixture task := &structs.Task{ Name: "linux", Config: map[string]interface{}{ - "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/linux-0.2.img", - "checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544", - "accelerator": "tcg", + "image_path": "linux-0.2.img", + "accelerator": "tcg", "port_map": []map[string]int{{ "main": 22, "web": 8080, @@ -68,6 +67,10 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { defer execCtx.AllocDir.Destroy() d := NewQemuDriver(driverCtx) + // Copy the test image into the task's directory + dst, _ := execCtx.AllocDir.TaskDirs[task.Name] + copyFile("./test-resources/qemu/linux-0.2.img", filepath.Join(dst, "linux-0.2.img"), t) + handle, err := d.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) @@ -90,33 +93,3 @@ func TestQemuDriver_StartOpen_Wait(t *testing.T) { fmt.Printf("\nError killing Qemu test: %s", err) } } - -func TestQemuDriver_RequiresMemory(t *testing.T) { - t.Parallel() - ctestutils.QemuCompatible(t) - // TODO: use test server to load from a fixture - task := &structs.Task{ - Name: "linux", - Config: map[string]interface{}{ - "artifact_source": "https://dl.dropboxusercontent.com/u/47675/jar_thing/linux-0.2.img", - "accelerator": "tcg", - "host_port": "8080", - "guest_port": "8081", - "checksum": "sha256:a5e836985934c3392cbbd9b26db55a7d35a8d7ae1deb7ca559dd9c0159572544", - // ssh u/p would be here - }, - LogConfig: &structs.LogConfig{ - MaxFiles: 10, - MaxFileSizeMB: 10, - }, - } - - driverCtx, execCtx := testDriverContexts(task) - defer execCtx.AllocDir.Destroy() - d := NewQemuDriver(driverCtx) - - _, err := d.Start(execCtx, task) - if err == nil { - t.Fatalf("Expected error when not specifying memory") - } -} diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index f029525b589..3b1c8737287 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -14,7 +14,6 @@ import ( "github.com/hashicorp/nomad/client/driver/executor" cstructs "github.com/hashicorp/nomad/client/driver/structs" "github.com/hashicorp/nomad/client/fingerprint" - "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/helper/discover" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/mapstructure" @@ -83,21 +82,6 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } - // Check if an artificat is specified and attempt to download it - source, ok := task.Config["artifact_source"] - if ok && source != "" { - // Proceed to download an artifact to be executed. - _, err := getter.GetArtifact( - taskDir, - driverConfig.ArtifactSource, - driverConfig.Checksum, - d.logger, - ) - if err != nil { - return nil, err - } - } - bin, err := discover.NomadExecutable() if err != nil { return nil, fmt.Errorf("unable to find the nomad binary: %v", err) diff --git a/client/driver/raw_exec_test.go b/client/driver/raw_exec_test.go index 6a36da94212..f35be1ef1da 100644 --- a/client/driver/raw_exec_test.go +++ b/client/driver/raw_exec_test.go @@ -3,8 +3,6 @@ package driver import ( "fmt" "io/ioutil" - "net/http" - "net/http/httptest" "path/filepath" "reflect" "testing" @@ -99,57 +97,6 @@ func TestRawExecDriver_StartOpen_Wait(t *testing.T) { handle2.Kill() } -func TestRawExecDriver_Start_Artifact_basic(t *testing.T) { - t.Parallel() - path := testtask.Path() - ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir(path)))) - defer ts.Close() - - file := filepath.Base(path) - task := &structs.Task{ - Name: "sleep", - Config: map[string]interface{}{ - "artifact_source": fmt.Sprintf("%s/%s", ts.URL, file), - "command": file, - "args": []string{"sleep", "1s"}, - }, - LogConfig: &structs.LogConfig{ - MaxFiles: 10, - MaxFileSizeMB: 10, - }, - Resources: basicResources, - } - testtask.SetTaskEnv(task) - - driverCtx, execCtx := testDriverContexts(task) - defer execCtx.AllocDir.Destroy() - d := NewRawExecDriver(driverCtx) - - handle, err := d.Start(execCtx, task) - if err != nil { - t.Fatalf("err: %v", err) - } - if handle == nil { - t.Fatalf("missing handle") - } - - // Attempt to open - handle2, err := d.Open(execCtx, handle.ID()) - if err != nil { - t.Fatalf("err: %v", err) - } - if handle2 == nil { - t.Fatalf("missing handle") - } - - // Task should terminate quickly - select { - case <-handle2.WaitCh(): - case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second): - t.Fatalf("timeout") - } -} - func TestRawExecDriver_Start_Wait(t *testing.T) { t.Parallel() task := &structs.Task{ diff --git a/client/driver/rkt.go b/client/driver/rkt.go index fa24118959c..3dea50b04ca 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -228,7 +228,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e Cmd: exec.Command(bin, "executor", pluginLogFile), } - exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) + execIntf, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config) if err != nil { return nil, err } @@ -241,7 +241,12 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e LogConfig: task.LogConfig, } - ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "rkt", Args: cmdArgs}, executorCtx) + absPath, err := GetAbsolutePath("rkt") + if err != nil { + return nil, err + } + + ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: cmdArgs}, executorCtx) if err != nil { pluginClient.Kill() return nil, fmt.Errorf("error starting process via the plugin: %v", err) @@ -251,7 +256,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e maxKill := d.DriverContext.config.MaxKillTimeout h := &rktHandle{ pluginClient: pluginClient, - executor: exec, + executor: execIntf, executorPid: ps.Pid, allocDir: ctx.AllocDir, logger: d.logger, diff --git a/client/driver/utils.go b/client/driver/utils.go index e8921c54227..1bafb8a0de4 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -4,6 +4,8 @@ import ( "fmt" "io" "os" + "os/exec" + "path/filepath" "strings" "time" @@ -133,3 +135,14 @@ func GetKillTimeout(desired, max time.Duration) time.Duration { return max } + +// GetAbsolutePath returns the absolute path of the passed binary by resolving +// it in the path and following symlinks. +func GetAbsolutePath(bin string) (string, error) { + lp, err := exec.LookPath(bin) + if err != nil { + return "", fmt.Errorf("failed to resolve path to %q executable: %v", bin, err) + } + + return filepath.EvalSymlinks(lp) +} diff --git a/client/getter/getter.go b/client/getter/getter.go index 7dc1b7bd52a..92d295f473f 100644 --- a/client/getter/getter.go +++ b/client/getter/getter.go @@ -4,14 +4,10 @@ import ( "fmt" "log" "net/url" - "path" - "path/filepath" - "runtime" - "strings" "sync" - "syscall" gg "github.com/hashicorp/go-getter" + "github.com/hashicorp/nomad/nomad/structs" ) var ( @@ -24,7 +20,7 @@ var ( supported = []string{"http", "https", "s3"} ) -// getClient returns a client that is suitable for Nomad. +// getClient returns a client that is suitable for Nomad downloading artifacts. func getClient(src, dst string) *gg.Client { lock.Lock() defer lock.Unlock() @@ -42,36 +38,38 @@ func getClient(src, dst string) *gg.Client { return &gg.Client{ Src: src, Dst: dst, - Dir: false, // Only support a single file for now. + Mode: gg.ClientModeAny, Getters: getters, } } -func GetArtifact(destDir, source, checksum string, logger *log.Logger) (string, error) { - if source == "" { - return "", fmt.Errorf("Source url is empty in Artifact Getter") - } - u, err := url.Parse(source) +// getGetterUrl returns the go-getter URL to download the artifact. +func getGetterUrl(artifact *structs.TaskArtifact) (string, error) { + u, err := url.Parse(artifact.GetterSource) if err != nil { - return "", err + return "", fmt.Errorf("failed to parse source URL %q: %v", artifact.GetterSource, err) } - // if checksum is seperate, apply to source - if checksum != "" { - source = strings.Join([]string{source, fmt.Sprintf("checksum=%s", checksum)}, "?") - logger.Printf("[DEBUG] client.getter: Applying checksum to Artifact Source URL, new url: %s", source) + // Build the url + q := u.Query() + for k, v := range artifact.GetterOptions { + q.Add(k, v) } + u.RawQuery = q.Encode() + return u.String(), nil +} - artifactFile := filepath.Join(destDir, path.Base(u.Path)) - if err := getClient(source, artifactFile).Get(); err != nil { - return "", fmt.Errorf("Error downloading artifact: %s", err) +// GetArtifact downloads an artifact into the specified destination directory. +func GetArtifact(artifact *structs.TaskArtifact, destDir string, logger *log.Logger) error { + url, err := getGetterUrl(artifact) + if err != nil { + return err } - // Add execution permissions to the newly downloaded artifact - if runtime.GOOS != "windows" { - if err := syscall.Chmod(artifactFile, 0755); err != nil { - logger.Printf("[ERR] driver.raw_exec: Error making artifact executable: %s", err) - } + // Download the artifact + if err := getClient(url, destDir).Get(); err != nil { + return err } - return artifactFile, nil + + return nil } diff --git a/client/getter/getter_test.go b/client/getter/getter_test.go index 54eff20c666..7282000df01 100644 --- a/client/getter/getter_test.go +++ b/client/getter/getter_test.go @@ -4,108 +4,149 @@ import ( "fmt" "io/ioutil" "log" + "net/http" + "net/http/httptest" "os" + "path/filepath" + "reflect" "strings" "testing" + + "github.com/hashicorp/nomad/nomad/structs" ) -func TestGetArtifact_basic(t *testing.T) { +func TestGetArtifact_FileAndChecksum(t *testing.T) { + // Create the test server hosting the file to download + ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/")))) + defer ts.Close() - logger := log.New(os.Stderr, "", log.LstdFlags) + // Create a temp directory to download into + destDir, err := ioutil.TempDir("", "nomad-test") + if err != nil { + t.Fatalf("failed to make temp directory: %v", err) + } + defer os.RemoveAll(destDir) - // TODO: Use http.TestServer to serve these files from fixtures dir - passing := []struct { - Source, Checksum string - }{ - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_darwin_amd64", - "sha256:66aa0f05fc0cfcf1e5ed8cc5307b5df51e33871d6b295a60e0f9f6dd573da977", - }, - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64", - "sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c", - }, - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64", - "md5:a9b14903a8942748e4f8474e11f795d3", - }, - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64?checksum=sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c", - "", + // Create the artifact + file := "test.sh" + artifact := &structs.TaskArtifact{ + GetterSource: fmt.Sprintf("%s/%s", ts.URL, file), + GetterOptions: map[string]string{ + "checksum": "md5:bce963762aa2dbfed13caf492a45fb72", }, - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64", - "", + } + + // Download the artifact + logger := log.New(os.Stderr, "", log.LstdFlags) + if err := GetArtifact(artifact, destDir, logger); err != nil { + t.Fatalf("GetArtifact failed: %v", err) + } + + // Verify artifact exists + if _, err := os.Stat(filepath.Join(destDir, file)); err != nil { + t.Fatalf("source path error: %s", err) + } +} + +func TestGetArtifact_InvalidChecksum(t *testing.T) { + // Create the test server hosting the file to download + ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/")))) + defer ts.Close() + + // Create a temp directory to download into + destDir, err := ioutil.TempDir("", "nomad-test") + if err != nil { + t.Fatalf("failed to make temp directory: %v", err) + } + defer os.RemoveAll(destDir) + + // Create the artifact with an incorrect checksum + file := "test.sh" + artifact := &structs.TaskArtifact{ + GetterSource: fmt.Sprintf("%s/%s", ts.URL, file), + GetterOptions: map[string]string{ + "checksum": "md5:aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", }, } - for i, p := range passing { - destDir, err := ioutil.TempDir("", fmt.Sprintf("nomad-test-%d", i)) - if err != nil { - t.Fatalf("Error in TestGetArtifact_basic makeing TempDir: %s", err) + // Download the artifact and expect an error + logger := log.New(os.Stderr, "", log.LstdFlags) + if err := GetArtifact(artifact, destDir, logger); err == nil { + t.Fatalf("GetArtifact should have failed") + } +} + +func createContents(basedir string, fileContents map[string]string, t *testing.T) { + for relPath, content := range fileContents { + folder := basedir + if strings.Index(relPath, "/") != -1 { + // Create the folder. + folder = filepath.Join(basedir, filepath.Dir(relPath)) + if err := os.Mkdir(folder, 0777); err != nil { + t.Fatalf("failed to make directory: %v", err) + } } - path, err := GetArtifact(destDir, p.Source, p.Checksum, logger) - if err != nil { - t.Fatalf("TestGetArtifact_basic unexpected failure here: %s", err) + // Create a file in the existing folder. + file := filepath.Join(folder, filepath.Base(relPath)) + if err := ioutil.WriteFile(file, []byte(content), 0777); err != nil { + t.Fatalf("failed to write data to file %v: %v", file, err) } + } +} - if p.Checksum != "" { - if ok := strings.Contains(path, p.Checksum); ok { - t.Fatalf("path result should not contain the checksum, got: %s", path) - } +func checkContents(basedir string, fileContents map[string]string, t *testing.T) { + for relPath, content := range fileContents { + path := filepath.Join(basedir, relPath) + actual, err := ioutil.ReadFile(path) + if err != nil { + t.Fatalf("failed to read file %q: %v", path, err) } - // verify artifact exists - if _, err := os.Stat(path); err != nil { - t.Fatalf("source path error: %s", err) + if !reflect.DeepEqual(actual, []byte(content)) { + t.Fatalf("%q: expected %q; got %q", path, content, string(actual)) } } } -func TestGetArtifact_fails(t *testing.T) { +func TestGetArtifact_Archive(t *testing.T) { + // Create the test server hosting the file to download + ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir("./test-fixtures/")))) + defer ts.Close() - logger := log.New(os.Stderr, "", log.LstdFlags) + // Create a temp directory to download into and create some of the same + // files that exist in the artifact to ensure they are overriden + destDir, err := ioutil.TempDir("", "nomad-test") + if err != nil { + t.Fatalf("failed to make temp directory: %v", err) + } + defer os.RemoveAll(destDir) - failing := []struct { - Source, Checksum string - }{ - { - "", - "sha256:66aa0f05fc0cfcf1e5ed8cc5307b5d", - }, - { - "/u/47675/jar_thing/hi_darwin_amd64", - "sha256:66aa0f05fc0cfcf1e5ed8cc5307b5d", - }, - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_darwin_amd64", - "sha256:66aa0f05fc0cfcf1e5ed8cc5307b5d", - }, - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64", - "sha257:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c", - }, - // malformed checksum - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd64", - "6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c", - }, - // 404 - { - "https://dl.dropboxusercontent.com/u/47675/jar_thing/hi_linux_amd86", - "", + create := map[string]string{ + "exist/my.config": "to be replaced", + "untouched": "existing top-level", + } + createContents(destDir, create, t) + + file := "archive.tar.gz" + artifact := &structs.TaskArtifact{ + GetterSource: fmt.Sprintf("%s/%s", ts.URL, file), + GetterOptions: map[string]string{ + "checksum": "sha1:20bab73c72c56490856f913cf594bad9a4d730f6", }, } - for i, p := range failing { - destDir, err := ioutil.TempDir("", fmt.Sprintf("nomad-test-%d", i)) - if err != nil { - t.Fatalf("Error in TestGetArtifact_basic makeing TempDir: %s", err) - } - _, err = GetArtifact(destDir, p.Source, p.Checksum, logger) - if err == nil { - t.Fatalf("TestGetArtifact_basic expected failure, but got none") - } + logger := log.New(os.Stderr, "", log.LstdFlags) + if err := GetArtifact(artifact, destDir, logger); err != nil { + t.Fatalf("GetArtifact failed: %v", err) + } + + // Verify the unarchiving overrode files properly. + expected := map[string]string{ + "untouched": "existing top-level", + "exist/my.config": "hello world\n", + "new/my.config": "hello world\n", + "test.sh": "sleep 1\n", } + checkContents(destDir, expected, t) } diff --git a/client/getter/test-fixtures/archive.tar.gz b/client/getter/test-fixtures/archive.tar.gz new file mode 100644 index 00000000000..2b1446bc5ac Binary files /dev/null and b/client/getter/test-fixtures/archive.tar.gz differ diff --git a/client/getter/test-fixtures/archive/exist/my.config b/client/getter/test-fixtures/archive/exist/my.config new file mode 100644 index 00000000000..3b18e512dba --- /dev/null +++ b/client/getter/test-fixtures/archive/exist/my.config @@ -0,0 +1 @@ +hello world diff --git a/client/getter/test-fixtures/archive/new/my.config b/client/getter/test-fixtures/archive/new/my.config new file mode 100644 index 00000000000..3b18e512dba --- /dev/null +++ b/client/getter/test-fixtures/archive/new/my.config @@ -0,0 +1 @@ +hello world diff --git a/client/getter/test-fixtures/archive/test.sh b/client/getter/test-fixtures/archive/test.sh new file mode 100644 index 00000000000..08bfc4afe6d --- /dev/null +++ b/client/getter/test-fixtures/archive/test.sh @@ -0,0 +1 @@ +sleep 1 diff --git a/client/getter/test-fixtures/test.sh b/client/getter/test-fixtures/test.sh new file mode 100644 index 00000000000..08bfc4afe6d --- /dev/null +++ b/client/getter/test-fixtures/test.sh @@ -0,0 +1 @@ +sleep 1 diff --git a/client/task_runner.go b/client/task_runner.go index 20d1c984309..32571acb7bd 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/driver" + "github.com/hashicorp/nomad/client/getter" "github.com/hashicorp/nomad/nomad/structs" "github.com/mitchellh/hashstructure" @@ -48,6 +49,10 @@ type TaskRunner struct { handle driver.DriverHandle handleLock sync.Mutex + // artifactsDownloaded tracks whether the tasks artifacts have been + // downloaded + artifactsDownloaded bool + destroy bool destroyCh chan struct{} destroyLock sync.Mutex @@ -146,6 +151,10 @@ func (r *TaskRunner) RestoreState() error { } r.handleLock.Lock() r.handle = handle + + // If we have previously created the driver, the artifacts have been + // downloaded. + r.artifactsDownloaded = true r.handleLock.Unlock() } return nil @@ -214,12 +223,40 @@ func (r *TaskRunner) Run() { } func (r *TaskRunner) run() { + // Predeclare things so we an jump to the RESTART + var handleEmpty bool + for { + // Download the task's artifacts + if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 { + r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) + taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name] + if !ok { + err := fmt.Errorf("task directory couldn't be found") + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err)) + r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name) + + // Non-restartable error + return + } + + for _, artifact := range r.task.Artifacts { + if err := getter.GetArtifact(artifact, taskDir, r.logger); err != nil { + r.setState(structs.TaskStateDead, + structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err)) + r.restartTracker.SetStartError(cstructs.NewRecoverableError(err, true)) + goto RESTART + } + } + + r.artifactsDownloaded = true + } + // Start the task if not yet started or it is being forced. This logic // is necessary because in the case of a restore the handle already // exists. r.handleLock.Lock() - handleEmpty := r.handle == nil + handleEmpty = r.handle == nil r.handleLock.Unlock() if handleEmpty { startErr := r.startTask() @@ -277,6 +314,7 @@ func (r *TaskRunner) run() { RESTART: state, when := r.restartTracker.GetState() + r.restartTracker.SetStartError(nil).SetWaitResult(nil) switch state { case structs.TaskNotRestarting, structs.TaskTerminated: r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID) diff --git a/client/task_runner_test.go b/client/task_runner_test.go index a0870379476..c73a57140eb 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -3,6 +3,8 @@ package client import ( "fmt" "log" + "net/http" + "net/http/httptest" "os" "path/filepath" "testing" @@ -32,12 +34,17 @@ func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEve } func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { + return testTaskRunnerFromAlloc(restarts, mock.Alloc()) +} + +// Creates a mock task runner using the first task in the first task group of +// the passed allocation. +func testTaskRunnerFromAlloc(restarts bool, alloc *structs.Allocation) (*MockTaskStateUpdater, *TaskRunner) { logger := testLogger() conf := DefaultConfig() conf.StateDir = os.TempDir() conf.AllocDir = os.TempDir() upd := &MockTaskStateUpdater{} - alloc := mock.Alloc() task := alloc.Job.TaskGroups[0].Tasks[0] consulClient, _ := NewConsulService(&consulServiceConfig{logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) // Initialize the port listing. This should be done by the offer process but @@ -48,7 +55,7 @@ func testTaskRunner(restarts bool) (*MockTaskStateUpdater, *TaskRunner) { allocDir.Build([]*structs.Task{task}) ctx := driver.NewExecContext(allocDir, alloc.ID) - tr := NewTaskRunner(logger, conf, upd.Update, ctx, mock.Alloc(), task, consulClient) + tr := NewTaskRunner(logger, conf, upd.Update, ctx, alloc, task, consulClient) if !restarts { tr.restartTracker = noRestartsTracker() } @@ -227,3 +234,134 @@ func TestTaskRunner_SaveRestoreState(t *testing.T) { t.Fatalf("err: %v", err) }) } + +func TestTaskRunner_Download_List(t *testing.T) { + ctestutil.ExecCompatible(t) + + ts := httptest.NewServer(http.FileServer(http.Dir(filepath.Dir(".")))) + defer ts.Close() + + // Create an allocation that has a task with a list of artifacts. + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + f1 := "task_runner_test.go" + f2 := "task_runner.go" + artifact1 := structs.TaskArtifact{ + GetterSource: fmt.Sprintf("%s/%s", ts.URL, f1), + } + artifact2 := structs.TaskArtifact{ + GetterSource: fmt.Sprintf("%s/%s", ts.URL, f2), + } + task.Artifacts = []*structs.TaskArtifact{&artifact1, &artifact2} + + upd, tr := testTaskRunnerFromAlloc(false, alloc) + go tr.Run() + defer tr.Destroy() + defer tr.ctx.AllocDir.Destroy() + + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + t.Fatalf("timeout") + } + + if len(upd.events) != 4 { + t.Fatalf("should have 4 updates: %#v", upd.events) + } + + if upd.state != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead) + } + + if upd.events[0].Type != structs.TaskReceived { + t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived) + } + + if upd.events[1].Type != structs.TaskDownloadingArtifacts { + t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts) + } + + if upd.events[2].Type != structs.TaskStarted { + t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskStarted) + } + + if upd.events[3].Type != structs.TaskTerminated { + t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskTerminated) + } + + // Check that both files exist. + taskDir := tr.ctx.AllocDir.TaskDirs[task.Name] + if _, err := os.Stat(filepath.Join(taskDir, f1)); err != nil { + t.Fatalf("%v not downloaded", f1) + } + if _, err := os.Stat(filepath.Join(taskDir, f2)); err != nil { + t.Fatalf("%v not downloaded", f2) + } +} + +func TestTaskRunner_Download_Retries(t *testing.T) { + ctestutil.ExecCompatible(t) + + // Create an allocation that has a task with bad artifacts. + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + artifact := structs.TaskArtifact{ + GetterSource: "http://127.1.1.111:12315/foo/bar/baz", + } + task.Artifacts = []*structs.TaskArtifact{&artifact} + + // Make the restart policy try one update + alloc.Job.TaskGroups[0].RestartPolicy = &structs.RestartPolicy{ + Attempts: 1, + Interval: 10 * time.Minute, + Delay: 1 * time.Second, + Mode: structs.RestartPolicyModeFail, + } + + upd, tr := testTaskRunnerFromAlloc(true, alloc) + go tr.Run() + defer tr.Destroy() + defer tr.ctx.AllocDir.Destroy() + + select { + case <-tr.WaitCh(): + case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second): + t.Fatalf("timeout") + } + + if len(upd.events) != 7 { + t.Fatalf("should have 7 updates: %#v", upd.events) + } + + if upd.state != structs.TaskStateDead { + t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead) + } + + if upd.events[0].Type != structs.TaskReceived { + t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived) + } + + if upd.events[1].Type != structs.TaskDownloadingArtifacts { + t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskDownloadingArtifacts) + } + + if upd.events[2].Type != structs.TaskArtifactDownloadFailed { + t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskArtifactDownloadFailed) + } + + if upd.events[3].Type != structs.TaskRestarting { + t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskRestarting) + } + + if upd.events[4].Type != structs.TaskDownloadingArtifacts { + t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskDownloadingArtifacts) + } + + if upd.events[5].Type != structs.TaskArtifactDownloadFailed { + t.Fatalf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskArtifactDownloadFailed) + } + + if upd.events[6].Type != structs.TaskNotRestarting { + t.Fatalf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskNotRestarting) + } +} diff --git a/command/alloc_status.go b/command/alloc_status.go index ecba378f976..49e3844fa14 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -205,6 +205,14 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) { } else { desc = "Failed to start task" } + case api.TaskDownloadingArtifacts: + desc = "Client is downloading artifacts" + case api.TaskArtifactDownloadFailed: + if event.DownloadError != "" { + desc = event.DownloadError + } else { + desc = "Failed to download artifacts" + } case api.TaskKilled: if event.KillError != "" { desc = event.KillError diff --git a/command/init.go b/command/init.go index 723402ac68c..db2a0187610 100644 --- a/command/init.go +++ b/command/init.go @@ -158,6 +158,16 @@ job "example" { } } } + + # The artifact block can be specified one or more times to download + $ artifacts prior to the task being started. This is convenient for + $ shipping configs or data needed by the task. + # artifact { + # source = "http://foo.com/artifact.tar.gz" + # options { + # checksum = "md5:c4aa853ad2215426eb7d70a21922e794" + # } + # } # Specify configuration related to log rotation # logs { diff --git a/jobspec/parse.go b/jobspec/parse.go index 58ee1892403..853640f7509 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -468,6 +468,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l "resources", "logs", "kill_timeout", + "artifact", } if err := checkHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -484,6 +485,7 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l delete(m, "meta") delete(m, "resources") delete(m, "logs") + delete(m, "artifact") // Build the task var t structs.Task @@ -596,12 +598,84 @@ func parseTasks(jobName string, taskGroupName string, result *[]*structs.Task, l } t.LogConfig = logConfig + // Parse artifacts + if o := listVal.Filter("artifact"); len(o.Items) > 0 { + if err := parseArtifacts(&t.Artifacts, o); err != nil { + return multierror.Prefix(err, fmt.Sprintf("'%s', artifact ->", n)) + } + } + *result = append(*result, &t) } return nil } +func parseArtifacts(result *[]*structs.TaskArtifact, list *ast.ObjectList) error { + for _, o := range list.Elem().Items { + // Check for invalid keys + valid := []string{ + "source", + "options", + } + if err := checkHCLKeys(o.Val, valid); err != nil { + return err + } + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + delete(m, "options") + + var ta structs.TaskArtifact + if err := mapstructure.WeakDecode(m, &ta); err != nil { + return err + } + + var optionList *ast.ObjectList + if ot, ok := o.Val.(*ast.ObjectType); ok { + optionList = ot.List + } else { + return fmt.Errorf("artifact should be an object") + } + + options := make(map[string]string) + if oo := optionList.Filter("options"); len(oo.Items) > 0 { + if err := parseArtifactOption(options, oo); err != nil { + return multierror.Prefix(err, "options: ") + } + } + + ta.GetterOptions = options + *result = append(*result, &ta) + } + + return nil +} + +func parseArtifactOption(result map[string]string, list *ast.ObjectList) error { + list = list.Elem() + if len(list.Items) > 1 { + return fmt.Errorf("only one 'options' block allowed per artifact") + } + + // Get our resource object + o := list.Items[0] + + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Val); err != nil { + return err + } + + if err := mapstructure.WeakDecode(m, &result); err != nil { + return err + } + + return nil +} + func parseServices(jobName string, taskGroupName string, task *structs.Task, serviceObjs *ast.ObjectList) error { task.Services = make([]*structs.Service, len(serviceObjs.Items)) var defaultServiceName bool diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index f6a7351b99d..28cc6afe86a 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -128,6 +128,20 @@ func TestParse(t *testing.T) { MaxFiles: 10, MaxFileSizeMB: 100, }, + Artifacts: []*structs.TaskArtifact{ + { + GetterSource: "http://foo.com/artifact", + GetterOptions: map[string]string{ + "checksum": "md5:b8a4f3f72ecab0510a6a31e997461c5f", + }, + }, + { + GetterSource: "http://bar.com/artifact", + GetterOptions: map[string]string{ + "checksum": "md5:ff1cc0d3432dad54d607c1505fb7245c", + }, + }, + }, }, &structs.Task{ Name: "storagelocker", @@ -301,6 +315,11 @@ func TestParse(t *testing.T) { }, false, }, + { + "bad-artifact.hcl", + nil, + true, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/bad-artifact.hcl b/jobspec/test-fixtures/bad-artifact.hcl new file mode 100644 index 00000000000..3027d5e3d5f --- /dev/null +++ b/jobspec/test-fixtures/bad-artifact.hcl @@ -0,0 +1,13 @@ +job "binstore-storagelocker" { + group "binsl" { + count = 5 + task "binstore" { + driver = "docker" + + artifact { + bad = "bad" + } + resources {} + } + } +} diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index efd4fc4dca5..fddab3731fb 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -82,6 +82,20 @@ job "binstore-storagelocker" { } kill_timeout = "22s" + + artifact { + source = "http://foo.com/artifact" + options { + checksum = "md5:b8a4f3f72ecab0510a6a31e997461c5f" + } + } + + artifact { + source = "http://bar.com/artifact" + options { + checksum = "md5:ff1cc0d3432dad54d607c1505fb7245c" + } + } } task "storagelocker" { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 6f409a65c10..97ff2c32f15 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2,10 +2,15 @@ package structs import ( "bytes" + "crypto/md5" "crypto/sha1" + "crypto/sha256" + "crypto/sha512" + "encoding/hex" "errors" "fmt" "io" + "net/url" "reflect" "regexp" "strconv" @@ -1603,6 +1608,10 @@ type Task struct { // LogConfig provides configuration for log rotation LogConfig *LogConfig `mapstructure:"logs"` + + // Artifacts is a list of artifacts to download and extract before running + // the task. + Artifacts []*TaskArtifact } func (t *Task) Copy() *Task { @@ -1623,6 +1632,12 @@ func (t *Task) Copy() *Task { nt.Resources = nt.Resources.Copy() nt.Meta = CopyMapStringString(nt.Meta) + artifacts := make([]*TaskArtifact, len(nt.Artifacts)) + for i, a := range nt.Artifacts { + artifacts[i] = a.Copy() + } + nt.Artifacts = artifacts + if i, err := copystructure.Copy(nt.Config); err != nil { nt.Config = i.(map[string]interface{}) } @@ -1662,6 +1677,72 @@ func (t *Task) FindHostAndPortFor(portLabel string) (string, int) { return "", 0 } +// Validate is used to sanity check a task +func (t *Task) Validate() error { + var mErr multierror.Error + if t.Name == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing task name")) + } + if t.Driver == "" { + mErr.Errors = append(mErr.Errors, errors.New("Missing task driver")) + } + if t.KillTimeout.Nanoseconds() < 0 { + mErr.Errors = append(mErr.Errors, errors.New("KillTimeout must be a positive value")) + } + + // Validate the resources. + if t.Resources == nil { + mErr.Errors = append(mErr.Errors, errors.New("Missing task resources")) + } else if err := t.Resources.MeetsMinResources(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + + // Validate the log config + if t.LogConfig == nil { + mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) + } else if err := t.LogConfig.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + + for idx, constr := range t.Constraints { + if err := constr.Validate(); err != nil { + outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } + } + + for _, service := range t.Services { + if err := service.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + + if t.LogConfig != nil && t.Resources != nil { + logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) + if t.Resources.DiskMB <= logUsage { + mErr.Errors = append(mErr.Errors, + fmt.Errorf("log storage (%d MB) exceeds requested disk capacity (%d MB)", + logUsage, t.Resources.DiskMB)) + } + } + + for idx, artifact := range t.Artifacts { + if err := artifact.Validate(); err != nil { + outer := fmt.Errorf("Artifact %d validation failed: %v", idx+1, err) + mErr.Errors = append(mErr.Errors, outer) + } + } + + // If the driver is java or qemu ensure that they have specified an + // artifact. + if (t.Driver == "qemu" || t.Driver == "java") && len(t.Artifacts) == 0 { + err := fmt.Errorf("must specify at least one artifact when using %q driver", t.Driver) + mErr.Errors = append(mErr.Errors, err) + } + + return mErr.ErrorOrNil() +} + // Set of possible states for a task. const ( TaskStatePending = "pending" // The task is waiting to be run. @@ -1727,6 +1808,14 @@ const ( // TaskNotRestarting indicates that the task has failed and is not being // restarted because it has exceeded its restart policy. TaskNotRestarting = "Restarts Exceeded" + + // Task Downloading Artifacts means the task is downloading the artifacts + // specified in the task. + TaskDownloadingArtifacts = "Downloading Artifacts" + + // TaskArtifactDownloadFailed indicates that downloading the artifacts + // failed. + TaskArtifactDownloadFailed = "Failed Artifact Download" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -1748,6 +1837,9 @@ type TaskEvent struct { // TaskRestarting fields. StartDelay int64 // The sleep period before restarting the task in unix nanoseconds. + + // Artifact Download fields + DownloadError string // Error downloading artifacts } func (te *TaskEvent) GoString() string { @@ -1806,54 +1898,88 @@ func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent { return e } -// Validate is used to sanity check a task group -func (t *Task) Validate() error { - var mErr multierror.Error - if t.Name == "" { - mErr.Errors = append(mErr.Errors, errors.New("Missing task name")) - } - if t.Driver == "" { - mErr.Errors = append(mErr.Errors, errors.New("Missing task driver")) +func (e *TaskEvent) SetDownloadError(err error) *TaskEvent { + if err != nil { + e.DownloadError = err.Error() } - if t.KillTimeout.Nanoseconds() < 0 { - mErr.Errors = append(mErr.Errors, errors.New("KillTimeout must be a positive value")) + return e +} + +// TaskArtifact is an artifact to download before running the task. +type TaskArtifact struct { + // GetterSource is the source to download an artifact using go-getter + GetterSource string `mapstructure:"source"` + + // GetterOptions are options to use when downloading the artifact using + // go-getter. + GetterOptions map[string]string `mapstructure:"options"` +} + +func (ta *TaskArtifact) Copy() *TaskArtifact { + if ta == nil { + return nil } + nta := new(TaskArtifact) + *nta = *ta + nta.GetterOptions = CopyMapStringString(ta.GetterOptions) + return nta +} - // Validate the resources. - if t.Resources == nil { - mErr.Errors = append(mErr.Errors, errors.New("Missing task resources")) - } else if err := t.Resources.MeetsMinResources(); err != nil { - mErr.Errors = append(mErr.Errors, err) +func (ta *TaskArtifact) Validate() error { + // Verify the source + var mErr multierror.Error + if ta.GetterSource == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("source must be specified")) } - // Validate the log config - if t.LogConfig == nil { - mErr.Errors = append(mErr.Errors, errors.New("Missing Log Config")) - } else if err := t.LogConfig.Validate(); err != nil { - mErr.Errors = append(mErr.Errors, err) + _, err := url.Parse(ta.GetterSource) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid source URL %q: %v", ta.GetterSource, err)) } - for idx, constr := range t.Constraints { - if err := constr.Validate(); err != nil { - outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err) - mErr.Errors = append(mErr.Errors, outer) + // Verify the checksum + if check, ok := ta.GetterOptions["checksum"]; ok { + check = strings.TrimSpace(check) + if check == "" { + mErr.Errors = append(mErr.Errors, fmt.Errorf("checksum value can not be empty")) + return mErr.ErrorOrNil() } - } - for _, service := range t.Services { - if err := service.Validate(); err != nil { - mErr.Errors = append(mErr.Errors, err) + parts := strings.Split(check, ":") + if l := len(parts); l != 2 { + mErr.Errors = append(mErr.Errors, fmt.Errorf(`checksum must be given as "type:value"; got %q`, check)) + return mErr.ErrorOrNil() } - } - if t.LogConfig != nil && t.Resources != nil { - logUsage := (t.LogConfig.MaxFiles * t.LogConfig.MaxFileSizeMB) - if t.Resources.DiskMB <= logUsage { - mErr.Errors = append(mErr.Errors, - fmt.Errorf("log storage (%d MB) exceeds requested disk capacity (%d MB)", - logUsage, t.Resources.DiskMB)) + checksumVal := parts[1] + checksumBytes, err := hex.DecodeString(checksumVal) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid checksum: %v", err)) + return mErr.ErrorOrNil() + } + + checksumType := parts[0] + expectedLength := 0 + switch checksumType { + case "md5": + expectedLength = md5.Size + case "sha1": + expectedLength = sha1.Size + case "sha256": + expectedLength = sha256.Size + case "sha512": + expectedLength = sha512.Size + default: + mErr.Errors = append(mErr.Errors, fmt.Errorf("unsupported checksum type: %s", checksumType)) + return mErr.ErrorOrNil() + } + + if len(checksumBytes) != expectedLength { + mErr.Errors = append(mErr.Errors, fmt.Errorf("invalid %s checksum: %v", checksumType, checksumVal)) + return mErr.ErrorOrNil() } } + return mErr.ErrorOrNil() } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 07a532575e1..7b1e0d2b356 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -131,6 +131,11 @@ func testJob() *Job { Env: map[string]string{ "FOO": "bar", }, + Artifacts: []*TaskArtifact{ + { + GetterSource: "http://foo.com", + }, + }, Services: []*Service{ { Name: "${TASK}-frontend", @@ -763,3 +768,53 @@ func TestAllocation_Index(t *testing.T) { t.Fatal() } } + +func TestTaskArtifact_Validate_Source(t *testing.T) { + valid := &TaskArtifact{GetterSource: "google.com"} + if err := valid.Validate(); err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestTaskArtifact_Validate_Checksum(t *testing.T) { + cases := []struct { + Input *TaskArtifact + Err bool + }{ + { + &TaskArtifact{ + GetterSource: "foo.com", + GetterOptions: map[string]string{ + "checksum": "no-type", + }, + }, + true, + }, + { + &TaskArtifact{ + GetterSource: "foo.com", + GetterOptions: map[string]string{ + "checksum": "md5:toosmall", + }, + }, + true, + }, + { + &TaskArtifact{ + GetterSource: "foo.com", + GetterOptions: map[string]string{ + "checksum": "invalid:type", + }, + }, + true, + }, + } + + for i, tc := range cases { + err := tc.Input.Validate() + if (err != nil) != tc.Err { + t.Fatalf("case %d: %v", i, err) + continue + } + } +} diff --git a/scheduler/util.go b/scheduler/util.go index 961b698fe08..38257387138 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -299,6 +299,9 @@ func tasksUpdated(a, b *structs.TaskGroup) bool { if !reflect.DeepEqual(at.Env, bt.Env) { return true } + if !reflect.DeepEqual(at.Artifacts, bt.Artifacts) { + return true + } // Inspect the network to see if the dynamic ports are different if len(at.Resources.Networks) != len(bt.Resources.Networks) { diff --git a/scripts/test.sh b/scripts/test.sh index ea23141d5d8..293216d5428 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -10,4 +10,4 @@ go build -o $TEMPDIR/nomad || exit 1 # Run the tests echo "--> Running tests" -go list ./... | grep -v '/vendor/' | sudo -E PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=180s +go list ./... | grep -v '/vendor/' | sudo -E PATH=$TEMPDIR:$PATH xargs -n1 go test -cover -timeout=300s diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index 644192659b0..f5e6d3805bb 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -268,6 +268,10 @@ The `task` object supports the following keys: * `logs` - Logs allows configuring log rotation for the `stdout` and `stderr` buffers of a Task. See the log rotation reference below for more details. +* `artifact` - Defines an artifact to be downloaded before the task is run. This + can be provided multiple times to define additional artifacts to download. See + the artifacts reference for more details. + ### Resources The `resources` object supports the following keys: @@ -405,6 +409,40 @@ In the above example we have asked Nomad to retain 3 rotated files for both `stderr` and `stdout` and size of each file is 10MB. The minimum disk space that would be required for the task would be 60MB. +### Artifact + +Nomad downloads artifacts using +[`go-getter`](https://github.com/hashicorp/go-getter). The `go-getter` library +allows downloading of artifacts from various sources using a URL as the input +source. The key/value pairs given in the `options` block map directly to +parameters appended to the supplied `source` url. These are then used by +`go-getter` to appropriately download the artifact. `go-getter` also has a CLI +tool to validate its URL and can be used to check if the Nomad `artifact` is +valid. + +Nomad allows downloading `http`, `https`, and `S3` artifacts. If these artifacts +are archives (zip, tar.gz, bz2, etc.), these will be unarchived before the task +is started. + +The `artifact` object maps supports the following keys: + +* `source` - The path to the artifact to download. + +* `options` - The `options` block allows setting parameters for `go-getter`. An + example is given below: + +``` +options { + # Validate the downloaded artifact + checksum = "md5:c4aa853ad2215426eb7d70a21922e794" + + # S3 options for downloading artifacts from S3 + aws_access_key_id = "" + aws_access_key_secret = "" + aws_access_token = "" +} +``` + ## JSON Syntax Job files can also be specified in JSON. The conversion is straightforward