Skip to content

Commit

Permalink
Merge pull request #290 from hashicorp/f-docker-alloc-mount
Browse files Browse the repository at this point in the history
Mount task_local and alloc dir to docker containers
  • Loading branch information
dadgar committed Oct 16, 2015
2 parents b70e8b0 + 70f4fc5 commit 29ce6e0
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 33 deletions.
6 changes: 5 additions & 1 deletion client/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,11 @@ func (r *AllocRunner) Run() {
// Create the execution context
if r.ctx == nil {
allocDir := allocdir.NewAllocDir(filepath.Join(r.config.AllocDir, r.alloc.ID))
allocDir.Build(tg.Tasks)
if err := allocDir.Build(tg.Tasks); err != nil {
r.logger.Printf("[WARN] client: failed to build task directories: %v", err)
r.setStatus(structs.AllocClientStatusFailed, fmt.Sprintf("failed to build task dirs for '%s'", alloc.TaskGroup))
return
}
r.ctx = driver.NewExecContext(allocDir)
}

Expand Down
91 changes: 64 additions & 27 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@ import (
"encoding/json"
"fmt"
"log"
"path/filepath"
"strconv"
"strings"

docker "github.com/fsouza/go-dockerclient"

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/args"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -97,12 +100,33 @@ func (d *DockerDriver) Fingerprint(cfg *config.Config, node *structs.Node) (bool
return true, nil
}

// We have to call this when we create the container AND when we start it so
// we'll make a function.
func createHostConfig(task *structs.Task) *docker.HostConfig {
// hostConfig holds options for the docker container that are unique to this
// machine, such as resource limits and port mappings
return &docker.HostConfig{
func (d *DockerDriver) containerBinds(alloc *allocdir.AllocDir, task *structs.Task) ([]string, error) {
shared := alloc.SharedDir
local, ok := alloc.TaskDirs[task.Name]
if !ok {
return nil, fmt.Errorf("Failed to find task local directory: %v", task.Name)
}

return []string{
fmt.Sprintf("%s:%s", shared, allocdir.SharedAllocName),
fmt.Sprintf("%s:%s", local, allocdir.TaskLocal),
}, nil
}

// createContainer initializes a struct needed to call docker.client.CreateContainer()
func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task) (docker.CreateContainerOptions, error) {
var c docker.CreateContainerOptions
if task.Resources == nil {
d.logger.Printf("[ERR] driver.docker: task.Resources is empty")
return c, fmt.Errorf("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.")
}

binds, err := d.containerBinds(ctx.AllocDir, task)
if err != nil {
return c, err
}

hostConfig := &docker.HostConfig{
// Convert MB to bytes. This is an absolute value.
//
// This value represents the total amount of memory a process can use.
Expand Down Expand Up @@ -131,50 +155,46 @@ func createHostConfig(task *structs.Task) *docker.HostConfig {
// - https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
// - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
CPUShares: int64(task.Resources.CPU),
}
}

// createContainer initializes a struct needed to call docker.client.CreateContainer()
func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) (docker.CreateContainerOptions, error) {
var c docker.CreateContainerOptions
if task.Resources == nil {
logger.Printf("[ERR] driver.docker: task.Resources is empty")
return c, fmt.Errorf("task.Resources is nil and we can't constrain resource usage. We shouldn't have been able to schedule this in the first place.")
// Binds are used to mount a host volume into the container. We mount a
// local directory for storage and a shared alloc directory that can be
// used to share data between different tasks in the same task group.
Binds: binds,
}

hostConfig := createHostConfig(task)
logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"])
logger.Printf("[DEBUG] driver.docker: using %d cpu shares for %s", hostConfig.CPUShares, task.Config["image"])
d.logger.Printf("[DEBUG] driver.docker: using %d bytes memory for %s", hostConfig.Memory, task.Config["image"])
d.logger.Printf("[DEBUG] driver.docker: using %d cpu shares for %s", hostConfig.CPUShares, task.Config["image"])
d.logger.Printf("[DEBUG] driver.docker: binding directories %#v for %s", hostConfig.Binds, task.Config["image"])

mode, ok := task.Config["network_mode"]
if !ok || mode == "" {
// docker default
logger.Printf("[WARN] driver.docker: no mode specified for networking, defaulting to bridge")
d.logger.Printf("[WARN] driver.docker: no mode specified for networking, defaulting to bridge")
mode = "bridge"
}

// Ignore the container mode for now
switch mode {
case "default", "bridge", "none", "host":
logger.Printf("[DEBUG] driver.docker: using %s as network mode", mode)
d.logger.Printf("[DEBUG] driver.docker: using %s as network mode", mode)
default:
logger.Printf("[ERR] driver.docker: invalid setting for network mode: %s", mode)
d.logger.Printf("[ERR] driver.docker: invalid setting for network mode: %s", mode)
return c, fmt.Errorf("Invalid setting for network mode: %s", mode)
}
hostConfig.NetworkMode = mode

// Setup port mapping (equivalent to -p on docker CLI). Ports must already be
// exposed in the container.
if len(task.Resources.Networks) == 0 {
logger.Print("[WARN] driver.docker: No networks are available for port mapping")
d.logger.Print("[WARN] driver.docker: No networks are available for port mapping")
} else {
network := task.Resources.Networks[0]
dockerPorts := map[docker.Port][]docker.PortBinding{}

for _, port := range network.ListStaticPorts() {
dockerPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
dockerPorts[docker.Port(strconv.Itoa(port)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d (static)\n", network.IP, port, port)
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d (static)\n", network.IP, port, port)
}

for label, port := range network.MapDynamicPorts() {
Expand All @@ -188,24 +208,41 @@ func createContainer(ctx *ExecContext, task *structs.Task, logger *log.Logger) (
if _, err := strconv.Atoi(label); err == nil {
dockerPorts[docker.Port(label+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
dockerPorts[docker.Port(label+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %s (mapped)", network.IP, port, label)
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %s (mapped)", network.IP, port, label)
} else {
dockerPorts[docker.Port(strconv.Itoa(port)+"/tcp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
dockerPorts[docker.Port(strconv.Itoa(port)+"/udp")] = []docker.PortBinding{docker.PortBinding{HostIP: network.IP, HostPort: strconv.Itoa(port)}}
logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d for label %s\n", network.IP, port, port, label)
d.logger.Printf("[DEBUG] driver.docker: allocated port %s:%d -> %d for label %s\n", network.IP, port, port, label)
}
}
hostConfig.PortBindings = dockerPorts
}

// Create environment variables.
env := TaskEnvironmentVariables(ctx, task)
env.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))

config := &docker.Config{
Env: TaskEnvironmentVariables(ctx, task).List(),
Env: env.List(),
Image: task.Config["image"],
}

rawArgs, hasArgs := task.Config["args"]
parsedArgs, err := args.ParseAndReplace(rawArgs, env.Map())
if err != nil {
return c, err
}

// If the user specified a custom command to run, we'll inject it here.
if command, ok := task.Config["command"]; ok {
config.Cmd = strings.Split(command, " ")
cmd := []string{command}
if hasArgs {
cmd = append(cmd, parsedArgs...)
}
config.Cmd = cmd
} else if hasArgs {
d.logger.Println("[DEBUG] driver.docker: ignoring args because command not specified")
}

return docker.CreateContainerOptions{
Expand Down Expand Up @@ -285,7 +322,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
d.logger.Printf("[DEBUG] driver.docker: using image %s", dockerImage.ID)
d.logger.Printf("[INFO] driver.docker: identified image %s as %s", image, dockerImage.ID)

config, err := createContainer(ctx, task, d.logger)
config, err := d.createContainer(ctx, task)
if err != nil {
d.logger.Printf("[ERR] driver.docker: %s", err)
return nil, fmt.Errorf("Failed to create container config for image %s", image)
Expand Down
78 changes: 76 additions & 2 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package driver

import (
"fmt"
"io/ioutil"
"os/exec"
"path/filepath"
"reflect"
"testing"
"time"

"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/environment"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -102,7 +107,8 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
Name: "redis-demo",
Config: map[string]string{
"image": "redis",
"command": "redis-server -v",
"command": "redis-server",
"args": "-v",
},
Resources: &structs.Resources{
MemoryMB: 256,
Expand Down Expand Up @@ -140,6 +146,61 @@ func TestDockerDriver_Start_Wait(t *testing.T) {
}
}

func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
if !dockerLocated() {
t.SkipNow()
}

exp := []byte{'w', 'i', 'n'}
file := "output.txt"
task := &structs.Task{
Name: "redis-demo",
Config: map[string]string{
"image": "redis",
"command": "/bin/bash",
"args": fmt.Sprintf(`-c "sleep 1; echo -n %s > $%s/%s"`, string(exp), environment.AllocDir, file),
},
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
},
}

driverCtx := testDockerDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handle, err := d.Start(ctx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
if handle == nil {
t.Fatalf("missing handle")
}
defer handle.Kill()

select {
case err := <-handle.WaitCh():
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timeout")
}

// Check that data was written to the shared alloc directory.
outputFile := filepath.Join(ctx.AllocDir.SharedDir, file)
act, err := ioutil.ReadFile(outputFile)
if err != nil {
t.Fatalf("Couldn't read expected output: %v", err)
}

if !reflect.DeepEqual(act, exp) {
t.Fatalf("Command outputted %v; want %v", act, exp)
}
}

func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
if !dockerLocated() {
t.SkipNow()
Expand All @@ -149,7 +210,8 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {
Name: "redis-demo",
Config: map[string]string{
"image": "redis",
"command": "sleep 10",
"command": "/bin/sleep",
"args": "10",
},
Resources: basicResources,
}
Expand Down Expand Up @@ -188,6 +250,7 @@ func TestDockerDriver_Start_Kill_Wait(t *testing.T) {

func taskTemplate() *structs.Task {
return &structs.Task{
Name: "redis-demo",
Config: map[string]string{
"image": "redis",
},
Expand Down Expand Up @@ -242,6 +305,11 @@ func TestDocker_StartN(t *testing.T) {
t.Log("==> All tasks are started. Terminating...")

for idx, handle := range handles {
if handle == nil {
t.Errorf("Bad handle for task #%d", idx+1)
continue
}

err := handle.Kill()
if err != nil {
t.Errorf("Failed stopping task #%d: %s", idx+1, err)
Expand Down Expand Up @@ -291,6 +359,11 @@ func TestDocker_StartNVersions(t *testing.T) {
t.Log("==> All tasks are started. Terminating...")

for idx, handle := range handles {
if handle == nil {
t.Errorf("Bad handle for task #%d", idx+1)
continue
}

err := handle.Kill()
if err != nil {
t.Errorf("Failed stopping task #%d: %s", idx+1, err)
Expand All @@ -306,6 +379,7 @@ func TestDockerHostNet(t *testing.T) {
}

task := &structs.Task{
Name: "redis-demo",
Config: map[string]string{
"image": "redis",
"network_mode": "host",
Expand Down
8 changes: 8 additions & 0 deletions client/driver/environment/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ const (
// group.
AllocDir = "NOMAD_ALLOC_DIR"

// The path to the tasks local directory where it can store data that is
// persisted to the alloc is removed.
TaskLocalDir = "NOMAD_TASK_DIR"

// The tasks memory limit in MBs.
MemLimit = "NOMAD_MEMORY_LIMIT"

Expand Down Expand Up @@ -70,6 +74,10 @@ func (t TaskEnvironment) SetAllocDir(dir string) {
t[AllocDir] = dir
}

func (t TaskEnvironment) SetTaskLocalDir(dir string) {
t[TaskLocalDir] = dir
}

func (t TaskEnvironment) SetMemLimit(limit int) {
t[MemLimit] = strconv.Itoa(limit)
}
Expand Down
4 changes: 2 additions & 2 deletions client/driver/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestExecDriver_Start_Wait(t *testing.T) {
Name: "sleep",
Config: map[string]string{
"command": "/bin/sleep",
"args": "1",
"args": "2",
},
Resources: basicResources,
}
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestExecDriver_Start_Wait(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
case <-time.After(2 * time.Second):
case <-time.After(4 * time.Second):
t.Fatalf("timeout")
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/fingerprint/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"os/exec"
"runtime"
"strings"
"strings"

"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/nomad/structs"
Expand Down
1 change: 1 addition & 0 deletions scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ gox \
-arch="${XC_ARCH}" \
-osarch="!linux/arm !darwin/386" \
-ldflags "-X main.GitCommit ${GIT_COMMIT}${GIT_DIRTY}" \
-cgo \
-output "pkg/{{.OS}}_{{.Arch}}/nomad" \
.

Expand Down
3 changes: 3 additions & 0 deletions website/source/docs/drivers/docker.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ The `docker` driver supports the following configuration in the job specificatio

* `command` - (Optional) The command to run when starting the container.

* `args` - (Optional) Arguments to the optional `command`. If no `command` is
present, `args` are ignored.

* `network_mode` - (Optional) The network mode to be used for the container.
Valid options are `default`, `bridge`, `host` or `none`. If nothing is
specified, the container will start in `bridge` mode. The `container`
Expand Down
Loading

0 comments on commit 29ce6e0

Please sign in to comment.