From 36f4b5634cc57c1b08923985e73c8352ca8a0ee3 Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Fri, 15 May 2020 11:09:01 -0400 Subject: [PATCH] CNI Implementation (#7518) --- client/allocrunner/network_manager_linux.go | 11 +- client/allocrunner/networking_bridge_linux.go | 99 ++-------- client/allocrunner/networking_cni.go | 182 ++++++++++++++++++ client/client.go | 41 ++-- client/client_test.go | 58 +++--- client/config/config.go | 12 ++ client/fingerprint/bridge.go | 12 ++ client/fingerprint/bridge_default.go | 5 + client/fingerprint/bridge_linux.go | 49 +++++ client/fingerprint/bridge_linux_test.go | 14 ++ client/fingerprint/cni.go | 77 ++++++++ client/fingerprint/cni_test.go | 85 ++++++++ client/fingerprint/env_aws.go | 1 + client/fingerprint/fingerprint.go | 8 + client/fingerprint/fingerprint_linux.go | 1 + client/fingerprint/network.go | 1 + .../fingerprint/test_fixtures/cni/net1.conf | 17 ++ .../test_fixtures/cni/net2.conflist | 25 +++ client/fingerprint_manager.go | 18 ++ nomad/mock/mock.go | 1 + nomad/structs/structs.go | 53 ++--- nomad/structs/structs_test.go | 44 +++++ scheduler/feasible.go | 38 ++++ scheduler/feasible_test.go | 42 ++++ scheduler/stack.go | 10 +- 25 files changed, 724 insertions(+), 180 deletions(-) create mode 100644 client/allocrunner/networking_cni.go create mode 100644 client/fingerprint/bridge.go create mode 100644 client/fingerprint/bridge_default.go create mode 100644 client/fingerprint/bridge_linux.go create mode 100644 client/fingerprint/bridge_linux_test.go create mode 100644 client/fingerprint/cni.go create mode 100644 client/fingerprint/cni_test.go create mode 100644 client/fingerprint/test_fixtures/cni/net1.conf create mode 100644 client/fingerprint/test_fixtures/cni/net2.conflist diff --git a/client/allocrunner/network_manager_linux.go b/client/allocrunner/network_manager_linux.go index 87da239c433..b6702a7fe39 100644 --- a/client/allocrunner/network_manager_linux.go +++ b/client/allocrunner/network_manager_linux.go @@ 
-117,6 +117,9 @@ func netModeToIsolationMode(netMode string) drivers.NetIsolationMode { case "driver": return drivers.NetIsolationModeTask default: + if strings.HasPrefix(strings.ToLower(netMode), "cni/") { + return drivers.NetIsolationModeGroup + } return drivers.NetIsolationModeHost } } @@ -129,9 +132,13 @@ func newNetworkConfigurator(log hclog.Logger, alloc *structs.Allocation, config return &hostNetworkConfigurator{}, nil } - switch strings.ToLower(tg.Networks[0].Mode) { - case "bridge": + netMode := strings.ToLower(tg.Networks[0].Mode) + + switch { + case netMode == "bridge": return newBridgeNetworkConfigurator(log, config.BridgeNetworkName, config.BridgeNetworkAllocSubnet, config.CNIPath) + case strings.HasPrefix(netMode, "cni/"): + return newCNINetworkConfigurator(log, config.CNIPath, config.CNIInterfacePrefix, config.CNIConfigDir, netMode[4:]) default: return &hostNetworkConfigurator{}, nil } diff --git a/client/allocrunner/networking_bridge_linux.go b/client/allocrunner/networking_bridge_linux.go index af07ad1d990..2bd429b8a61 100644 --- a/client/allocrunner/networking_bridge_linux.go +++ b/client/allocrunner/networking_bridge_linux.go @@ -3,12 +3,7 @@ package allocrunner import ( "context" "fmt" - "math/rand" - "os" - "path/filepath" - "time" - cni "github.com/containerd/go-cni" "github.com/coreos/go-iptables/iptables" hclog "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/nomad/structs" @@ -16,14 +11,6 @@ import ( ) const ( - // envCNIPath is the environment variable name to use to derive the CNI path - // when it is not explicitly set by the client - envCNIPath = "CNI_PATH" - - // defaultCNIPath is the CNI path to use when it is not set by the client - // and is not set by environment variable - defaultCNIPath = "/opt/cni/bin" - // defaultNomadBridgeName is the name of the bridge to use when not set by // the client defaultNomadBridgeName = "nomad" @@ -45,11 +32,10 @@ const ( // shared bridge, configures masquerading for egress traffic and 
port mapping // for ingress type bridgeNetworkConfigurator struct { - cni cni.CNI + cni *cniNetworkConfigurator allocSubnet string bridgeName string - rand *rand.Rand logger hclog.Logger } @@ -57,21 +43,8 @@ func newBridgeNetworkConfigurator(log hclog.Logger, bridgeName, ipRange, cniPath b := &bridgeNetworkConfigurator{ bridgeName: bridgeName, allocSubnet: ipRange, - rand: rand.New(rand.NewSource(time.Now().Unix())), logger: log, } - if cniPath == "" { - if cniPath = os.Getenv(envCNIPath); cniPath == "" { - cniPath = defaultCNIPath - } - } - - c, err := cni.New(cni.WithPluginDir(filepath.SplitList(cniPath)), - cni.WithInterfacePrefix(bridgeNetworkAllocIfPrefix)) - if err != nil { - return nil, err - } - b.cni = c if b.bridgeName == "" { b.bridgeName = defaultNomadBridgeName @@ -81,6 +54,12 @@ func newBridgeNetworkConfigurator(log hclog.Logger, bridgeName, ipRange, cniPath b.allocSubnet = defaultNomadAllocSubnet } + c, err := newCNINetworkConfiguratorWithConf(log, cniPath, bridgeNetworkAllocIfPrefix, buildNomadBridgeNetConfig(b.bridgeName, b.allocSubnet)) + if err != nil { + return nil, err + } + b.cni = c + return b, nil } @@ -148,72 +127,16 @@ func (b *bridgeNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Al return fmt.Errorf("failed to initialize table forwarding rules: %v", err) } - if err := b.ensureCNIInitialized(); err != nil { - return err - } - - // Depending on the version of bridge cni plugin (< 0.8.4) a known race could occur - // where two alloc attempt to create the nomad bridge at the same time, resulting - // in one of them to fail. This retry attempts to overcome those erroneous failures. 
- const retry = 3 - for attempt := 1; ; attempt++ { - //TODO eventually returning the IP from the result would be nice to have in the alloc - if _, err := b.cni.Setup(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc))); err != nil { - b.logger.Warn("failed to configure bridge network", "err", err, "attempt", attempt) - if attempt == retry { - return fmt.Errorf("failed to configure bridge network: %v", err) - } - // Sleep for 1 second + jitter - time.Sleep(time.Second + (time.Duration(b.rand.Int63n(1000)) * time.Millisecond)) - continue - } - break - } - - return nil - + return b.cni.Setup(ctx, alloc, spec) } // Teardown calls the CNI plugins with the delete action func (b *bridgeNetworkConfigurator) Teardown(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error { - if err := b.ensureCNIInitialized(); err != nil { - return err - } - - return b.cni.Remove(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc))) -} - -func (b *bridgeNetworkConfigurator) ensureCNIInitialized() error { - if err := b.cni.Status(); cni.IsCNINotInitialized(err) { - return b.cni.Load(cni.WithConfListBytes(b.buildNomadNetConfig())) - } else { - return err - } -} - -// getPortMapping builds a list of portMapping structs that are used as the -// portmapping capability arguments for the portmap CNI plugin -func getPortMapping(alloc *structs.Allocation) []cni.PortMapping { - ports := []cni.PortMapping{} - for _, network := range alloc.AllocatedResources.Shared.Networks { - for _, port := range append(network.DynamicPorts, network.ReservedPorts...) 
{ - if port.To < 1 { - continue - } - for _, proto := range []string{"tcp", "udp"} { - ports = append(ports, cni.PortMapping{ - HostPort: int32(port.Value), - ContainerPort: int32(port.To), - Protocol: proto, - }) - } - } - } - return ports + return b.cni.Teardown(ctx, alloc, spec) } -func (b *bridgeNetworkConfigurator) buildNomadNetConfig() []byte { - return []byte(fmt.Sprintf(nomadCNIConfigTemplate, b.bridgeName, b.allocSubnet, cniAdminChainName)) +func buildNomadBridgeNetConfig(bridgeName, subnet string) []byte { + return []byte(fmt.Sprintf(nomadCNIConfigTemplate, bridgeName, subnet, cniAdminChainName)) } const nomadCNIConfigTemplate = `{ diff --git a/client/allocrunner/networking_cni.go b/client/allocrunner/networking_cni.go new file mode 100644 index 00000000000..15cd2745fd4 --- /dev/null +++ b/client/allocrunner/networking_cni.go @@ -0,0 +1,182 @@ +package allocrunner + +import ( + "context" + "fmt" + "math/rand" + "os" + "path/filepath" + "sort" + "strings" + "time" + + cni "github.com/containerd/go-cni" + cnilibrary "github.com/containernetworking/cni/libcni" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" +) + +const ( + + // envCNIPath is the environment variable name to use to derive the CNI path + // when it is not explicitly set by the client + envCNIPath = "CNI_PATH" + + // defaultCNIPath is the CNI path to use when it is not set by the client + // and is not set by environment variable + defaultCNIPath = "/opt/cni/bin" + + // defaultCNIInterfacePrefix is the network interface to use if not set in + // client config + defaultCNIInterfacePrefix = "eth" +) + +type cniNetworkConfigurator struct { + cni cni.CNI + cniConf []byte + + rand *rand.Rand + logger log.Logger +} + +func newCNINetworkConfigurator(logger log.Logger, cniPath, cniInterfacePrefix, cniConfDir, networkName string) (*cniNetworkConfigurator, error) { + cniConf, err := loadCNIConf(cniConfDir, networkName) + if 
err != nil { + return nil, fmt.Errorf("failed to load CNI config: %v", err) + } + + return newCNINetworkConfiguratorWithConf(logger, cniPath, cniInterfacePrefix, cniConf) +} + +func newCNINetworkConfiguratorWithConf(logger log.Logger, cniPath, cniInterfacePrefix string, cniConf []byte) (*cniNetworkConfigurator, error) { + conf := &cniNetworkConfigurator{ + cniConf: cniConf, + rand: rand.New(rand.NewSource(time.Now().Unix())), + logger: logger, + } + if cniPath == "" { + if cniPath = os.Getenv(envCNIPath); cniPath == "" { + cniPath = defaultCNIPath + } + } + + if cniInterfacePrefix == "" { + cniInterfacePrefix = defaultCNIInterfacePrefix + } + + c, err := cni.New(cni.WithPluginDir(filepath.SplitList(cniPath)), + cni.WithInterfacePrefix(cniInterfacePrefix)) + if err != nil { + return nil, err + } + conf.cni = c + + return conf, nil +} + +// Setup calls the CNI plugins with the add action +func (c *cniNetworkConfigurator) Setup(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error { + if err := c.ensureCNIInitialized(); err != nil { + return err + } + + // Depending on the version of bridge cni plugin used, a known race could occur + // where two allocs attempt to create the nomad bridge at the same time, resulting + // in one of them failing. This retry attempts to overcome those erroneous failures. 
+ const retry = 3 + var firstError error + for attempt := 1; ; attempt++ { + //TODO eventually returning the IP from the result would be nice to have in the alloc + if _, err := c.cni.Setup(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc))); err != nil { + c.logger.Warn("failed to configure network", "err", err, "attempt", attempt) + switch attempt { + case 1: + firstError = err + case retry: + return fmt.Errorf("failed to configure network: %v", firstError) + } + + // Sleep for 1 second + jitter + time.Sleep(time.Second + (time.Duration(c.rand.Int63n(1000)) * time.Millisecond)) + continue + } + break + } + + return nil + +} + +func loadCNIConf(confDir, name string) ([]byte, error) { + files, err := cnilibrary.ConfFiles(confDir, []string{".conf", ".conflist", ".json"}) + switch { + case err != nil: + return nil, fmt.Errorf("failed to detect CNI config file: %v", err) + case len(files) == 0: + return nil, fmt.Errorf("no CNI network config found in %s", confDir) + } + + // files contains the network config files associated with cni network. + // Use lexicographical way as a defined order for network config files. 
+ sort.Strings(files) + for _, confFile := range files { + if strings.HasSuffix(confFile, ".conflist") { + confList, err := cnilibrary.ConfListFromFile(confFile) + if err != nil { + return nil, fmt.Errorf("failed to load CNI config list file %s: %v", confFile, err) + } + if confList.Name == name { + return confList.Bytes, nil + } + } else { + conf, err := cnilibrary.ConfFromFile(confFile) + if err != nil { + return nil, fmt.Errorf("failed to load CNI config file %s: %v", confFile, err) + } + if conf.Network.Name == name { + return conf.Bytes, nil + } + } + } + + return nil, fmt.Errorf("CNI network config not found for name %q", name) +} + +// Teardown calls the CNI plugins with the delete action +func (c *cniNetworkConfigurator) Teardown(ctx context.Context, alloc *structs.Allocation, spec *drivers.NetworkIsolationSpec) error { + if err := c.ensureCNIInitialized(); err != nil { + return err + } + + return c.cni.Remove(ctx, alloc.ID, spec.Path, cni.WithCapabilityPortMap(getPortMapping(alloc))) +} + +func (c *cniNetworkConfigurator) ensureCNIInitialized() error { + if err := c.cni.Status(); cni.IsCNINotInitialized(err) { + return c.cni.Load(cni.WithConfListBytes(c.cniConf)) + } else { + return err + } +} + +// getPortMapping builds a list of portMapping structs that are used as the +// portmapping capability arguments for the portmap CNI plugin +func getPortMapping(alloc *structs.Allocation) []cni.PortMapping { + ports := []cni.PortMapping{} + for _, network := range alloc.AllocatedResources.Shared.Networks { + for _, port := range append(network.DynamicPorts, network.ReservedPorts...) 
{ + if port.To < 1 { + continue + } + for _, proto := range []string{"tcp", "udp"} { + ports = append(ports, cni.PortMapping{ + HostPort: int32(port.Value), + ContainerPort: int32(port.To), + Protocol: proto, + }) + } + } + } + return ports +} diff --git a/client/client.go b/client/client.go index 7e925495589..b5bd9cdcbb9 100644 --- a/client/client.go +++ b/client/client.go @@ -1436,7 +1436,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp // if we still have node changes, merge them if response.Resources != nil { response.Resources.Networks = updateNetworks( - c.config.Node.Resources.Networks, response.Resources.Networks, c.config) if !c.config.Node.Resources.Equals(response.Resources) { @@ -1449,7 +1448,6 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp // if we still have node changes, merge them if response.NodeResources != nil { response.NodeResources.Networks = updateNetworks( - c.config.Node.NodeResources.Networks, response.NodeResources.Networks, c.config) if !c.config.Node.NodeResources.Equals(response.NodeResources) { @@ -1465,33 +1463,40 @@ func (c *Client) updateNodeFromFingerprint(response *fingerprint.FingerprintResp return c.configCopy.Node } -// updateNetworks preserves manually configured network options, but -// applies fingerprint updates -func updateNetworks(ns structs.Networks, up structs.Networks, c *config.Config) structs.Networks { - if c.NetworkInterface == "" { - ns = up - } else { - // If a network device is configured, filter up to contain details for only +// updateNetworks filters and overrides network speed of host networks based +// on configured settings +func updateNetworks(up structs.Networks, c *config.Config) structs.Networks { + if up == nil { + return nil + } + + if c.NetworkInterface != "" { + // For host networks, if a network device is configured filter up to contain details for only // that device upd := []*structs.NetworkResource{} for _, n := range up 
{ - if c.NetworkInterface == n.Device { + switch n.Mode { + case "host": + if c.NetworkInterface == n.Device { + upd = append(upd, n) + } + default: upd = append(upd, n) + } } - // If updates, use them. Otherwise, ns contains the configured interfaces - if len(upd) > 0 { - ns = upd - } + up = upd } - // ns is set, apply the config NetworkSpeed to all + // if set, apply the config NetworkSpeed to networks in host mode if c.NetworkSpeed != 0 { - for _, n := range ns { - n.MBits = c.NetworkSpeed + for _, n := range up { + if n.Mode == "host" { + n.MBits = c.NetworkSpeed + } } } - return ns + return up } // retryIntv calculates a retry interval value given the base diff --git a/client/client_test.go b/client/client_test.go index 95bf51069ad..1afb34422fc 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1184,17 +1184,16 @@ func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) { client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ NodeResources: &structs.NodeResources{ Cpu: structs.NodeCpuResources{CpuShares: 123}, - Networks: []*structs.NetworkResource{{Device: "any-interface"}}, + Networks: []*structs.NetworkResource{{Mode: "host", Device: "any-interface"}}, }, Resources: &structs.Resources{ - CPU: 80, - Networks: []*structs.NetworkResource{{Device: "any-interface"}}, + CPU: 80, }, }) - assert.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares) - assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[0].Device) - assert.Equal(t, 80, client.config.Node.Resources.CPU) - assert.Equal(t, "any-interface", client.config.Node.Resources.Networks[0].Device) + idx := len(client.config.Node.NodeResources.Networks) - 1 + require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares) + require.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[idx].Device) + require.Equal(t, 80, client.config.Node.Resources.CPU) // lookup an interface. 
client.Node starts with a hardcoded value, eth0, // and is only updated async through fingerprinter. @@ -1210,48 +1209,43 @@ func TestClient_UpdateNodeFromFingerprintKeepsConfig(t *testing.T) { client, cleanup = TestClient(t, func(c *config.Config) { c.NetworkInterface = dev c.Node.Name = name + c.Options["fingerprint.blacklist"] = "network" // Node is already a mock.Node, with a device c.Node.NodeResources.Networks[0].Device = dev - c.Node.Resources.Networks = c.Node.NodeResources.Networks }) defer cleanup() client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ NodeResources: &structs.NodeResources{ Cpu: structs.NodeCpuResources{CpuShares: 123}, Networks: []*structs.NetworkResource{ - {Device: "any-interface", MBits: 20}, - {Device: dev, MBits: 20}, + {Mode: "host", Device: "any-interface", MBits: 20}, }, }, - Resources: &structs.Resources{ - CPU: 80, - Networks: []*structs.NetworkResource{{Device: "any-interface"}}, - }, }) - assert.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares) + require.Equal(t, int64(123), client.config.Node.NodeResources.Cpu.CpuShares) // only the configured device is kept - assert.Equal(t, 1, len(client.config.Node.NodeResources.Networks)) - assert.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device) - // network speed updates to the configured network are kept - assert.Equal(t, 20, client.config.Node.NodeResources.Networks[0].MBits) - assert.Equal(t, 80, client.config.Node.Resources.CPU) - assert.Equal(t, dev, client.config.Node.Resources.Networks[0].Device) + require.Equal(t, 2, len(client.config.Node.NodeResources.Networks)) + require.Equal(t, dev, client.config.Node.NodeResources.Networks[0].Device) + require.Equal(t, "bridge", client.config.Node.NodeResources.Networks[1].Mode) // Network speed is applied to all NetworkResources client.config.NetworkInterface = "" client.config.NetworkSpeed = 100 client.updateNodeFromFingerprint(&fingerprint.FingerprintResponse{ NodeResources: 
&structs.NodeResources{ - Cpu: structs.NodeCpuResources{CpuShares: 123}, - Networks: []*structs.NetworkResource{{Device: "any-interface", MBits: 20}}, + Cpu: structs.NodeCpuResources{CpuShares: 123}, + Networks: []*structs.NetworkResource{ + {Mode: "host", Device: "any-interface", MBits: 20}, + }, }, Resources: &structs.Resources{ - CPU: 80, - Networks: []*structs.NetworkResource{{Device: "any-interface"}}, + CPU: 80, }, }) - assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[0].Device) - assert.Equal(t, 100, client.config.Node.NodeResources.Networks[0].MBits) + assert.Equal(t, 3, len(client.config.Node.NodeResources.Networks)) + assert.Equal(t, "any-interface", client.config.Node.NodeResources.Networks[2].Device) + assert.Equal(t, 100, client.config.Node.NodeResources.Networks[2].MBits) + assert.Equal(t, 0, client.config.Node.NodeResources.Networks[1].MBits) } // Support multiple IP addresses (ipv4 vs. 6, e.g.) on the configured network interface @@ -1269,7 +1263,7 @@ func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) { // Client without network configured updates to match fingerprint client, cleanup := TestClient(t, func(c *config.Config) { c.NetworkInterface = dev - c.Node.NodeResources.Networks[0].Device = dev + c.Options["fingerprint.blacklist"] = "network,cni,bridge" c.Node.Resources.Networks = c.Node.NodeResources.Networks }) defer cleanup() @@ -1284,12 +1278,13 @@ func Test_UpdateNodeFromFingerprintMultiIP(t *testing.T) { }, }) - two := structs.Networks{ + nets := structs.Networks{ + mock.Node().NodeResources.Networks[0], {Device: dev, IP: "127.0.0.1"}, {Device: dev, IP: "::1"}, } - require.Equal(t, two, client.config.Node.NodeResources.Networks) + require.Equal(t, nets, client.config.Node.NodeResources.Networks) } func TestClient_computeAllocatedDeviceStats(t *testing.T) { @@ -1480,6 +1475,9 @@ func TestClient_getAllocatedResources(t *testing.T) { result := client.getAllocatedResources(client.config.Node) + // Ignore comparing 
networks for now + result.Flattened.Networks = nil + expected := structs.ComparableResources{ Flattened: structs.AllocatedTaskResources{ Cpu: structs.AllocatedCpuResources{ diff --git a/client/config/config.go b/client/config/config.go index d1de0d2eaa8..a6df2fac2d1 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -234,6 +234,15 @@ type Config struct { // be specified with colon delimited CNIPath string + // CNIConfigDir is the directory where CNI network configuration is located. The + // client will use this path when fingerprinting CNI networks. + CNIConfigDir string + + // CNIInterfacePrefix is the prefix to use when creating CNI network interfaces. This + // defaults to 'eth', therefore the first interface created by CNI inside the alloc + // network will be 'eth0'. + CNIInterfacePrefix string + // BridgeNetworkName is the name to use for the bridge created in bridge // networking mode. This defaults to 'nomad' if not set BridgeNetworkName string @@ -301,6 +310,9 @@ func DefaultConfig() *Config { }, BackwardsCompatibleMetrics: false, RPCHoldTimeout: 5 * time.Second, + CNIPath: "/opt/cni/bin", + CNIConfigDir: "/opt/cni/config", + CNIInterfacePrefix: "eth", } } diff --git a/client/fingerprint/bridge.go b/client/fingerprint/bridge.go new file mode 100644 index 00000000000..163e8cbea19 --- /dev/null +++ b/client/fingerprint/bridge.go @@ -0,0 +1,12 @@ +package fingerprint + +import log "github.com/hashicorp/go-hclog" + +type BridgeFingerprint struct { + logger log.Logger + StaticFingerprinter +} + +func NewBridgeFingerprint(logger log.Logger) Fingerprint { + return &BridgeFingerprint{logger: logger} +} diff --git a/client/fingerprint/bridge_default.go b/client/fingerprint/bridge_default.go new file mode 100644 index 00000000000..bf544522730 --- /dev/null +++ b/client/fingerprint/bridge_default.go @@ -0,0 +1,5 @@ +// +build !linux + +package fingerprint + +func (f *BridgeFingerprint) Fingerprint(*FingerprintRequest, *FingerprintResponse) error { 
return nil } diff --git a/client/fingerprint/bridge_linux.go b/client/fingerprint/bridge_linux.go new file mode 100644 index 00000000000..231d669ecfe --- /dev/null +++ b/client/fingerprint/bridge_linux.go @@ -0,0 +1,49 @@ +package fingerprint + +import ( + "bufio" + "fmt" + "os" + "regexp" + + "github.com/hashicorp/nomad/nomad/structs" +) + +const bridgeKernelModuleName = "bridge" + +func (f *BridgeFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error { + if err := f.checkKMod(bridgeKernelModuleName); err != nil { + f.logger.Warn("failed to detect bridge kernel module, bridge network mode disabled", "error", err) + return nil + } + + resp.NodeResources = &structs.NodeResources{ + Networks: []*structs.NetworkResource{ + { + Mode: "bridge", + }, + }, + } + resp.Detected = true + return nil +} + +func (f *BridgeFingerprint) checkKMod(mod string) error { + file, err := os.Open("/proc/modules") + if err != nil { + return fmt.Errorf("could not read /proc/modules: %v", err) + } + defer file.Close() + + scanner := bufio.NewScanner(file) + pattern := fmt.Sprintf("%s\\s+.*$", mod) + for scanner.Scan() { + if matched, err := regexp.MatchString(pattern, scanner.Text()); matched { + return nil + } else if err != nil { + return fmt.Errorf("could not parse /proc/modules: %v", err) + } + } + + return fmt.Errorf("could not detect kernel module %s", mod) +} diff --git a/client/fingerprint/bridge_linux_test.go b/client/fingerprint/bridge_linux_test.go new file mode 100644 index 00000000000..4e55c92dcee --- /dev/null +++ b/client/fingerprint/bridge_linux_test.go @@ -0,0 +1,14 @@ +package fingerprint + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBridgeFingerprint_checkKMod(t *testing.T) { + require := require.New(t) + f := &BridgeFingerprint{} + require.NoError(f.checkKMod("ip_tables")) + require.Error(f.checkKMod("nonexistentmodule")) +} diff --git a/client/fingerprint/cni.go b/client/fingerprint/cni.go new file mode 
100644 index 00000000000..9080733db59 --- /dev/null +++ b/client/fingerprint/cni.go @@ -0,0 +1,77 @@ +package fingerprint + +import ( + "fmt" + "os" + "strings" + + "github.com/containernetworking/cni/libcni" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/nomad/structs" +) + +type CNIFingerprint struct { + StaticFingerprinter + logger log.Logger +} + +func NewCNIFingerprint(logger log.Logger) Fingerprint { + return &CNIFingerprint{logger: logger} +} + +func (f *CNIFingerprint) Fingerprint(req *FingerprintRequest, resp *FingerprintResponse) error { + confDir := req.Config.CNIConfigDir + networks := map[string]struct{}{} + if _, err := os.Stat(confDir); os.IsNotExist(err) { + f.logger.Debug("CNI config dir is not set or does not exist, skipping", "cni_config_dir", confDir) + resp.Detected = false + return nil + } + + files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"}) + if err != nil { + return fmt.Errorf("failed to detect CNI conf files: %v", err) + } + + for _, confFile := range files { + if strings.HasSuffix(confFile, ".conflist") { + confList, err := libcni.ConfListFromFile(confFile) + if err != nil { + return fmt.Errorf("failed to load CNI config list file %s: %v", confFile, err) + } + if _, ok := networks[confList.Name]; ok { + f.logger.Warn("duplicate CNI config names found, ignoring file", "name", confList.Name, "file", confFile) + continue + } + networks[confList.Name] = struct{}{} + } else { + conf, err := libcni.ConfFromFile(confFile) + if err != nil { + return fmt.Errorf("failed to load CNI config file %s: %v", confFile, err) + } + if _, ok := networks[conf.Network.Name]; ok { + f.logger.Warn("duplicate CNI config names found, ignoring file", "name", conf.Network.Name, "file", confFile) + continue + } + networks[conf.Network.Name] = struct{}{} + } + } + + var nodeNetworks structs.Networks + + for name := range networks { + nodeNetworks = append(nodeNetworks, &structs.NetworkResource{ + Mode: 
fmt.Sprintf("cni/%s", name), + }) + f.logger.Debug("detected CNI network", "name", name) + } + + resp.NodeResources = &structs.NodeResources{ + Networks: nodeNetworks, + } + + resp.Detected = true + return nil +} + +func (f *CNIFingerprint) Reload() {} diff --git a/client/fingerprint/cni_test.go b/client/fingerprint/cni_test.go new file mode 100644 index 00000000000..3fd125b7d4f --- /dev/null +++ b/client/fingerprint/cni_test.go @@ -0,0 +1,85 @@ +package fingerprint + +import ( + "testing" + + "github.com/hashicorp/nomad/client/config" + "github.com/hashicorp/nomad/helper/testlog" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" +) + +// Test that CNI fingerprinter is reloadable +var _ ReloadableFingerprint = &CNIFingerprint{} + +func TestCNIFingerprint(t *testing.T) { + cases := []struct { + name string + req *FingerprintRequest + exp *FingerprintResponse + err bool + errMatch string + }{ + { + name: "cni config dir not set", + req: &FingerprintRequest{ + Config: &config.Config{}, + }, + exp: &FingerprintResponse{ + Detected: false, + }, + }, + { + name: "cni config dir non-existent", + req: &FingerprintRequest{ + Config: &config.Config{ + CNIConfigDir: "text_fixtures/cni_nonexistent", + }, + }, + exp: &FingerprintResponse{ + Detected: false, + }, + }, + { + name: "two networks, no errors", + req: &FingerprintRequest{ + Config: &config.Config{ + CNIConfigDir: "test_fixtures/cni", + }, + }, + exp: &FingerprintResponse{ + NodeResources: &structs.NodeResources{ + Networks: []*structs.NetworkResource{ + { + Mode: "cni/net1", + }, + { + Mode: "cni/net2", + }, + }, + }, + Detected: true, + }, + err: false, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + r := require.New(t) + fp := NewCNIFingerprint(testlog.HCLogger(t)) + resp := &FingerprintResponse{} + err := fp.Fingerprint(c.req, resp) + if c.err { + r.Error(err) + r.Contains(err.Error(), c.errMatch) + } else { + r.NoError(err) + r.Equal(c.exp.Detected, 
resp.Detected) + if resp.NodeResources != nil || c.exp.NodeResources != nil { + r.ElementsMatch(c.exp.NodeResources.Networks, resp.NodeResources.Networks) + } + } + }) + } +} diff --git a/client/fingerprint/env_aws.go b/client/fingerprint/env_aws.go index 0b2466fd3bb..87becff9e9a 100644 --- a/client/fingerprint/env_aws.go +++ b/client/fingerprint/env_aws.go @@ -485,6 +485,7 @@ func (f *EnvAWSFingerprint) Fingerprint(request *FingerprintRequest, response *F nodeResources = new(structs.NodeResources) nodeResources.Networks = []*structs.NetworkResource{ { + Mode: "host", Device: "eth0", IP: val, CIDR: val + "/32", diff --git a/client/fingerprint/fingerprint.go b/client/fingerprint/fingerprint.go index f7d0e2279b7..d896b48e224 100644 --- a/client/fingerprint/fingerprint.go +++ b/client/fingerprint/fingerprint.go @@ -31,6 +31,7 @@ var ( hostFingerprinters = map[string]Factory{ "arch": NewArchFingerprint, "consul": NewConsulFingerprint, + "cni": NewCNIFingerprint, "cpu": NewCPUFingerprint, "host": NewHostFingerprint, "memory": NewMemoryFingerprint, @@ -115,6 +116,13 @@ type Fingerprint interface { Periodic() (bool, time.Duration) } +// ReloadableFingerprint can be implemented if the fingerprinter needs to be run during client reload. +// If implemented, the client will call Reload during client reload then immediately Fingerprint +type ReloadableFingerprint interface { + Fingerprint + Reload() +} + // StaticFingerprinter can be embedded in a struct that has a Fingerprint method // to make it non-periodic. 
type StaticFingerprinter struct{} diff --git a/client/fingerprint/fingerprint_linux.go b/client/fingerprint/fingerprint_linux.go index f52669a7f18..10c9768b29f 100644 --- a/client/fingerprint/fingerprint_linux.go +++ b/client/fingerprint/fingerprint_linux.go @@ -2,4 +2,5 @@ package fingerprint func initPlatformFingerprints(fps map[string]Factory) { fps["cgroup"] = NewCGroupFingerprint + fps["bridge"] = NewBridgeFingerprint } diff --git a/client/fingerprint/network.go b/client/fingerprint/network.go index 9cc4fcc658f..73b8fa5098c 100644 --- a/client/fingerprint/network.go +++ b/client/fingerprint/network.go @@ -132,6 +132,7 @@ func (f *NetworkFingerprint) createNetworkResources(throughput int, intf *net.In for _, addr := range addrs { // Create a new network resource newNetwork := &structs.NetworkResource{ + Mode: "host", Device: intf.Name, MBits: throughput, } diff --git a/client/fingerprint/test_fixtures/cni/net1.conf b/client/fingerprint/test_fixtures/cni/net1.conf new file mode 100644 index 00000000000..ea5dddb63d8 --- /dev/null +++ b/client/fingerprint/test_fixtures/cni/net1.conf @@ -0,0 +1,17 @@ +{ + "cniVersion": "0.2.0", + "name": "net1", + "type": "bridge", + "bridge": "cni0", + "isGateway": true, + "ipMasq": true, + "ipam": { + "type": "host-local", + "subnet": "10.22.0.0/16", + "routes": [ + { + "dst": "0.0.0.0/0" + } + ] + } +} diff --git a/client/fingerprint/test_fixtures/cni/net2.conflist b/client/fingerprint/test_fixtures/cni/net2.conflist new file mode 100644 index 00000000000..cec6c67efa9 --- /dev/null +++ b/client/fingerprint/test_fixtures/cni/net2.conflist @@ -0,0 +1,25 @@ +{ + "cniVersion": "0.3.1", + "name": "net2", + "plugins": [ + { + "type": "ptp", + "ipMasq": true, + "ipam": { + "type": "host-local", + "subnet": "172.16.30.0/24", + "routes": [ + { + "dst": "0.0.0.0/0" + } + ] + } + }, + { + "type": "portmap", + "capabilities": { + "portMappings": true + } + } + ] +} diff --git a/client/fingerprint_manager.go b/client/fingerprint_manager.go 
index 84efed5bcb3..e8993526ffa 100644 --- a/client/fingerprint_manager.go +++ b/client/fingerprint_manager.go @@ -24,6 +24,8 @@ type FingerprintManager struct { // associated node updateNodeAttributes func(*fingerprint.FingerprintResponse) *structs.Node + reloadableFps map[string]fingerprint.ReloadableFingerprint + logger log.Logger } @@ -44,6 +46,7 @@ func NewFingerprintManager( node: node, shutdownCh: shutdownCh, logger: logger.Named("fingerprint_mgr"), + reloadableFps: make(map[string]fingerprint.ReloadableFingerprint), } } @@ -103,6 +106,17 @@ func (fp *FingerprintManager) Run() error { return nil } +// Reload will reload any registered ReloadableFingerprinters and immediately call Fingerprint +func (fm *FingerprintManager) Reload() { + for name, fp := range fm.reloadableFps { + fm.logger.Info("reloading fingerprinter", "fingerprinter", name) + fp.Reload() + if _, err := fm.fingerprint(name, fp); err != nil { + fm.logger.Warn("error fingerprinting after reload", "fingerprinter", name, "error", err) + } + } +} + // setupFingerprints is used to fingerprint the node to see if these attributes are // supported func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error { @@ -130,6 +144,10 @@ func (fm *FingerprintManager) setupFingerprinters(fingerprints []string) error { if p { go fm.runFingerprint(f, period, name) } + + if rfp, ok := f.(fingerprint.ReloadableFingerprint); ok { + fm.reloadableFps[name] = rfp + } } fm.logger.Debug("detected fingerprints", "node_attrs", appliedFingerprints) diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 23738503cfd..49c911bfdc0 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -65,6 +65,7 @@ func Node() *structs.Node { }, Networks: []*structs.NetworkResource{ { + Mode: "host", Device: "eth0", CIDR: "192.168.0.100/32", MBits: 1000, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8998c8f5932..b86f462b5e8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go 
@@ -12,6 +12,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash/crc32" "math" "net" "os" @@ -2236,53 +2237,23 @@ type NetworkResource struct { DynamicPorts []Port // Host Dynamically assigned ports } -func (nr *NetworkResource) Equals(other *NetworkResource) bool { - if nr.Mode != other.Mode { - return false - } - - if nr.Device != other.Device { - return false - } - - if nr.CIDR != other.CIDR { - return false - } - - if nr.IP != other.IP { - return false - } - - if nr.MBits != other.MBits { - return false - } - - if len(nr.ReservedPorts) != len(other.ReservedPorts) { - return false - } +func (nr *NetworkResource) Hash() uint32 { + var data []byte + data = append(data, []byte(fmt.Sprintf("%s%s%s%s%d", nr.Mode, nr.Device, nr.CIDR, nr.IP, nr.MBits))...) for i, port := range nr.ReservedPorts { - if len(other.ReservedPorts) <= i { - return false - } - if port != other.ReservedPorts[i] { - return false - } + data = append(data, []byte(fmt.Sprintf("r%d%s%d%d", i, port.Label, port.Value, port.To))...) } - if len(nr.DynamicPorts) != len(other.DynamicPorts) { - return false - } for i, port := range nr.DynamicPorts { - if len(other.DynamicPorts) <= i { - return false - } - if port != other.DynamicPorts[i] { - return false - } + data = append(data, []byte(fmt.Sprintf("d%d%s%d%d", i, port.Label, port.Value, port.To))...) } - return true + return crc32.ChecksumIEEE(data) +} + +func (nr *NetworkResource) Equals(other *NetworkResource) bool { + return nr.Hash() == other.Hash() } func (n *NetworkResource) Canonicalize() { @@ -2584,7 +2555,7 @@ func (n *NodeResources) Merge(o *NodeResources) { n.Disk.Merge(&o.Disk) if len(o.Networks) != 0 { - n.Networks = o.Networks + n.Networks = append(n.Networks, o.Networks...) 
} if len(o.Devices) != 0 { diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 98cb49c1a4d..98187c78874 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5239,3 +5239,47 @@ func TestNodeReservedNetworkResources_ParseReserved(t *testing.T) { require.Equal(out, tc.Parsed) } } + +func TestNodeResources_Merge(t *testing.T) { + res := &NodeResources{ + Cpu: NodeCpuResources{ + CpuShares: int64(32000), + }, + Memory: NodeMemoryResources{ + MemoryMB: int64(64000), + }, + Networks: Networks{ + { + Device: "foo", + }, + }, + } + + res.Merge(&NodeResources{ + Memory: NodeMemoryResources{ + MemoryMB: int64(100000), + }, + Networks: Networks{ + { + Mode: "foo/bar", + }, + }, + }) + + require.Exactly(t, &NodeResources{ + Cpu: NodeCpuResources{ + CpuShares: int64(32000), + }, + Memory: NodeMemoryResources{ + MemoryMB: int64(100000), + }, + Networks: Networks{ + { + Device: "foo", + }, + { + Mode: "foo/bar", + }, + }, + }, res) +} diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 848764443fc..edcaae1255d 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -314,6 +314,44 @@ func (c *CSIVolumeChecker) hasPlugins(n *structs.Node) (bool, string) { return true, "" } +// NetworkChecker is a FeasibilityChecker which returns whether a node has the +// network resources necessary to schedule the task group +type NetworkChecker struct { + ctx Context + networkMode string +} + +func NewNetworkChecker(ctx Context) *NetworkChecker { + return &NetworkChecker{ctx: ctx, networkMode: "host"} +} + +func (c *NetworkChecker) SetNetworkMode(netMode string) { + c.networkMode = netMode +} + +func (c *NetworkChecker) Feasible(option *structs.Node) bool { + if c.hasNetwork(option) { + return true + } + + c.ctx.Metrics().FilterNode(option, "missing network") + return false +} + +func (c *NetworkChecker) hasNetwork(option *structs.Node) bool { + if option.NodeResources == nil { + return false + } + + for _, nw := 
range option.NodeResources.Networks { + if nw.Mode == c.networkMode { + return true + } + } + + return false +} + // DriverChecker is a FeasibilityChecker which returns whether a node has the // drivers necessary to scheduler a task group. type DriverChecker struct { diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 18e50c6ee4e..725a90bc121 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -397,6 +397,48 @@ func TestCSIVolumeChecker(t *testing.T) { } } +func TestNetworkChecker(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + } + nodes[0].NodeResources.Networks = append(nodes[0].NodeResources.Networks, &structs.NetworkResource{Mode: "bridge"}) + nodes[1].NodeResources.Networks = append(nodes[1].NodeResources.Networks, &structs.NetworkResource{Mode: "bridge"}) + nodes[2].NodeResources.Networks = append(nodes[2].NodeResources.Networks, &structs.NetworkResource{Mode: "cni/mynet"}) + + checker := NewNetworkChecker(ctx) + cases := []struct { + mode string + results []bool + }{ + { + mode: "host", + results: []bool{true, true, true}, + }, + { + mode: "bridge", + results: []bool{true, true, false}, + }, + { + mode: "cni/mynet", + results: []bool{false, false, true}, + }, + { + mode: "cni/nonexistent", + results: []bool{false, false, false}, + }, + } + + for _, c := range cases { + checker.SetNetworkMode(c.mode) + for i, node := range nodes { + require.Equal(t, c.results[i], checker.Feasible(node), "mode=%q, idx=%d", c.mode, i) + } + } +} + func TestDriverChecker_DriverInfo(t *testing.T) { _, ctx := testContext(t) nodes := []*structs.Node{ diff --git a/scheduler/stack.go b/scheduler/stack.go index b381e4c5871..d1a94bcc334 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -52,6 +52,7 @@ type GenericStack struct { taskGroupDevices *DeviceChecker taskGroupHostVolumes *HostVolumeChecker taskGroupCSIVolumes *CSIVolumeChecker + taskGroupNetwork 
*NetworkChecker distinctHostsConstraint *DistinctHostsIterator distinctPropertyConstraint *DistinctPropertyIterator @@ -135,6 +136,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra s.taskGroupDevices.SetTaskGroup(tg) s.taskGroupHostVolumes.SetVolumes(tg.Volumes) s.taskGroupCSIVolumes.SetVolumes(tg.Volumes) + if len(tg.Networks) > 0 { + s.taskGroupNetwork.SetNetworkMode(tg.Networks[0].Mode) + } s.distinctHostsConstraint.SetTaskGroup(tg) s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) @@ -332,6 +336,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Filter on available, healthy CSI plugins s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx) + // Filter on available client networks + s.taskGroupNetwork = NewNetworkChecker(ctx) + // Create the feasibility wrapper which wraps all feasibility checks in // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be @@ -340,7 +347,8 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupHostVolumes, - s.taskGroupDevices} + s.taskGroupDevices, + s.taskGroupNetwork} avail := []FeasibilityChecker{s.taskGroupCSIVolumes} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)