diff --git a/api/jobs_test.go b/api/jobs_test.go index 6c4f5f66c81..78f809bca71 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -356,9 +356,10 @@ func TestJobs_Canonicalize(t *testing.T) { }, Services: []*Service{ { - Name: "global-redis-check", - Tags: []string{"global", "cache"}, - PortLabel: "db", + Name: "global-redis-check", + Tags: []string{"global", "cache"}, + PortLabel: "db", + AddressMode: "auto", Checks: []ServiceCheck{ { Name: "alive", diff --git a/api/tasks.go b/api/tasks.go index b6f89c04d6a..aac90ee4070 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -95,17 +95,23 @@ type ServiceCheck struct { // The Service model represents a Consul service definition type Service struct { - Id string - Name string - Tags []string - PortLabel string `mapstructure:"port"` - Checks []ServiceCheck + Id string + Name string + Tags []string + PortLabel string `mapstructure:"port"` + AddressMode string `mapstructure:"address_mode"` + Checks []ServiceCheck } func (s *Service) Canonicalize(t *Task, tg *TaskGroup, job *Job) { if s.Name == "" { s.Name = fmt.Sprintf("%s-%s-%s", *job.Name, *tg.Name, t.Name) } + + // Default to AddressModeAuto + if s.AddressMode == "" { + s.AddressMode = "auto" + } } // EphemeralDisk is an ephemeral disk object diff --git a/client/consul.go b/client/consul.go index 043a17bdb16..5635bc36226 100644 --- a/client/consul.go +++ b/client/consul.go @@ -2,13 +2,14 @@ package client import ( "github.com/hashicorp/nomad/client/driver" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" ) // ConsulServiceAPI is the interface the Nomad Client uses to register and // remove services and checks from Consul. type ConsulServiceAPI interface { - RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error + RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error RemoveTask(allocID string, task *structs.Task) - UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor) error + UpdateTask(allocID string, existing, newTask *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error } diff --git a/client/consul_test.go b/client/consul_test.go index 95dacb960e8..b8f282ebd80 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/hashicorp/nomad/client/driver" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/structs" ) @@ -18,9 +19,10 @@ type mockConsulOp struct { allocID string task *structs.Task exec driver.ScriptExecutor + net *cstructs.DriverNetwork } -func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor) mockConsulOp { +func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) mockConsulOp { if op != "add" && op != "remove" && op != "update" { panic(fmt.Errorf("invalid consul op: %s", op)) } @@ -29,6 +31,7 @@ func newMockConsulOp(op, allocID string, task *structs.Task, exec driver.ScriptE allocID: allocID, task: task, exec: exec, + net: net, } } @@ -52,19 +55,19 @@ func newMockConsulServiceClient() *mockConsulServiceClient { return &m } -func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor) error { +func (m *mockConsulServiceClient) UpdateTask(allocID string, old, new *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T)", allocID, old, new, exec) - m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec)) + m.logger.Printf("[TEST] mock_consul: UpdateTask(%q, %v, %v, %T, %x)", allocID, old, new, exec, net.Hash()) + m.ops = append(m.ops, newMockConsulOp("update", allocID, new, exec, net)) return nil } -func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor) error { +func (m *mockConsulServiceClient) RegisterTask(allocID string, task *structs.Task, exec driver.ScriptExecutor, net *cstructs.DriverNetwork) error { m.mu.Lock() defer m.mu.Unlock() - m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T)", allocID, task.Name, exec) - m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec)) + m.logger.Printf("[TEST] mock_consul: RegisterTask(%q, %q, %T, %x)", allocID, task.Name, exec, net.Hash()) + m.ops = append(m.ops, newMockConsulOp("add", allocID, task, exec, net)) return nil } @@ -72,5 +75,5 @@ func (m *mockConsulServiceClient) RemoveTask(allocID string, task *structs.Task) m.mu.Lock() defer m.mu.Unlock() m.logger.Printf("[TEST] mock_consul: RemoveTask(%q, %q)", allocID, task.Name) - m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil)) + m.ops = append(m.ops, newMockConsulOp("remove", allocID, task, nil, nil)) } diff --git a/client/driver/docker.go b/client/driver/docker.go index b28c1a606c8..b76a9b50db4 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -471,7 +471,7 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*Prestart return nil, err } - // Set state needed by Start() + // Set state needed by Start d.driverConfig = driverConfig // Initialize docker API clients @@ -485,15 +485,21 @@ func (d *DockerDriver) Prestart(ctx *ExecContext, task *structs.Task) (*Prestart if err != nil { return nil, err } + d.imageID = id resp := NewPrestartResponse() resp.CreatedResources.Add(dockerImageResKey, id) - resp.PortMap = d.driverConfig.PortMap - d.imageID = id + + // Return the PortMap if it's set + if len(driverConfig.PortMap) > 0 { + resp.Network = &cstructs.DriverNetwork{ + PortMap: driverConfig.PortMap, + } + } return resp, nil } -func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) { +func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (*StartResponse, error) { pluginLogFile := filepath.Join(ctx.TaskDir.Dir, "executor.out") executorConfig := &dstructs.ExecutorConfig{ @@ -560,6 +566,15 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle pluginClient.Kill() return nil, fmt.Errorf("Failed to start container %s: %s", container.ID, err) } + // InspectContainer to get all of the container metadata as + // much of the metadata (eg networking) isn't populated until + // the container is started + if container, err = client.InspectContainer(container.ID); err != nil { + err = fmt.Errorf("failed to inspect started container %s: %s", container.ID, err) + d.logger.Printf("[ERR] driver.docker: %v", err) + pluginClient.Kill() + return nil, structs.NewRecoverableError(err, true) + } d.logger.Printf("[INFO] driver.docker: started container %s", container.ID) } else { d.logger.Printf("[DEBUG] driver.docker: re-attaching to container %s with status %q", @@ -585,7 +600,58 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle } go h.collectStats() go h.run() - return h, nil + + // Detect container address + ip, autoUse := d.detectIP(container) + + // Create a response with the driver handle and container network metadata + resp := &StartResponse{ + Handle: h, + Network: &cstructs.DriverNetwork{ + PortMap: d.driverConfig.PortMap, + IP: ip, + AutoAdvertise: autoUse, + }, + } + return resp, nil +} + +// detectIP of Docker container. Returns the first IP found as well as true if +// the IP should be advertised (bridge network IPs return false). Returns an +// empty string and false if no IP could be found. +func (d *DockerDriver) detectIP(c *docker.Container) (string, bool) { + if c.NetworkSettings == nil { + // This should only happen if there's been a coding error (such + // as not calling InspetContainer after CreateContainer). Code + // defensively in case the Docker API changes subtly. + d.logger.Printf("[ERROR] driver.docker: no network settings for container %s", c.ID) + return "", false + } + + ip, ipName := "", "" + auto := false + for name, net := range c.NetworkSettings.Networks { + if net.IPAddress == "" { + // Ignore networks without an IP address + continue + } + + ip = net.IPAddress + ipName = name + + // Don't auto-advertise bridge IPs + if name != "bridge" { + auto = true + } + + break + } + + if n := len(c.NetworkSettings.Networks); n > 1 { + d.logger.Printf("[WARN] driver.docker: multiple (%d) Docker networks for container %q but Nomad only supports 1: choosing %q", n, c.ID, ipName) + } + + return ip, auto } func (d *DockerDriver) Cleanup(_ *ExecContext, res *CreatedResources) error { diff --git a/client/driver/docker_test.go b/client/driver/docker_test.go index a6cb5e4eb4e..404b9513797 100644 --- a/client/driver/docker_test.go +++ b/client/driver/docker_test.go @@ -8,6 +8,7 @@ import ( "path/filepath" "reflect" "runtime/debug" + "sort" "strconv" "strings" "testing" @@ -108,34 +109,38 @@ func dockerSetupWithClient(t *testing.T, task *structs.Task, client *docker.Clie driver := NewDockerDriver(tctx.DriverCtx) copyImage(t, tctx.ExecCtx.TaskDir, "busybox.tar") - resp, err := driver.Prestart(tctx.ExecCtx, task) + presp, err := driver.Prestart(tctx.ExecCtx, task) if err != nil { + driver.Cleanup(tctx.ExecCtx, presp.CreatedResources) tctx.AllocDir.Destroy() t.Fatalf("error in prestart: %v", err) } + // Update the exec ctx with the driver network env vars + tctx.ExecCtx.TaskEnv = tctx.EnvBuilder.SetDriverNetwork(presp.Network).Build() - // At runtime this is handled by TaskRunner - tctx.EnvBuilder.SetPortMap(resp.PortMap) - tctx.ExecCtx.TaskEnv = tctx.EnvBuilder.Build() - - handle, err := driver.Start(tctx.ExecCtx, task) + sresp, err := driver.Start(tctx.ExecCtx, task) if err != nil { + driver.Cleanup(tctx.ExecCtx, presp.CreatedResources) tctx.AllocDir.Destroy() t.Fatalf("Failed to start driver: %s\nStack\n%s", err, debug.Stack()) } - if handle == nil { + if sresp.Handle == nil { + driver.Cleanup(tctx.ExecCtx, presp.CreatedResources) tctx.AllocDir.Destroy() t.Fatalf("handle is nil\nStack\n%s", debug.Stack()) } + // At runtime this is handled by TaskRunner + tctx.ExecCtx.TaskEnv = tctx.EnvBuilder.SetDriverNetwork(sresp.Network).Build() + cleanup := func() { - driver.Cleanup(tctx.ExecCtx, resp.CreatedResources) - handle.Kill() + driver.Cleanup(tctx.ExecCtx, presp.CreatedResources) + sresp.Handle.Kill() tctx.AllocDir.Destroy() } - return client, handle, cleanup + return client, sresp.Handle, cleanup } func newTestDockerClient(t *testing.T) *docker.Client { @@ -204,21 +209,21 @@ func TestDockerDriver_StartOpen_Wait(t *testing.T) { t.Fatalf("error in prestart: %v", err) } - handle, err := d.Start(ctx.ExecCtx, task) + resp, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) } - if handle == nil { + if resp.Handle == nil { t.Fatalf("missing handle") } - defer handle.Kill() + defer resp.Handle.Kill() // Attempt to open - handle2, err := d.Open(ctx.ExecCtx, handle.ID()) + resp2, err := d.Open(ctx.ExecCtx, resp.Handle.ID()) if err != nil { t.Fatalf("err: %v", err) } - if handle2 == nil { + if resp2 == nil { t.Fatalf("missing handle") } } @@ -299,17 +304,14 @@ func TestDockerDriver_Start_LoadImage(t *testing.T) { if err != nil { t.Fatalf("error in prestart: %v", err) } - handle, err := d.Start(ctx.ExecCtx, task) + resp, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) } - if handle == nil { - t.Fatalf("missing handle") - } - defer handle.Kill() + defer resp.Handle.Kill() select { - case res := <-handle.WaitCh(): + case res := <-resp.Handle.WaitCh(): if !res.Successful() { t.Fatalf("err: %v", res) } @@ -415,17 +417,14 @@ func TestDockerDriver_Start_Wait_AllocDir(t *testing.T) { if err != nil { t.Fatalf("error in prestart: %v", err) } - handle, err := d.Start(ctx.ExecCtx, task) + resp, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) } - if handle == nil { - t.Fatalf("missing handle") - } - defer handle.Kill() + defer resp.Handle.Kill() select { - case res := <-handle.WaitCh(): + case res := <-resp.Handle.WaitCh(): if !res.Successful() { t.Fatalf("err: %v", res) } @@ -509,10 +508,12 @@ func TestDockerDriver_StartN(t *testing.T) { if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } - handles[idx], err = d.Start(ctx.ExecCtx, task) + resp, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) + continue } + handles[idx] = resp.Handle } t.Log("All tasks are started. Terminating...") @@ -569,10 +570,12 @@ func TestDockerDriver_StartNVersions(t *testing.T) { if err != nil { t.Fatalf("error in prestart #%d: %v", idx+1, err) } - handles[idx], err = d.Start(ctx.ExecCtx, task) + resp, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Errorf("Failed starting task #%d: %s", idx+1, err) + continue } + handles[idx] = resp.Handle } t.Log("All tasks are started. Terminating...") @@ -912,11 +915,12 @@ func TestDockerDriver_PortsMapping(t *testing.T) { } expectedEnvironment := map[string]string{ - "NOMAD_ADDR_main": "127.0.0.1:8080", - "NOMAD_ADDR_REDIS": "127.0.0.1:6379", + "NOMAD_PORT_main": "8080", + "NOMAD_PORT_REDIS": "6379", "NOMAD_HOST_PORT_main": strconv.Itoa(docker_reserved), } + sort.Strings(container.Config.Env) for key, val := range expectedEnvironment { search := fmt.Sprintf("%s=%s", key, val) if !inSlice(search, container.Config.Env) { @@ -963,9 +967,9 @@ func TestDockerDriver_User(t *testing.T) { // It should fail because the user "alice" does not exist on the given // image. - handle, err := driver.Start(ctx.ExecCtx, task) + resp, err := driver.Start(ctx.ExecCtx, task) if err == nil { - handle.Kill() + resp.Handle.Kill() t.Fatalf("Should've failed") } @@ -1164,14 +1168,14 @@ func TestDockerDriver_VolumesDisabled(t *testing.T) { if err != nil { t.Fatalf("error in prestart: %v", err) } - handle, err := driver.Start(execCtx, task) + resp, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("err: %v", err) } - defer handle.Kill() + defer resp.Handle.Kill() select { - case res := <-handle.WaitCh(): + case res := <-resp.Handle.WaitCh(): if !res.Successful() { t.Fatalf("unexpected err: %v", res) } @@ -1215,14 +1219,14 @@ func TestDockerDriver_VolumesEnabled(t *testing.T) { if err != nil { t.Fatalf("error in prestart: %v", err) } - handle, err := driver.Start(execCtx, task) + resp, err := driver.Start(execCtx, task) if err != nil { t.Fatalf("Failed to start docker driver: %v", err) } - defer handle.Kill() + defer resp.Handle.Kill() select { - case res := <-handle.WaitCh(): + case res := <-resp.Handle.WaitCh(): if !res.Successful() { t.Fatalf("unexpected err: %v", res) } diff --git a/client/driver/docker_unix_test.go b/client/driver/docker_unix_test.go index ca9f5b1de21..4d6574f0e4e 100644 --- a/client/driver/docker_unix_test.go +++ b/client/driver/docker_unix_test.go @@ -66,24 +66,21 @@ done if err != nil { t.Fatalf("error in prestart: %v", err) } - handle, err := d.Start(ctx.ExecCtx, task) + resp, err := d.Start(ctx.ExecCtx, task) if err != nil { t.Fatalf("err: %v", err) } - if handle == nil { - t.Fatalf("missing handle") - } - defer handle.Kill() + defer resp.Handle.Kill() - waitForExist(t, handle.(*DockerHandle).client, handle.(*DockerHandle)) + waitForExist(t, resp.Handle.(*DockerHandle).client, resp.Handle.(*DockerHandle)) time.Sleep(1 * time.Second) - if err := handle.Signal(syscall.SIGUSR1); err != nil { + if err := resp.Handle.Signal(syscall.SIGUSR1); err != nil { t.Fatalf("Signal returned an error: %v", err) } select { - case res := <-handle.WaitCh(): + case res := <-resp.Handle.WaitCh(): if res.Successful() { t.Fatalf("should err: %v", res) } diff --git a/client/driver/driver.go b/client/driver/driver.go index a04a798d22d..5a762fb94d4 100644 --- a/client/driver/driver.go +++ b/client/driver/driver.go @@ -58,9 +58,13 @@ type PrestartResponse struct { // CreatedResources by the driver. CreatedResources *CreatedResources - // PortMap can be set by drivers to replace ports in environment - // variables with driver-specific mappings. - PortMap map[string]int + // Network contains driver-specific network parameters such as the port + // map between the host and a container. + // + // Since the network configuration may not be fully populated by + // Prestart, it will only be used for creating an environment for + // Start. It will be overridden by the DriverNetwork returned by Start. + Network *cstructs.DriverNetwork } // NewPrestartResponse creates a new PrestartResponse with CreatedResources @@ -183,6 +187,20 @@ func (r *CreatedResources) Hash() []byte { return h.Sum(nil) } +// StartResponse is returned by Driver.Start. +type StartResponse struct { + // Handle to the driver's task executor for controlling the lifecycle + // of the task. + Handle DriverHandle + + // Network contains driver-specific network parameters such as the port + // map between the host and a container. + // + // Network may be nil as not all drivers or configurations create + // networks. + Network *cstructs.DriverNetwork +} + // Driver is used for execution of tasks. This allows Nomad // to support many pluggable implementations of task drivers. // Examples could include LXC, Docker, Qemu, etc. @@ -196,8 +214,11 @@ type Driver interface { // CreatedResources may be non-nil even when an error occurs. Prestart(*ExecContext, *structs.Task) (*PrestartResponse, error) - // Start is used to being task execution - Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) + // Start is used to begin task execution. If error is nil, + // StartResponse.Handle will be the handle to the task's executor. + // StartResponse.Network may be nil if the task doesn't configure a + // network. + Start(ctx *ExecContext, task *structs.Task) (*StartResponse, error) // Open is used to re-open a handle to a task Open(ctx *ExecContext, handleID string) (DriverHandle, error) @@ -208,7 +229,7 @@ type Driver interface { // // If Cleanup returns a recoverable error it may be retried. On retry // it will be passed the same CreatedResources, so all successfully - // cleaned up resources should be removed. + // cleaned up resources should be removed or handled idempotently. Cleanup(*ExecContext, *CreatedResources) error // Drivers must validate their configuration diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 0abc4fd35a7..481bd88535c 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -8,6 +8,7 @@ import ( "strings" "sync" + cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" hargs "github.com/hashicorp/nomad/helper/args" "github.com/hashicorp/nomad/nomad/structs" @@ -58,16 +59,21 @@ const ( // AddrPrefix is the prefix for passing both dynamic and static port // allocations to tasks. // E.g $NOMAD_ADDR_http=127.0.0.1:80 + // + // The ip:port are always the host's. AddrPrefix = "NOMAD_ADDR_" - // IpPrefix is the prefix for passing the IP of a port allocation to a task. + // IpPrefix is the prefix for passing the host IP of a port allocation + // to a task. IpPrefix = "NOMAD_IP_" // PortPrefix is the prefix for passing the port allocation to a task. + // It will be the task's port if a port map is specified. Task's should + // bind to this port. PortPrefix = "NOMAD_PORT_" - // HostPortPrefix is the prefix for passing the host port when a portmap is - // specified. + // HostPortPrefix is the prefix for passing the host port when a port + // map is specified. HostPortPrefix = "NOMAD_HOST_PORT_" // MetaPrefix is the prefix for passing task meta data. @@ -202,7 +208,6 @@ type Builder struct { region string allocId string allocName string - portMap map[string]string vaultToken string injectVaultToken bool jobName string @@ -210,9 +215,13 @@ type Builder struct { // otherPorts for tasks in the same alloc otherPorts map[string]string + // driverNetwork is the network defined by the driver (or nil if none + // was defined). + driverNetwork *cstructs.DriverNetwork + // network resources from the task; must be lazily turned into env vars - // because portMaps can change after builder creation and affect - // network env vars. + // because portMaps and advertiseIP can change after builder creation + // and affect network env vars. networks []*structs.NetworkResource mu *sync.RWMutex @@ -287,21 +296,8 @@ func (b *Builder) Build() *TaskEnv { nodeAttrs[nodeRegionKey] = b.region } - // Build the addrs for this task - for _, network := range b.networks { - for label, intVal := range network.MapLabelToValues(nil) { - value := strconv.Itoa(intVal) - envMap[fmt.Sprintf("%s%s", IpPrefix, label)] = network.IP - envMap[fmt.Sprintf("%s%s", HostPortPrefix, label)] = value - if forwardedPort, ok := b.portMap[label]; ok { - value = forwardedPort - } - envMap[fmt.Sprintf("%s%s", PortPrefix, label)] = value - ipPort := net.JoinHostPort(network.IP, value) - envMap[fmt.Sprintf("%s%s", AddrPrefix, label)] = ipPort - - } - } + // Build the network related env vars + buildNetworkEnv(envMap, b.networks, b.driverNetwork) // Build the addr of the other tasks for k, v := range b.otherPorts { @@ -455,17 +451,51 @@ func (b *Builder) SetSecretsDir(dir string) *Builder { return b } -func (b *Builder) SetPortMap(portMap map[string]int) *Builder { - newPortMap := make(map[string]string, len(portMap)) - for k, v := range portMap { - newPortMap[k] = strconv.Itoa(v) - } +// SetDriverNetwork defined by the driver. +func (b *Builder) SetDriverNetwork(n *cstructs.DriverNetwork) *Builder { + ncopy := n.Copy() b.mu.Lock() - b.portMap = newPortMap + b.driverNetwork = ncopy b.mu.Unlock() return b } +// buildNetworkEnv env vars in the given map. +// +// Auto: NOMAD_PORT_