Skip to content

Commit

Permalink
Consul with CNI and host_network addresses (#9095)
Browse files Browse the repository at this point in the history
* consul: advertise cni and multi host interface addresses

* structs: add service/check address_mode validation

* ar/groupservices: fetch networkstatus at hook runtime

* ar/groupservice: nil check network status getter before calling

* consul: comment network status can be nil
  • Loading branch information
nickethier authored Oct 15, 2020
1 parent 5f56b58 commit 7b50685
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ IMPROVEMENTS:
* client: Updated consul-template to v0.25.0 - config `function_blacklist` deprecated and replaced with `function_denylist` [[GH-8988](https://github.com/hashicorp/nomad/pull/8988)]
* config: Deprecated terms `blacklist` and `whitelist` from configuration and replaced them with `denylist` and `allowlist`. [[GH-9019](https://github.com/hashicorp/nomad/issues/9019)]
* consul: Support Consul namespace (Consul Enterprise) in client configuration. [[GH-8849](https://github.com/hashicorp/nomad/pull/8849)]
* consul: Support advertising CNI and multi-host network addresses to consul [[GH-8801](https://github.com/hashicorp/nomad/issues/8801)]
* consul/connect: Dynamically select envoy sidecar at runtime [[GH-8945](https://github.com/hashicorp/nomad/pull/8945)]
* csi: Relaxed validation requirements when checking volume capabilities with controller plugins, to accommodate existing plugin behaviors. [[GH-9049](https://github.com/hashicorp/nomad/issues/9049)]
* driver/docker: Upgrade pause container and detect architecture [[GH-8957](https://github.com/hashicorp/nomad/pull/8957)]
Expand Down
6 changes: 6 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ func (ar *allocRunner) SetNetworkStatus(s *structs.AllocNetworkStatus) {
ar.state.NetworkStatus = s.Copy()
}

func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
ar.stateLock.Lock()
defer ar.stateLock.Unlock()
return ar.state.NetworkStatus.Copy()
}

// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
Expand Down
11 changes: 6 additions & 5 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: ar.consulClient,
restarter: ar,
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
logger: hookLogger,
alloc: alloc,
consul: ar.consulClient,
restarter: ar,
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
networkStatusGetter: ar,
logger: hookLogger,
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
Expand Down
53 changes: 35 additions & 18 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,29 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)

type networkStatusGetter interface {
NetworkStatus() *structs.AllocNetworkStatus
}

// groupServiceHook manages task group Consul service registration and
// deregistration.
type groupServiceHook struct {
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool
networkStatusGetter networkStatusGetter

logger log.Logger

// The following fields may be updated
canary bool
services []*structs.Service
networks structs.Networks
ports structs.AllocatedPorts
taskEnvBuilder *taskenv.Builder

// Since Update() may be called concurrently with any other hook all
Expand All @@ -38,11 +44,12 @@ type groupServiceHook struct {
}

type groupServiceHookConfig struct {
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
logger log.Logger
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatusGetter networkStatusGetter
logger log.Logger
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
Expand All @@ -54,18 +61,20 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
}

h := &groupServiceHook{
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatusGetter: cfg.networkStatusGetter,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services

if cfg.alloc.AllocatedResources != nil {
h.networks = cfg.alloc.AllocatedResources.Shared.Networks
h.ports = cfg.alloc.AllocatedResources.Shared.Ports
}

if cfg.alloc.DeploymentStatus != nil {
Expand Down Expand Up @@ -109,6 +118,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
networks = req.Alloc.AllocatedResources.Shared.Networks
h.ports = req.Alloc.AllocatedResources.Shared.Ports
}

tg := req.Alloc.Job.LookupTaskGroup(h.group)
Expand Down Expand Up @@ -200,6 +210,11 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)

var netStatus *structs.AllocNetworkStatus
if h.networkStatusGetter != nil {
netStatus = h.networkStatusGetter.NetworkStatus()
}

// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Expand All @@ -208,6 +223,8 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
Services: interpolatedServices,
DriverNetwork: h.driverNet(),
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,
Canary: h.canary,
}
}
4 changes: 4 additions & 0 deletions client/allocrunner/taskrunner/service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type serviceHook struct {
canary bool
services []*structs.Service
networks structs.Networks
ports structs.AllocatedPorts
taskEnv *taskenv.TaskEnv

// initialRegistrations tracks if Poststart has completed, initializing
Expand All @@ -62,6 +63,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
taskName: c.task.Name,
services: c.task.Services,
restarter: c.restarter,
ports: c.alloc.AllocatedResources.Shared.Ports,
}

if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
Expand Down Expand Up @@ -141,6 +143,7 @@ func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error
h.services = task.Services
h.networks = networks
h.canary = canary
h.ports = req.Alloc.AllocatedResources.Shared.Ports

return nil
}
Expand Down Expand Up @@ -195,5 +198,6 @@ func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
Ports: h.ports,
}
}
53 changes: 45 additions & 8 deletions command/agent/consul/service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
}

// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork)
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
Expand Down Expand Up @@ -934,7 +934,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
addrMode = structs.AddressModeHost
}

ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork)
ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
Expand Down Expand Up @@ -1448,15 +1448,15 @@ func getNomadSidecar(id string, services map[string]*api.AgentService) *api.Agen
// getAddress returns the IP and port to use for a service or check. If no port
// label is specified (an empty value), zero values are returned because no
// address could be resolved.
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork) (string, int, error) {
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork, ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) {
switch addrMode {
case structs.AddressModeAuto:
if driverNet.Advertise() {
addrMode = structs.AddressModeDriver
} else {
addrMode = structs.AddressModeHost
}
return getAddress(addrMode, portLabel, networks, driverNet)
return getAddress(addrMode, portLabel, networks, driverNet, ports, netStatus)
case structs.AddressModeHost:
if portLabel == "" {
if len(networks) != 1 {
Expand All @@ -1471,11 +1471,17 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet
}

// Default path: use host ip:port
ip, port := networks.Port(portLabel)
if ip == "" && port <= 0 {
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
// Try finding port in the AllocatedPorts struct first
// Check in Networks struct for backwards compatibility if not found
mapping, ok := ports.Get(portLabel)
if !ok {
ip, port := networks.Port(portLabel)
if ip == "" && port <= 0 {
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
}
return ip, port, nil
}
return ip, port, nil
return mapping.HostIP, mapping.Value, nil

case structs.AddressModeDriver:
// Require a driver network if driver address mode is used
Expand All @@ -1489,6 +1495,11 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet
}

// If the port is a label, use the driver's port (not the host's)
if port, ok := ports.Get(portLabel); ok {
return driverNet.IP, port.To, nil
}

// Check if old style driver portmap is used
if port, ok := driverNet.PortMap[portLabel]; ok {
return driverNet.IP, port, nil
}
Expand All @@ -1507,6 +1518,32 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet

return driverNet.IP, port, nil

case "alloc":
if netStatus == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`)
}

// If no port label is specified just return the IP
if portLabel == "" {
return netStatus.Address, 0, nil
}

// If port is a label and is found then return it
if port, ok := ports.Get(portLabel); ok {
return netStatus.Address, port.Value, nil
}

// Check if port is a literal number
port, err := strconv.Atoi(portLabel)
if err != nil {
// User likely specified wrong port label here
return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return netStatus.Address, port, nil

default:
// Shouldn't happen due to validation, but enforce invariants
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
Expand Down
8 changes: 8 additions & 0 deletions command/agent/consul/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ type WorkloadServices struct {
Services []*structs.Service

// Networks from the task's resources stanza.
// TODO: remove and use Ports
Networks structs.Networks

// NetworkStatus from alloc if network namespace is created
// Can be nil
NetworkStatus *structs.AllocNetworkStatus

// AllocatedPorts is the list of port mappings
Ports structs.AllocatedPorts

// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook
Expand Down
Loading

0 comments on commit 7b50685

Please sign in to comment.