Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow downloading many artifacts and support unarchiving #921

Merged
merged 13 commits into from
Mar 16, 2016
39 changes: 24 additions & 15 deletions api/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ type Task struct {
Meta map[string]string
KillTimeout time.Duration
LogConfig *LogConfig
Artifacts []*TaskArtifact
}

type TaskArtifact struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment?

GetterSource string
GetterOptions map[string]string
}

// NewTask creates and initializes a new Task.
Expand Down Expand Up @@ -147,24 +153,27 @@ type TaskState struct {
}

const (
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
TaskDriverFailure = "Driver Failure"
TaskReceived = "Received"
TaskStarted = "Started"
TaskTerminated = "Terminated"
TaskKilled = "Killed"
TaskRestarting = "Restarting"
TaskNotRestarting = "Restarts Exceeded"
TaskDownloadingArtifacts = "Downloading Artifacts"
TaskArtifactDownloadFailed = "Failed Artifact Download"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
// appropriate to the events type.
type TaskEvent struct {
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
StartDelay int64
Type string
Time int64
DriverError string
ExitCode int
Signal int
Message string
KillError string
StartDelay int64
DownloadError string
}
26 changes: 26 additions & 0 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package driver

import (
"io"
"log"
"math/rand"
"os"
Expand Down Expand Up @@ -38,6 +39,31 @@ func TestMain(m *testing.M) {
}
}

// copyFile moves an existing file to the destination
func copyFile(src, dst string, t *testing.T) {
in, err := os.Open(src)
if err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
defer func() {
if err := out.Close(); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
}()
if _, err = io.Copy(out, in); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
if err := out.Sync(); err != nil {
t.Fatalf("copying %v -> %v failed: %v", src, dst, err)
}
return
}

func testLogger() *log.Logger {
return log.New(os.Stderr, "", log.LstdFlags)
}
Expand Down
22 changes: 2 additions & 20 deletions client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
Expand All @@ -28,10 +27,8 @@ type ExecDriver struct {
}

type ExecDriverConfig struct {
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Command string `mapstructure:"command"`
Args []string `mapstructure:"args"`
Command string `mapstructure:"command"`
Args []string `mapstructure:"args"`
}

// execHandle is returned from Start/Open as a handle to the PID
Expand Down Expand Up @@ -89,21 +86,6 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}

// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
_, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
}
}

bin, err := discover.NomadExecutable()
if err != nil {
return nil, fmt.Errorf("unable to find the nomad binary: %v", err)
Expand Down
48 changes: 0 additions & 48 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,54 +188,6 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}

func TestExecDriver_Start_Artifact_basic(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)
file := "hi_linux_amd64"
checksum := "sha256:6f99b4c5184726e601ecb062500aeb9537862434dfe1898dbe5c68d9f50c179c"

task := &structs.Task{
Name: "sleep",
Config: map[string]interface{}{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s?checksum=%s", file, checksum),
"command": file,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
Resources: basicResources,
}

driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case res := <-handle.WaitCh():
if !res.Successful() {
t.Fatalf("err: %v", res)
}
case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
}

func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
t.Parallel()
ctestutils.ExecCompatible(t)
Expand Down
31 changes: 25 additions & 6 deletions client/driver/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"log"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -151,11 +150,10 @@ func (e *UniversalExecutor) LaunchCmd(command *ExecCommand, ctx *ExecutorContext
e.cmd.Env = ctx.TaskEnv.EnvList()
e.cmd.Path = ctx.TaskEnv.ReplaceEnv(command.Cmd)
e.cmd.Args = append([]string{e.cmd.Path}, ctx.TaskEnv.ParseAndReplace(command.Args)...)
if filepath.Base(command.Cmd) == command.Cmd {
if lp, err := exec.LookPath(command.Cmd); err != nil {
} else {
e.cmd.Path = lp
}

// Ensure that the binary being started is executable.
if err := e.makeExecutable(e.cmd.Path); err != nil {
return nil, err
}

// starting the process
Expand Down Expand Up @@ -280,3 +278,24 @@ func (e *UniversalExecutor) configureTaskDir() error {
e.cmd.Dir = taskDir
return nil
}

// makeExecutablePosix makes the given file executable for root,group,others.
func (e *UniversalExecutor) makeExecutablePosix(binPath string) error {
fi, err := os.Stat(binPath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this work if the binPath is not absolute?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it would be relative to Nomads cwd tho

if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("binary %q does not exist", binPath)
}
return fmt.Errorf("specified binary is invalid: %v", err)
}

// If it is not executable, make it so.
perm := fi.Mode().Perm()
req := os.FileMode(0555)
if perm&req != req {
if err := os.Chmod(binPath, perm|req); err != nil {
return fmt.Errorf("error making %q executable: %s", binPath, err)
}
}
return nil
}
16 changes: 16 additions & 0 deletions client/driver/executor/executor_basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,25 @@
package executor

import (
"path/filepath"
"runtime"

cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)

func (e *UniversalExecutor) makeExecutable(binPath string) error {
if runtime.GOOS == "windows" {
return nil
}

path := binPath
if !filepath.IsAbs(binPath) {
// The path must be relative the allocations directory.
path = filepath.Join(e.taskDir, binPath)
}
return e.makeExecutablePosix(path)
}

func (e *UniversalExecutor) configureChroot() error {
return nil
}
Expand Down
12 changes: 12 additions & 0 deletions client/driver/executor/executor_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ var (
}
)

