Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consul with CNI and host_network addresses #9095

Merged
merged 6 commits into from
Oct 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ IMPROVEMENTS:
* client: Updated consul-template to v0.25.0 - config `function_blacklist` deprecated and replaced with `function_denylist` [[GH-8988](https://github.com/hashicorp/nomad/pull/8988)]
* config: Deprecated terms `blacklist` and `whitelist` from configuration and replaced them with `denylist` and `allowlist`. [[GH-9019](https://github.com/hashicorp/nomad/issues/9019)]
* consul: Support Consul namespace (Consul Enterprise) in client configuration. [[GH-8849](https://github.com/hashicorp/nomad/pull/8849)]
* consul: Support advertising CNI and multi-host network addresses to Consul [[GH-8801](https://github.com/hashicorp/nomad/issues/8801)]
* consul/connect: Dynamically select envoy sidecar at runtime [[GH-8945](https://github.com/hashicorp/nomad/pull/8945)]
* csi: Relaxed validation requirements when checking volume capabilities with controller plugins, to accommodate existing plugin behaviors. [[GH-9049](https://github.com/hashicorp/nomad/issues/9049)]
* driver/docker: Upgrade pause container and detect architecture [[GH-8957](https://github.com/hashicorp/nomad/pull/8957)]
Expand Down
6 changes: 6 additions & 0 deletions client/allocrunner/alloc_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,12 @@ func (ar *allocRunner) SetNetworkStatus(s *structs.AllocNetworkStatus) {
ar.state.NetworkStatus = s.Copy()
}

// NetworkStatus returns a copy of the network status currently stored in the
// alloc runner's state. stateLock is held while the copy is made.
func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
	ar.stateLock.Lock()
	defer ar.stateLock.Unlock()

	status := ar.state.NetworkStatus.Copy()
	return status
}

// AllocState returns a copy of allocation state including a snapshot of task
// states.
func (ar *allocRunner) AllocState() *state.State {
Expand Down
11 changes: 6 additions & 5 deletions client/allocrunner/alloc_runner_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,12 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error {
newAllocHealthWatcherHook(hookLogger, alloc, hs, ar.Listener(), ar.consulClient),
newNetworkHook(hookLogger, ns, alloc, nm, nc, ar),
newGroupServiceHook(groupServiceHookConfig{
alloc: alloc,
consul: ar.consulClient,
restarter: ar,
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
logger: hookLogger,
alloc: alloc,
consul: ar.consulClient,
restarter: ar,
taskEnvBuilder: taskenv.NewBuilder(config.Node, ar.Alloc(), nil, config.Region).SetAllocDir(ar.allocDir.AllocDir),
networkStatusGetter: ar,
logger: hookLogger,
}),
newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig),
Expand Down
53 changes: 35 additions & 18 deletions client/allocrunner/groupservice_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,29 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
)

// networkStatusGetter is implemented by anything that can report an
// allocation's network status. The group service hook calls it when building
// workload services; the returned status may be nil.
type networkStatusGetter interface {
NetworkStatus() *structs.AllocNetworkStatus
}

// groupServiceHook manages task group Consul service registration and
// deregistration.
type groupServiceHook struct {
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool
allocID string
group string
restarter agentconsul.WorkloadRestarter
consulClient consul.ConsulServiceAPI
prerun bool
delay time.Duration
deregistered bool
networkStatusGetter networkStatusGetter

logger log.Logger

// The following fields may be updated
canary bool
services []*structs.Service
networks structs.Networks
ports structs.AllocatedPorts
taskEnvBuilder *taskenv.Builder

// Since Update() may be called concurrently with any other hook all
Expand All @@ -38,11 +44,12 @@ type groupServiceHook struct {
}

// groupServiceHookConfig carries the inputs that newGroupServiceHook uses to
// construct a groupServiceHook.
type groupServiceHookConfig struct {
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
logger log.Logger
alloc *structs.Allocation
consul consul.ConsulServiceAPI
restarter agentconsul.WorkloadRestarter
taskEnvBuilder *taskenv.Builder
networkStatusGetter networkStatusGetter
logger log.Logger
}

func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
Expand All @@ -54,18 +61,20 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook {
}

h := &groupServiceHook{
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
allocID: cfg.alloc.ID,
group: cfg.alloc.TaskGroup,
restarter: cfg.restarter,
consulClient: cfg.consul,
taskEnvBuilder: cfg.taskEnvBuilder,
delay: shutdownDelay,
networkStatusGetter: cfg.networkStatusGetter,
}
h.logger = cfg.logger.Named(h.Name())
h.services = cfg.alloc.Job.LookupTaskGroup(h.group).Services

if cfg.alloc.AllocatedResources != nil {
h.networks = cfg.alloc.AllocatedResources.Shared.Networks
h.ports = cfg.alloc.AllocatedResources.Shared.Ports
}

if cfg.alloc.DeploymentStatus != nil {
Expand Down Expand Up @@ -109,6 +118,7 @@ func (h *groupServiceHook) Update(req *interfaces.RunnerUpdateRequest) error {
var networks structs.Networks
if req.Alloc.AllocatedResources != nil {
networks = req.Alloc.AllocatedResources.Shared.Networks
h.ports = req.Alloc.AllocatedResources.Shared.Ports
}

tg := req.Alloc.Job.LookupTaskGroup(h.group)
Expand Down Expand Up @@ -200,6 +210,11 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
// Interpolate with the task's environment
interpolatedServices := taskenv.InterpolateServices(h.taskEnvBuilder.Build(), h.services)

var netStatus *structs.AllocNetworkStatus
if h.networkStatusGetter != nil {
netStatus = h.networkStatusGetter.NetworkStatus()
}

// Create task services struct with request's driver metadata
return &agentconsul.WorkloadServices{
AllocID: h.allocID,
Expand All @@ -208,6 +223,8 @@ func (h *groupServiceHook) getWorkloadServices() *agentconsul.WorkloadServices {
Services: interpolatedServices,
DriverNetwork: h.driverNet(),
Networks: h.networks,
NetworkStatus: netStatus,
Ports: h.ports,
Canary: h.canary,
}
}
4 changes: 4 additions & 0 deletions client/allocrunner/taskrunner/service_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type serviceHook struct {
canary bool
services []*structs.Service
networks structs.Networks
ports structs.AllocatedPorts
taskEnv *taskenv.TaskEnv

// initialRegistrations tracks if Poststart has completed, initializing
Expand All @@ -62,6 +63,7 @@ func newServiceHook(c serviceHookConfig) *serviceHook {
taskName: c.task.Name,
services: c.task.Services,
restarter: c.restarter,
ports: c.alloc.AllocatedResources.Shared.Ports,
}

if res := c.alloc.AllocatedResources.Tasks[c.task.Name]; res != nil {
Expand Down Expand Up @@ -141,6 +143,7 @@ func (h *serviceHook) updateHookFields(req *interfaces.TaskUpdateRequest) error
h.services = task.Services
h.networks = networks
h.canary = canary
h.ports = req.Alloc.AllocatedResources.Shared.Ports

return nil
}
Expand Down Expand Up @@ -195,5 +198,6 @@ func (h *serviceHook) getWorkloadServices() *agentconsul.WorkloadServices {
DriverNetwork: h.driverNet,
Networks: h.networks,
Canary: h.canary,
Ports: h.ports,
}
}
53 changes: 45 additions & 8 deletions command/agent/consul/service_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ func (c *ServiceClient) serviceRegs(ops *operations, service *structs.Service, w
}

// Determine the address to advertise based on the mode
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork)
ip, port, err := getAddress(addrMode, service.PortLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("unable to get address for service %q: %v", service.Name, err)
}
Expand Down Expand Up @@ -934,7 +934,7 @@ func (c *ServiceClient) checkRegs(ops *operations, serviceID string, service *st
addrMode = structs.AddressModeHost
}

ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork)
ip, port, err := getAddress(addrMode, portLabel, workload.Networks, workload.DriverNetwork, workload.Ports, workload.NetworkStatus)
if err != nil {
return nil, fmt.Errorf("error getting address for check %q: %v", check.Name, err)
}
Expand Down Expand Up @@ -1448,15 +1448,15 @@ func getNomadSidecar(id string, services map[string]*api.AgentService) *api.Agen
// getAddress returns the IP and port to use for a service or check. If no port
// label is specified (an empty value), zero values are returned because no
// address could be resolved.
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork) (string, int, error) {
func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet *drivers.DriverNetwork, ports structs.AllocatedPorts, netStatus *structs.AllocNetworkStatus) (string, int, error) {
switch addrMode {
case structs.AddressModeAuto:
if driverNet.Advertise() {
addrMode = structs.AddressModeDriver
} else {
addrMode = structs.AddressModeHost
}
return getAddress(addrMode, portLabel, networks, driverNet)
return getAddress(addrMode, portLabel, networks, driverNet, ports, netStatus)
case structs.AddressModeHost:
if portLabel == "" {
if len(networks) != 1 {
Expand All @@ -1471,11 +1471,17 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet
}

// Default path: use host ip:port
ip, port := networks.Port(portLabel)
if ip == "" && port <= 0 {
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
// Try finding port in the AllocatedPorts struct first
// Check in Networks struct for backwards compatibility if not found
mapping, ok := ports.Get(portLabel)
if !ok {
ip, port := networks.Port(portLabel)
if ip == "" && port <= 0 {
return "", 0, fmt.Errorf("invalid port %q: port label not found", portLabel)
}
return ip, port, nil
}
return ip, port, nil
return mapping.HostIP, mapping.Value, nil

case structs.AddressModeDriver:
// Require a driver network if driver address mode is used
Expand All @@ -1489,6 +1495,11 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet
}

// If the port is a label, use the driver's port (not the host's)
if port, ok := ports.Get(portLabel); ok {
return driverNet.IP, port.To, nil
}

// Check if old style driver portmap is used
if port, ok := driverNet.PortMap[portLabel]; ok {
return driverNet.IP, port, nil
}
Expand All @@ -1507,6 +1518,32 @@ func getAddress(addrMode, portLabel string, networks structs.Networks, driverNet

return driverNet.IP, port, nil

case "alloc":
if netStatus == nil {
return "", 0, fmt.Errorf(`cannot use address_mode="alloc": no allocation network status reported`)
}

// If no port label is specified just return the IP
if portLabel == "" {
return netStatus.Address, 0, nil
}

// If port is a label and is found then return it
if port, ok := ports.Get(portLabel); ok {
return netStatus.Address, port.Value, nil
}

// Check if port is a literal number
port, err := strconv.Atoi(portLabel)
if err != nil {
// User likely specified wrong port label here
return "", 0, fmt.Errorf("invalid port %q: port label not found or is not numeric", portLabel)
}
if port <= 0 {
return "", 0, fmt.Errorf("invalid port: %q: port must be >0", portLabel)
}
return netStatus.Address, port, nil

default:
// Shouldn't happen due to validation, but enforce invariants
return "", 0, fmt.Errorf("invalid address mode %q", addrMode)
Expand Down
8 changes: 8 additions & 0 deletions command/agent/consul/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ type WorkloadServices struct {
Services []*structs.Service

// Networks from the task's resources stanza.
// TODO: remove and use Ports
Networks structs.Networks

// NetworkStatus is the alloc's network status, present if a network
// namespace is created. Can be nil.
NetworkStatus *structs.AllocNetworkStatus

// Ports is the list of allocated port mappings
Ports structs.AllocatedPorts

// DriverExec is the script executor for the task's driver.
// For group services this is nil and script execution is managed by
// a tasklet in the taskrunner script_check_hook
Expand Down
Loading