Skip to content

Commit

Permalink
drivers: Add/Use go-getter to fetch remote binaries
Browse files Browse the repository at this point in the history
Updates Qemu, Java drivers to use go-getter to fetch binaries
Adds remote artifact support for Exec, Raw Exec drivers
  • Loading branch information
catsby committed Oct 26, 2015
1 parent c8bd79c commit 1e07daa
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 74 deletions.
7 changes: 7 additions & 0 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package driver
import (
"fmt"
"log"
"path/filepath"
"sync"

"github.com/hashicorp/nomad/client/allocdir"
Expand Down Expand Up @@ -114,6 +115,12 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.

if ctx.AllocDir != nil {
env.SetAllocDir(ctx.AllocDir.SharedDir)
taskdir, ok := ctx.AllocDir.TaskDirs[task.Name]
if !ok {
// TODO: Update this to return an error
}

env.SetTaskLocalDir(filepath.Join(taskdir, allocdir.TaskLocal))
}

if task.Resources != nil {
Expand Down
35 changes: 34 additions & 1 deletion client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package driver

import (
"fmt"
"log"
"path"
"path/filepath"
"runtime"
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -41,12 +46,40 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
}

func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the command
// Get the command to be ran
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for exec driver")
}

// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
// We use go-getter to support a variety of protocols, but need to change
// file permissions of the resulted download to be executable

// Create a location to download the artifact.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

artifactName := path.Base(source)
artifactFile := filepath.Join(destDir, artifactName)
if err := getter.GetFile(artifactFile, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Exec driver: %s", err)
}

// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
if err := syscall.Chmod(artifactFile, 0755); err != nil {
log.Printf("[ERR] driver.Exec: Error making artifact executable: %s", err)
}
}
}

// Get the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)

Expand Down
99 changes: 99 additions & 0 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -120,6 +121,104 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}

func TestExecDriver_Start_Artifact_basic(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}

task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": filepath.Join("$NOMAD_TASK_DIR", file),
},
Resources: basicResources,
}

driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}

func TestExecDriver_Start_Artifact_expanded(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}

task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": "/bin/bash",
"args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)),
},
Resources: basicResources,
}

driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}
func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
ctestutils.ExecCompatible(t)

Expand Down
39 changes: 8 additions & 31 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package driver
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
Expand All @@ -14,6 +11,7 @@ import (
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
Expand Down Expand Up @@ -97,37 +95,18 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("missing jar source for Java Jar driver")
}

// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Create a location to download the binary.
fName := path.Base(source)
fPath := filepath.Join(taskLocal, fName)
f, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}

defer f.Close()
defer resp.Body.Close()
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

// Copy remote file to local directory for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(f, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying jar from source: %s", ioErr)
// Create a location to download the binary.
jarName := path.Base(source)
jarPath := filepath.Join(destDir, jarName)
if err := getter.GetFile(jarPath, source); err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// Get the environment variables.
Expand All @@ -141,10 +120,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
args = append(args, jvm_options)
}

// Build the argument list
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, fName))

// Build the argument list.
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName))
if argRaw, ok := task.Config["args"]; ok {
args = append(args, argRaw)
}
Expand Down
38 changes: 9 additions & 29 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand All @@ -19,6 +18,7 @@ import (
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -94,45 +94,25 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Missing required Task Resource: Memory")
}

// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Qemu driver: %s", err)
}

// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Create a location in the local directory to download and store the image.
// TODO: Caching
// Create a location to download the binary.
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
vmID := fmt.Sprintf("qemu-vm-%s-%s", structs.GenerateUUID(), filepath.Base(source))
fPath := filepath.Join(taskLocal, vmID)
vmPath, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}

defer vmPath.Close()
defer resp.Body.Close()

// Copy remote file to local AllocDir for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(vmPath, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying Qemu image from source: %s", ioErr)
vmPath := filepath.Join(destDir, vmID)
if err := getter.GetFile(vmPath, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Qemu driver: %s", err)
}

// compute and check checksum
if check, ok := task.Config["checksum"]; ok {
d.logger.Printf("[DEBUG] Running checksum on (%s)", vmID)
hasher := sha256.New()
file, err := os.Open(vmPath.Name())
file, err := os.Open(vmPath)
if err != nil {
return nil, fmt.Errorf("Failed to open file for checksum")
}
Expand Down Expand Up @@ -163,7 +143,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
"-machine", "type=pc,accel=" + accelerator,
"-name", vmID,
"-m", mem,
"-drive", "file=" + vmPath.Name(),
"-drive", "file=" + vmPath,
"-nodefconfig",
"-nodefaults",
"-nographic",
Expand Down Expand Up @@ -240,7 +220,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Create and Return Handle
h := &qemuHandle{
proc: cmd.Process,
vmID: vmPath.Name(),
vmID: vmPath,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
}
Expand Down
Loading

0 comments on commit 1e07daa

Please sign in to comment.