Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drivers: Use go-getter #288

Merged
merged 1 commit into from
Oct 26, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package driver
import (
"fmt"
"log"
"path/filepath"
"sync"

"github.com/hashicorp/nomad/client/allocdir"
Expand Down Expand Up @@ -114,6 +115,12 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.

if ctx.AllocDir != nil {
env.SetAllocDir(ctx.AllocDir.SharedDir)
taskdir, ok := ctx.AllocDir.TaskDirs[task.Name]
if !ok {
// TODO: Update this to return an error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should update this function to return an error and handle it in the drivers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened #326 to track this

}

env.SetTaskLocalDir(filepath.Join(taskdir, allocdir.TaskLocal))
}

if task.Resources != nil {
Expand Down
35 changes: 34 additions & 1 deletion client/driver/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package driver

import (
"fmt"
"log"
"path"
"path/filepath"
"runtime"
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -41,12 +46,40 @@ func (d *ExecDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool,
}

func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
// Get the command
// Get the command to be ran
command, ok := task.Config["command"]
if !ok || command == "" {
return nil, fmt.Errorf("missing command for exec driver")
}

// Check if an artificat is specified and attempt to download it
source, ok := task.Config["artifact_source"]
if ok && source != "" {
// Proceed to download an artifact to be executed.
// We use go-getter to support a variety of protocols, but need to change
// file permissions of the resulted download to be executable

// Create a location to download the artifact.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

artifactName := path.Base(source)
artifactFile := filepath.Join(destDir, artifactName)
if err := getter.GetFile(artifactFile, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Exec driver: %s", err)
}

// Add execution permissions to the newly downloaded artifact
if runtime.GOOS != "windows" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We drop permissions for some of the drivers but not others. Could we move this into a shared "drop perms" function that can handle the various OSs and share between drivers?

if err := syscall.Chmod(artifactFile, 0755); err != nil {
log.Printf("[ERR] driver.Exec: Error making artifact executable: %s", err)
}
}
}

// Get the environment variables.
envVars := TaskEnvironmentVariables(ctx, task)

Expand Down
99 changes: 99 additions & 0 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io/ioutil"
"path/filepath"
"reflect"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -120,6 +121,104 @@ func TestExecDriver_Start_Wait(t *testing.T) {
}
}

func TestExecDriver_Start_Artifact_basic(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}

task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": filepath.Join("$NOMAD_TASK_DIR", file),
},
Resources: basicResources,
}

driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}

func TestExecDriver_Start_Artifact_expanded(t *testing.T) {
ctestutils.ExecCompatible(t)
var file string
switch runtime.GOOS {
case "darwin":
file = "hi_darwin_amd64"
default:
file = "hi_linux_amd64"
}

task := &structs.Task{
Name: "sleep",
Config: map[string]string{
"artifact_source": fmt.Sprintf("https://dl.dropboxusercontent.com/u/47675/jar_thing/%s", file),
"command": "/bin/bash",
"args": fmt.Sprintf("-c '/bin/sleep 1 && %s'", filepath.Join("$NOMAD_TASK_DIR", file)),
},
Resources: basicResources,
}

driverCtx := testDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewExecDriver(driverCtx)

handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}

// Update should be a no-op
err = handle.Update(task)
if err != nil {
t.Fatalf("err: %v", err)
}

// Task should terminate quickly
select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}
}
func TestExecDriver_Start_Wait_AllocDir(t *testing.T) {
ctestutils.ExecCompatible(t)

Expand Down
39 changes: 8 additions & 31 deletions client/driver/java.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package driver
import (
"bytes"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
Expand All @@ -14,6 +11,7 @@ import (
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/executor"
Expand Down Expand Up @@ -97,37 +95,18 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("missing jar source for Java Jar driver")
}

// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Create a location to download the binary.
fName := path.Base(source)
fPath := filepath.Join(taskLocal, fName)
f, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}

defer f.Close()
defer resp.Body.Close()
destDir := filepath.Join(taskDir, allocdir.TaskLocal)

// Copy remote file to local directory for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(f, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying jar from source: %s", ioErr)
// Create a location to download the binary.
jarName := path.Base(source)
jarPath := filepath.Join(destDir, jarName)
if err := getter.GetFile(jarPath, source); err != nil {
return nil, fmt.Errorf("Error downloading source for Java driver: %s", err)
}

// Get the environment variables.
Expand All @@ -141,10 +120,8 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
args = append(args, jvm_options)
}

