Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interpolate nomad variables in task environment variables #653

Merged
merged 4 commits into from
Jan 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func (c *Client) setupDrivers() error {

var avail []string
var skipped []string
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger)
driverCtx := driver.NewDriverContext("", c.config, c.config.Node, c.logger, nil)
for name := range driver.BuiltinDrivers {
// Skip fingerprinting drivers that are not in the whitelist if it is
// enabled.
Expand Down
20 changes: 11 additions & 9 deletions client/driver/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/helper/args"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/mapstructure"
)
Expand Down Expand Up @@ -188,9 +187,12 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
}

// Create environment variables.
env := TaskEnvironmentVariables(ctx, task)
env.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
env.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))
taskEnv, err := GetTaskEnv(ctx.AllocDir, d.node, task)
if err != nil {
return c, err
}
taskEnv.SetAllocDir(filepath.Join("/", allocdir.SharedAllocName))
taskEnv.SetTaskLocalDir(filepath.Join("/", allocdir.TaskLocal))

config := &docker.Config{
Image: driverConfig.ImageName,
Expand Down Expand Up @@ -343,20 +345,20 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
d.logger.Printf("[DEBUG] driver.docker: exposed port %s", containerPort)
}

// This was set above in a call to TaskEnvironmentVariables but if we
// This was set above in a call to GetTaskEnv but if we
// have mapped any ports we will need to override them.
//
// TODO refactor the implementation in TaskEnvironmentVariables to match
// TODO refactor the implementation in GetTaskEnv to match
// the 0.2 ports world view. Docker seems to be the only place where
// this is actually needed, but this is kinda hacky.
if len(driverConfig.PortMap) > 0 {
env.SetPorts(network.MapLabelToValues(driverConfig.PortMap))
taskEnv.SetPorts(network.MapLabelToValues(driverConfig.PortMap))
}
hostConfig.PortBindings = publishedPorts
config.ExposedPorts = exposedPorts
}

parsedArgs := args.ParseAndReplace(driverConfig.Args, env.Map())
parsedArgs := taskEnv.ParseAndReplace(driverConfig.Args)

// If the user specified a custom command to run as their entrypoint, we'll
// inject it here.
Expand All @@ -376,7 +378,7 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task, dri
d.logger.Printf("[DEBUG] driver.docker: applied labels on the container: %+v", config.Labels)
}

config.Env = env.List()
config.Env = taskEnv.EnvList()

containerName := fmt.Sprintf("%s-%s", task.Name, ctx.AllocID)
d.logger.Printf("[DEBUG] driver.docker: setting container name to: %s", containerName)
Expand Down
56 changes: 23 additions & 33 deletions client/driver/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,11 @@ import (

docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/environment"
"github.com/hashicorp/nomad/client/driver/env"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/nomad/structs"
)

func testDockerDriverContext(task string) *DriverContext {
cfg := testConfig()
cfg.DevMode = true
return NewDriverContext(task, cfg, cfg.Node, testLogger())
}

// dockerIsConnected checks to see if a docker daemon is available (local or remote)
func dockerIsConnected(t *testing.T) bool {
client, err := docker.NewClientFromEnv()
Expand Down Expand Up @@ -96,23 +90,22 @@ func dockerSetup(t *testing.T, task *structs.Task) (*docker.Client, DriverHandle
t.Fatalf("Failed to initialize client: %s\nStack\n%s", err, debug.Stack())
}

driverCtx := testDockerDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
driverCtx, execCtx := testDriverContexts(task)
driver := NewDockerDriver(driverCtx)

handle, err := driver.Start(ctx, task)
handle, err := driver.Start(execCtx, task)
if err != nil {
ctx.AllocDir.Destroy()
execCtx.AllocDir.Destroy()
t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack())
}
if handle == nil {
ctx.AllocDir.Destroy()
execCtx.AllocDir.Destroy()
t.Fatalf("handle is nil\nStack\n%s", debug.Stack())
}

cleanup := func() {
handle.Kill()
ctx.AllocDir.Destroy()
execCtx.AllocDir.Destroy()
}

