Skip to content

Commit

Permalink
Merge pull request #921 from hashicorp/f-artifact-download
Browse files Browse the repository at this point in the history
Allow downloading many artifacts and support unarchiving
  • Loading branch information
dadgar committed Mar 16, 2016
2 parents 8167fd5 + edab031 commit a5e9efa
Show file tree
Hide file tree
Showing 35 changed files with 898 additions and 392 deletions.
40 changes: 25 additions & 15 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ type Task struct {
Meta map[string]string
KillTimeout time.Duration
LogConfig *LogConfig
Artifacts []*TaskArtifact
}

// TaskArtifact is used to download artifacts before running a task.
type TaskArtifact struct {
GetterSource string
GetterOptions map[string]string
}

// NewTask creates and initializes a new Task.
Expand Down Expand Up @@ -147,24 +154,27 @@ type TaskState struct {
}

const (
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
// appropriate to the events type.
type TaskEvent struct {
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
StartDelay int64
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
StartDelay int64
DownloadError string
}
26 changes: 26 additions & 0 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package driver

import (
"io"
"log"
"math/rand"
"os"
Expand Down Expand Up @@ -38,6 +39,31 @@ func TestMain(m *testing.M) {
}
}

// copyFile moves an existing file to the destination
func copyFile(src, dst string, t *testing.T) {
in, err := os.Open(src)
if err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
defer func() {
if err := out.Close(); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
}()
if _, err = io.Copy(out, in); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
if err := out.Sync(); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
return
}

func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
}
Expand Down
22 changes: 2 additions & 20 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
Expand All @@ -28,10 +27,8 @@ type ExecDriver struct {
}

type ExecDriverConfig struct {
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Command string `mapstructure:"command"`
Args []string `mapstructure:"args"`
Command string `mapstructure:"command"`
Args []string `mapstructure:"args"`
}

// execHandle is returned from Start/Open as a handle to the PID
Expand Down Expand Up @@ -89,21 +86,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}

// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
_, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
}
}

bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
Expand Down
48 changes: 0 additions & 48 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,54 +188,6 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}

func TestExecDriver_Start_Artifact_basic(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)
file := "hi_linux_amd64"
checksum := "sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c"

task := &structs.Task{
Name: "sleep",
Config: map[string]interface{}{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum),
"command": file,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}

driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case res := <-handle.WaitCh():
if !res.Successful() {
t.Fatalf("err: %v", res)
}
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
}

func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)
Expand Down
31 changes: 25 additions & 6 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -151,11 +150,10 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
e.cmd.Env = ctx.TaskEnv.EnvList()
e.cmd.Path = ctx.TaskEnv.ReplaceEnv(command.Cmd)
e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...)
if filepath.Base(command.Cmd) == command.Cmd {
if lp, err := exec.LookPath(command.Cmd); err != nil {
} else {
e.cmd.Path = lp
}

// Ensure that the binary being started is executable.
if err := e.makeExecutable(e.cmd.Path); err != nil {
return nil, err
}

// starting the process
Expand Down Expand Up @@ -280,3 +278,24 @@ func (e *UniversalExecutor) configureTaskDir() error {
e.cmd.Dir = taskDir
return nil
}

// makeExecutablePosix makes the given file executable for root,group,others.
func (e *UniversalExecutor) makeExecutablePosix(binPath string) error {
fi, err := os.Stat(binPath)
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("binary %q does not exist", binPath)
}
return fmt.Errorf("specified binary is invalid: %v", err)
}

// If it is not executable, make it so.
perm := fi.Mode().Perm()
req := os.FileMode(0555)
if perm&req != req {
if err := os.Chmod(binPath, perm|req); err != nil {
return fmt.Errorf("error making %q executable: %s", binPath, err)
}
}
return nil
}
16 changes: 16 additions & 0 deletions client/driver/executor/executor_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,25 @@
package executor

import (
"path/filepath"
"runtime"

cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)

func (e *UniversalExecutor) makeExecutable(binPath string) error {
if runtime.GOOS == "windows" {
return nil
}

path := binPath
if !filepath.IsAbs(binPath) {
// The path must be relative the allocations directory.
path = filepath.Join(e.taskDir, binPath)
}
return e.makeExecutablePosix(path)
}

func (e *UniversalExecutor) configureChroot() error {
return nil
}
Expand Down
12 changes: 12 additions & 0 deletions client/driver/executor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ var (
}
)

func (e *UniversalExecutor) makeExecutable(binPath string) error {
path := binPath
if e.ctx.FSIsolation {
// The path must be relative the chroot
path = filepath.Join(e.taskDir, binPath)
} else if !filepath.IsAbs(binPath) {
// The path must be relative the allocations directory.
path = filepath.Join(e.taskDir, binPath)
}
return e.makeExecutablePosix(path)
}

// configureIsolation configures chroot and creates cgroups
func (e *UniversalExecutor) configureIsolation() error {
if e.ctx.FSIsolation {
Expand Down
34 changes: 14 additions & 20 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -34,10 +33,9 @@ type JavaDriver struct {
}

type JavaDriverConfig struct {
JvmOpts []string `mapstructure:"jvm_options"`
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Args []string `mapstructure:"args"`
JarPath string `mapstructure:"jar_path"`
JvmOpts []string `mapstructure:"jvm_options"`
Args []string `mapstructure:"args"`
}

// javaHandle is returned from Start/Open as a handle to the PID
Expand Down Expand Up @@ -124,19 +122,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}

// Proceed to download an artifact to be executed.
path, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
if driverConfig.JarPath == "" {
return nil, fmt.Errorf("jar_path must be specified")
}

jarName := filepath.Base(path)

args := []string{}
// Look for jvm options
if len(driverConfig.JvmOpts) != 0 {
Expand All @@ -145,7 +134,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}

// Build the argument list.
args = append(args, "-jar", jarName)
args = append(args, "-jar", driverConfig.JarPath)
if len(driverConfig.Args) != 0 {
args = append(args, driverConfig.Args...)
}
Expand All @@ -160,7 +149,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
Cmd: exec.Command(bin, "executor", pluginLogFile),
}

exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
execIntf, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
if err != nil {
return nil, err
}
Expand All @@ -175,7 +164,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
ResourceLimits: true,
}

ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx)
absPath, err := GetAbsolutePath("java")
if err != nil {
return nil, err
}

ps, err := execIntf.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
Expand All @@ -186,7 +180,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
executor: execIntf,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
Expand Down
Loading

0 comments on commit a5e9efa

Please sign in to comment.