// Build the argument list
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, fName))

// Build the argument list.
args = append(args, "-jar", filepath.Join(allocdir.TaskLocal, jarName))
if argRaw, ok := task.Config["args"]; ok {
args = append(args, argRaw)
}
Expand Down
38 changes: 9 additions & 29 deletions client/driver/qemu.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand All @@ -19,6 +18,7 @@ import (
"syscall"
"time"

"github.com/hashicorp/go-getter"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down Expand Up @@ -94,45 +94,25 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
return nil, fmt.Errorf("Missing required Task Resource: Memory")
}

// Attempt to download the thing
// Should be extracted to some kind of Http Fetcher
// Right now, assume publicly accessible HTTP url
resp, err := http.Get(source)
if err != nil {
return nil, fmt.Errorf("Error downloading source for Qemu driver: %s", err)
}

// Get the tasks local directory.
taskDir, ok := ctx.AllocDir.TaskDirs[d.DriverContext.taskName]
if !ok {
return nil, fmt.Errorf("Could not find task directory for task: %v", d.DriverContext.taskName)
}
taskLocal := filepath.Join(taskDir, allocdir.TaskLocal)

// Create a location in the local directory to download and store the image.
// TODO: Caching
// Create a location to download the binary.
destDir := filepath.Join(taskDir, allocdir.TaskLocal)
vmID := fmt.Sprintf("qemu-vm-%s-%s", structs.GenerateUUID(), filepath.Base(source))
fPath := filepath.Join(taskLocal, vmID)
vmPath, err := os.OpenFile(fPath, os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return nil, fmt.Errorf("Error opening file to download to: %s", err)
}

defer vmPath.Close()
defer resp.Body.Close()

// Copy remote file to local AllocDir for execution
// TODO: a retry of sort if io.Copy fails, for large binaries
_, ioErr := io.Copy(vmPath, resp.Body)
if ioErr != nil {
return nil, fmt.Errorf("Error copying Qemu image from source: %s", ioErr)
vmPath := filepath.Join(destDir, vmID)
if err := getter.GetFile(vmPath, source); err != nil {
return nil, fmt.Errorf("Error downloading artifact for Qemu driver: %s", err)
}

// compute and check checksum
if check, ok := task.Config["checksum"]; ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice feature, I wonder if we should make it a generic "artifact_checksum" so that all the drivers can use this? This could even do things like "md5:abcd" or "sha256:abc123"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can address that in a future pull request, I think. I'm (slowly) adding checksums into go-getter directly hashicorp/go-getter#1

d.logger.Printf("[DEBUG] Running checksum on (%s)", vmID)
hasher := sha256.New()
file, err := os.Open(vmPath.Name())
file, err := os.Open(vmPath)
if err != nil {
return nil, fmt.Errorf("Failed to open file for checksum")
}
Expand Down Expand Up @@ -163,7 +143,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
"-machine", "type=pc,accel=" + accelerator,
"-name", vmID,
"-m", mem,
"-drive", "file=" + vmPath.Name(),
"-drive", "file=" + vmPath,
"-nodefconfig",
"-nodefaults",
"-nographic",
Expand Down Expand Up @@ -240,7 +220,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
// Create and Return Handle
h := &qemuHandle{
proc: cmd.Process,
vmID: vmPath.Name(),
vmID: vmPath,
doneCh: make(chan struct{}),
waitCh: make(chan error, 1),
}
Expand Down
Loading