return client, handle, cleanup
Expand All @@ -138,7 +131,8 @@ func TestDockerDriver_Handle(t *testing.T) {
// This test should always pass, even if docker daemon is not available
func TestDockerDriver_Fingerprint(t *testing.T) {
t.Parallel()
d := NewDockerDriver(testDockerDriverContext(""))
driverCtx, _ := testDriverContexts(&structs.Task{Name: "foo"})
d := NewDockerDriver(driverCtx)
node := &structs.Node{
Attributes: make(map[string]string),
}
Expand Down Expand Up @@ -169,12 +163,11 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
Resources: basicResources,
}

driverCtx := testDockerDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handle, err := d.Start(ctx, task)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -184,7 +177,7 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) {
defer handle.Kill()

// Attempt to open
handle2, err := d.Open(ctx, handle.ID())
handle2, err := d.Open(execCtx, handle.ID())
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down Expand Up @@ -246,7 +239,7 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
"args": []string{
"-c",
fmt.Sprintf(`sleep 1; echo -n %s > $%s/%s`,
string(exp), environment.AllocDir, file),
string(exp), env.AllocDir, file),
},
},
Resources: &structs.Resources{
Expand All @@ -255,12 +248,11 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
},
}

driverCtx := testDockerDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handle, err := d.Start(ctx, task)
handle, err := d.Start(execCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand All @@ -279,7 +271,7 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) {
}