func (e *UniversalExecutor) makeExecutable(binPath string) error {
path := binPath
if e.ctx.FSIsolation {
// The path must be relative the chroot
path = filepath.Join(e.taskDir, binPath)
} else if !filepath.IsAbs(binPath) {
// The path must be relative the allocations directory.
path = filepath.Join(e.taskDir, binPath)
}
return e.makeExecutablePosix(path)
}

// configureIsolation configures chroot and creates cgroups
func (e *UniversalExecutor) configureIsolation() error {
if e.ctx.FSIsolation {
Expand Down
34 changes: 14 additions & 20 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/hashicorp/nomad/client/driver/executor"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/getter"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
)
Expand All @@ -34,10 +33,9 @@ type JavaDriver struct {
}

type JavaDriverConfig struct {
JvmOpts []string `mapstructure:"jvm_options"`
ArtifactSource string `mapstructure:"artifact_source"`
Checksum string `mapstructure:"checksum"`
Args []string `mapstructure:"args"`
JarPath string `mapstructure:"jar_path"`
JvmOpts []string `mapstructure:"jvm_options"`
Args []string `mapstructure:"args"`
}

// javaHandle is returned from Start/Open as a handle to the PID
Expand Down Expand Up @@ -124,19 +122,10 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}

// Proceed to download an artifact to be executed.
path, err := getter.GetArtifact(
taskDir,
driverConfig.ArtifactSource,
driverConfig.Checksum,
d.logger,
)
if err != nil {
return nil, err
if driverConfig.JarPath == "" {
return nil, fmt.Errorf("jar_path must be specified")
}

jarName := filepath.Base(path)

args := []string{}
// Look for jvm options
if len(driverConfig.JvmOpts) != 0 {
Expand All @@ -145,7 +134,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}

// Build the argument list.
args = append(args, "-jar", jarName)
args = append(args, "-jar", driverConfig.JarPath)
if len(driverConfig.Args) != 0 {
args = append(args, driverConfig.Args...)
}
Expand All @@ -160,7 +149,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
Cmd: exec.Command(bin, "executor", pluginLogFile),
}

exec, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
execImpl, pluginClient, err := createExecutor(pluginConfig, d.config.LogOutput, d.config)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We get an interface back not an impl.

if err != nil {
return nil, err
}
Expand All @@ -175,7 +164,12 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
ResourceLimits: true,
}

ps, err := exec.LaunchCmd(&executor.ExecCommand{Cmd: "java", Args: args}, executorCtx)
absPath, err := GetAbsolutePath("java")
if err != nil {
return nil, err
}

ps, err := execImpl.LaunchCmd(&executor.ExecCommand{Cmd: absPath, Args: args}, executorCtx)
if err != nil {
pluginClient.Kill()
return nil, fmt.Errorf("error starting process via the plugin: %v", err)
Expand All @@ -186,7 +180,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
maxKill := d.DriverContext.config.MaxKillTimeout
h := &javaHandle{
pluginClient: pluginClient,
executor: exec,
executor: execImpl,
userPid: ps.Pid,
isolationConfig: ps.IsolationConfig,
taskDir: taskDir,
Expand Down
Loading