// Check that data was written to the shared alloc directory.
outputFile := filepath.Join(ctx.AllocDir.SharedDir, file)
outputFile := filepath.Join(execCtx.AllocDir.SharedDir, file)
act, err := ioutil.ReadFile(outputFile)
if err != nil {
t.Fatalf("Couldn't read expected output: %v", err)
Expand Down Expand Up @@ -350,12 +342,11 @@ func TestDocker_StartN(t *testing.T) {
// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
driverCtx := testDockerDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handles[idx], err = d.Start(ctx, task)
handles[idx], err = d.Start(execCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
}
Expand Down Expand Up @@ -408,12 +399,11 @@ func TestDocker_StartNVersions(t *testing.T) {
// Let's spin up a bunch of things
var err error
for idx, task := range taskList {
driverCtx := testDockerDriverContext(task.Name)
ctx := testDriverExecContext(task, driverCtx)
defer ctx.AllocDir.Destroy()
driverCtx, execCtx := testDriverContexts(task)
defer execCtx.AllocDir.Destroy()
d := NewDockerDriver(driverCtx)

handles[idx], err = d.Start(ctx, task)
handles[idx], err = d.Start(execCtx, task)
if err != nil {
t.Errorf("Failed starting task #%d: %s", idx+1, err)
}
Expand Down
32 changes: 16 additions & 16 deletions client/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver/environment"
"github.com/hashicorp/nomad/client/driver/env"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/nomad/structs"

Expand Down Expand Up @@ -66,18 +66,21 @@ type DriverContext struct {
config *config.Config
logger *log.Logger
node *structs.Node
taskEnv *env.TaskEnvironment
}

// NewDriverContext initializes a new DriverContext with the specified fields.
// This enables other packages to create DriverContexts but keeps the fields
// private to the driver. If we want to change this later we can gorename all of
// the fields in DriverContext.
func NewDriverContext(taskName string, config *config.Config, node *structs.Node, logger *log.Logger) *DriverContext {
func NewDriverContext(taskName string, config *config.Config, node *structs.Node,
logger *log.Logger, taskEnv *env.TaskEnvironment) *DriverContext {
return &DriverContext{
taskName: taskName,
config: config,
node: node,
logger: logger,
taskEnv: taskEnv,
}
}

Expand Down Expand Up @@ -125,17 +128,18 @@ func NewExecContext(alloc *allocdir.AllocDir, allocID string) *ExecContext {
return &ExecContext{AllocDir: alloc, AllocID: allocID}
}

// TaskEnvironmentVariables converts exec context and task configuration into a
// GetTaskEnv converts the alloc dir, the node and task configuration into a
// TaskEnvironment.
func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.TaskEnvironment {
env := environment.NewTaskEnivornment()
env.SetMeta(task.Meta)

if ctx.AllocDir != nil {
env.SetAllocDir(ctx.AllocDir.SharedDir)
taskdir, ok := ctx.AllocDir.TaskDirs[task.Name]
func GetTaskEnv(alloc *allocdir.AllocDir, node *structs.Node, task *structs.Task) (*env.TaskEnvironment, error) {
env := env.NewTaskEnvironment(node).
SetMeta(task.Meta).
SetEnvvars(task.Env)

if alloc != nil {
env.SetAllocDir(alloc.SharedDir)
taskdir, ok := alloc.TaskDirs[task.Name]
if !ok {
// TODO: Update this to return an error
return nil, fmt.Errorf("failed to get task directory for task %q", task.Name)
}

env.SetTaskLocalDir(filepath.Join(taskdir, allocdir.TaskLocal))
Expand All @@ -152,11 +156,7 @@ func TaskEnvironmentVariables(ctx *ExecContext, task *structs.Task) environment.
}
}

if task.Env != nil {
env.SetEnvvars(task.Env)
}

return env
return env.Build(), nil
}

func mapMergeStrInt(maps ...map[string]int) map[string]int {
Expand Down
37 changes: 21 additions & 16 deletions client/driver/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,26 @@ func testConfig() *config.Config {
return conf
}

func testDriverContext(task string) *DriverContext {
func testDriverContexts(task *structs.Task) (*DriverContext, *ExecContext) {
cfg := testConfig()
return NewDriverContext(task, cfg, cfg.Node, testLogger())
}

func testDriverExecContext(task *structs.Task, driverCtx *DriverContext) *ExecContext {
allocDir := allocdir.NewAllocDir(filepath.Join(driverCtx.config.AllocDir, structs.GenerateUUID()))
allocDir := allocdir.NewAllocDir(filepath.Join(cfg.AllocDir, structs.GenerateUUID()))
allocDir.Build([]*structs.Task{task})
ctx := NewExecContext(allocDir, fmt.Sprintf("alloc-id-%d", int(rand.Int31())))
return ctx
execCtx := NewExecContext(allocDir, fmt.Sprintf("alloc-id-%d", int(rand.Int31())))

taskEnv, err := GetTaskEnv(allocDir, cfg.Node, task)
if err != nil {
return nil, nil
}

driverCtx := NewDriverContext(task.Name, cfg, cfg.Node, testLogger(), taskEnv)
return driverCtx, execCtx
}

func TestDriver_KillTimeout(t *testing.T) {
ctx := testDriverContext("foo")
ctx.config.MaxKillTimeout = 10 * time.Second
expected := 1 * time.Second
task := &structs.Task{KillTimeout: expected}
task := &structs.Task{Name: "foo", KillTimeout: expected}
ctx, _ := testDriverContexts(task)
ctx.config.MaxKillTimeout = 10 * time.Second

if actual := ctx.KillTimeout(task); expected != actual {
t.Fatalf("KillTimeout(%v) returned %v; want %v", task, actual, expected)
Expand All @@ -79,9 +82,8 @@ func TestDriver_KillTimeout(t *testing.T) {
}
}

func TestDriver_TaskEnvironmentVariables(t *testing.T) {
func TestDriver_GetTaskEnv(t *testing.T) {
t.Parallel()
ctx := &ExecContext{}
task := &structs.Task{
Env: map[string]string{
"HELLO": "world",
Expand All @@ -104,7 +106,10 @@ func TestDriver_TaskEnvironmentVariables(t *testing.T) {
},
}

env := TaskEnvironmentVariables(ctx, task)
env, err := GetTaskEnv(nil, nil, task)
if err != nil {
t.Fatalf("GetTaskEnv() failed: %v", err)
}
exp := map[string]string{
"NOMAD_CPU_LIMIT": "1000",
"NOMAD_MEMORY_LIMIT": "500",
Expand All @@ -121,9 +126,9 @@ func TestDriver_TaskEnvironmentVariables(t *testing.T) {
"lorem": "ipsum",
}

act := env.Map()
act := env.EnvMap()
if !reflect.DeepEqual(act, exp) {
t.Fatalf("TaskEnvironmentVariables(%#v, %#v) returned %#v; want %#v", ctx, task, act, exp)
t.Fatalf("GetTaskEnv() returned %#v; want %#v", act, exp)
}
}

Expand Down
Loading