Skip to content

Commit

Permalink
Merge 8edfa59 into 7eec130
Browse files Browse the repository at this point in the history
  • Loading branch information
wenyingd authored Feb 10, 2022
2 parents 7eec130 + 8edfa59 commit b25ba04
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 44 deletions.
21 changes: 10 additions & 11 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,14 +305,6 @@ func run(o *Options) error {
}

var egressController *egress.EgressController
var nodeTransportIP net.IP
if nodeConfig.NodeTransportIPv4Addr != nil {
nodeTransportIP = nodeConfig.NodeTransportIPv4Addr.IP
} else if nodeConfig.NodeTransportIPv6Addr != nil {
nodeTransportIP = nodeConfig.NodeTransportIPv6Addr.IP
} else {
return fmt.Errorf("invalid Node Transport IPAddr in Node config: %v", nodeConfig)
}

var externalIPPoolController *externalippool.ExternalIPPoolController
var externalIPController *serviceexternalip.ServiceExternalIPController
Expand All @@ -324,7 +316,14 @@ func run(o *Options) error {
crdClient, externalIPPoolInformer,
)
localIPDetector = ipassigner.NewLocalIPDetector()

var nodeTransportIP net.IP
if nodeConfig.NodeTransportIPv4Addr != nil {
nodeTransportIP = nodeConfig.NodeTransportIPv4Addr.IP
} else if nodeConfig.NodeTransportIPv6Addr != nil {
nodeTransportIP = nodeConfig.NodeTransportIPv6Addr.IP
} else {
return fmt.Errorf("invalid Node Transport IPAddr in Node config: %v", nodeConfig)
}
memberlistCluster, err = memberlist.NewCluster(nodeTransportIP, o.config.ClusterMembershipPort,
nodeConfig.Name, nodeInformer, externalIPPoolInformer,
)
Expand All @@ -334,7 +333,7 @@ func run(o *Options) error {
}
if features.DefaultFeatureGate.Enabled(features.Egress) {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeTransportIP,
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
memberlistCluster, egressInformer, nodeInformer, localIPDetector,
)
if err != nil {
Expand All @@ -344,7 +343,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) {
externalIPController, err = serviceexternalip.NewServiceExternalIPController(
nodeConfig.Name,
nodeTransportIP,
nodeConfig.NodeTransportInterfaceName,
k8sClient,
memberlistCluster,
serviceInformer,
Expand Down
11 changes: 10 additions & 1 deletion pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/containernetworking/plugins/pkg/ip"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
Expand All @@ -47,6 +48,7 @@ import (
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/ovs/ovsctl"
"antrea.io/antrea/pkg/util/env"
utilip "antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -796,7 +798,7 @@ func (i *Initializer) initNodeLocalConfig() error {
if err != nil {
return fmt.Errorf("failed to obtain local IP addresses from K8s: %w", err)
}
nodeIPv4Addr, nodeIPv6Addr, nodeInterface, err = getIPNetDeviceFromIP(ipAddrs)
nodeIPv4Addr, nodeIPv6Addr, nodeInterface, err = i.getNodeInterfaceFromIP(ipAddrs)
if err != nil {
return fmt.Errorf("failed to get local IPNet device with IP %v: %v", ipAddrs, err)
}
Expand Down Expand Up @@ -1117,3 +1119,10 @@ func (i *Initializer) patchNodeAnnotations(nodeName, key string, value interface
}
return nil
}

// getNodeInterfaceFromIP returns the IPv4/IPv6 configuration, and the associated interface according the give nodeIPs.
// When searching the Node interface, antrea-gw0 is ignored because it is configured with the same address as Node IP
// with NetworkPolicyOnly mode on public cloud setup, e.g., AKS.
func (i *Initializer) getNodeInterfaceFromIP(nodeIPs *utilip.DualStackIPs) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) {
return getIPNetDeviceFromIP(nodeIPs, sets.NewString(i.hostGateway))
}
2 changes: 1 addition & 1 deletion pkg/agent/agent_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (i *Initializer) prepareOVSBridge() error {
klog.Infof("Preparing OVS bridge for AntreaFlexibleIPAM")
// Get uplink network configuration.
// TODO(gran): support IPv6
_, _, adapter, err := util.GetIPNetDeviceFromIP(&utilip.DualStackIPs{IPv4: i.nodeConfig.NodeIPv4Addr.IP})
_, _, adapter, err := i.getNodeInterfaceFromIP(&utilip.DualStackIPs{IPv4: i.nodeConfig.NodeIPv4Addr.IP})
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes/fake"

"antrea.io/antrea/pkg/agent/cniserver"
Expand Down Expand Up @@ -422,7 +423,7 @@ func TestInitNodeLocalConfig(t *testing.T) {

func mockGetIPNetDeviceFromIP(ipNet *net.IPNet, ipDevice *net.Interface) func() {
prevGetIPNetDeviceFromIP := getIPNetDeviceFromIP
getIPNetDeviceFromIP = func(localIP *ip.DualStackIPs) (*net.IPNet, *net.IPNet, *net.Interface, error) {
getIPNetDeviceFromIP = func(localIP *ip.DualStackIPs, ignoredHostInterfaces sets.String) (*net.IPNet, *net.IPNet, *net.Interface, error) {
return ipNet, nil, ipDevice, nil
}
return func() { getIPNetDeviceFromIP = prevGetIPNetDeviceFromIP }
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (i *Initializer) prepareHostNetwork() error {
// Get uplink network configuration. The uplink interface is the one used for transporting Pod traffic across Nodes.
// Use the interface specified with "transportInterface" in the configuration if configured, otherwise the interface
// configured with NodeIP is used as uplink.
_, _, adapter, err := util.GetIPNetDeviceFromIP(&ip.DualStackIPs{IPv4: i.nodeConfig.NodeTransportIPv4Addr.IP})
_, _, adapter, err := i.getNodeInterfaceFromIP(&ip.DualStackIPs{IPv4: i.nodeConfig.NodeTransportIPv4Addr.IP})
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/controller/egress/egress_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func NewEgressController(
ifaceStore interfacestore.InterfaceStore,
routeClient route.Interface,
nodeName string,
nodeTransportIP net.IP,
nodeTransportInterface string,
cluster *memberlist.Cluster,
egressInformer crdinformers.EgressInformer,
nodeInformer coreinformers.NodeInformer,
Expand All @@ -176,7 +176,7 @@ func NewEgressController(
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
}
ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportIP, egressDummyDevice)
ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
return nil, fmt.Errorf("initializing egressIP assigner failed: %v", err)
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/controller/serviceexternalip/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package serviceexternalip
import (
"context"
"fmt"
"net"
"sync"
"time"

Expand Down Expand Up @@ -86,7 +85,7 @@ type ServiceExternalIPController struct {

func NewServiceExternalIPController(
nodeName string,
nodeTransportIP net.IP,
nodeTransportInterface string,
client kubernetes.Interface,
cluster memberlist.Interface,
serviceInformer coreinformers.ServiceInformer,
Expand All @@ -107,7 +106,7 @@ func NewServiceExternalIPController(
externalIPStates: make(map[apimachinerytypes.NamespacedName]externalIPState),
localIPDetector: localIPDetector,
}
ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportIP, ingressDummyDevice)
ipAssigner, err := ipassigner.NewIPAssigner(nodeTransportInterface, ingressDummyDevice)
if err != nil {
return nil, fmt.Errorf("initializing service external IP assigner failed: %v", err)
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/util/arping"
"antrea.io/antrea/pkg/agent/util/ndp"
"antrea.io/antrea/pkg/util/ip"
)

// ipAssigner creates a dummy device and assigns IPs to it.
Expand All @@ -47,16 +46,10 @@ type ipAssigner struct {
}

// NewIPAssigner returns an *ipAssigner.
func NewIPAssigner(nodeTransportIPAddr net.IP, dummyDeviceName string) (*ipAssigner, error) {
nodeTransportIPs := new(ip.DualStackIPs)
if nodeTransportIPAddr.To4() == nil {
nodeTransportIPs.IPv6 = nodeTransportIPAddr
} else {
nodeTransportIPs.IPv4 = nodeTransportIPAddr
}
_, _, externalInterface, err := util.GetIPNetDeviceFromIP(nodeTransportIPs)
func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAssigner, error) {
_, _, externalInterface, err := util.GetIPNetDeviceByName(nodeTransportInterface)
if err != nil {
return nil, fmt.Errorf("get IPNetDevice from ip %v error: %+v", nodeTransportIPAddr, err)
return nil, fmt.Errorf("get IPNetDevice from name %s error: %+v", nodeTransportInterface, err)
}

dummyDevice, err := ensureDummyDevice(dummyDeviceName)
Expand Down
4 changes: 1 addition & 3 deletions pkg/agent/ipassigner/ip_assigner_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
package ipassigner

import (
"net"

"k8s.io/apimachinery/pkg/util/sets"
)

type ipAssigner struct {
}

func NewIPAssigner(nodeTransportIPAddr net.IP, dummyDeviceName string) (*ipAssigner, error) {
func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (*ipAssigner, error) {
return nil, nil
}

Expand Down
15 changes: 10 additions & 5 deletions pkg/agent/util/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ func dialUnix(address string) (net.Conn, error) {
return net.Dial("unix", address)
}

// GetIPNetDeviceFromIP returns local IPs/masks and associated device from IP.
func GetIPNetDeviceFromIP(localIPs *ip.DualStackIPs) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) {
// GetIPNetDeviceFromIP returns local IPs/masks and associated device from IP, and ignores the interfaces which have
// names in the ignoredInterfaces.
func GetIPNetDeviceFromIP(localIPs *ip.DualStackIPs, ignoredInterfaces sets.String) (v4IPNet *net.IPNet, v6IPNet *net.IPNet, iface *net.Interface, err error) {
linkList, err := net.Interfaces()
if err != nil {
return nil, nil, nil, err
Expand All @@ -129,19 +130,23 @@ func GetIPNetDeviceFromIP(localIPs *ip.DualStackIPs) (v4IPNet *net.IPNet, v6IPNe
return nil
}
for i := range linkList {
addrList, err := linkList[i].Addrs()
link := linkList[i]
if ignoredInterfaces.Has(link.Name) {
continue
}
addrList, err := link.Addrs()
if err != nil {
continue
}
for _, addr := range addrList {
if ipNet, ok := addr.(*net.IPNet); ok {
if ipNet.IP.Equal(localIPs.IPv4) {
if err := saveIface(&linkList[i]); err != nil {
if err := saveIface(&link); err != nil {
return nil, nil, nil, err
}
v4IPNet = ipNet
} else if ipNet.IP.Equal(localIPs.IPv6) {
if err := saveIface(&linkList[i]); err != nil {
if err := saveIface(&link); err != nil {
return nil, nil, nil, err
}
v6IPNet = ipNet
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/util/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"strings"
"testing"

"k8s.io/apimachinery/pkg/util/sets"

"antrea.io/antrea/pkg/util/ip"
)

Expand Down Expand Up @@ -58,7 +60,7 @@ func TestGetDefaultLocalNodeAddr(t *testing.T) {
localAddr := conn.LocalAddr().(*net.UDPAddr).IP

nodeIPs := &ip.DualStackIPs{IPv4: localAddr}
_, _, dev, err := GetIPNetDeviceFromIP(nodeIPs)
_, _, dev, err := GetIPNetDeviceFromIP(nodeIPs, sets.NewString())
if err != nil {
t.Error(err)
}
Expand Down
8 changes: 4 additions & 4 deletions test/integration/agent/ip_assigner_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
const dummyDeviceName = "antrea-dummy0"

func TestIPAssigner(t *testing.T) {
nodeIPAddr := nodeIPv4.IP
require.NotNil(t, nodeIPAddr, "Get Node IP failed")
nodeLinkName := nodeIntf.Name
require.NotNil(t, nodeLinkName, "Get Node link failed")

ipAssigner, err := ipassigner.NewIPAssigner(nodeIPAddr, dummyDeviceName)
ipAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName)
require.NoError(t, err, "Initializing IP assigner failed")

dummyDevice, err := netlink.LinkByName(dummyDeviceName)
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestIPAssigner(t *testing.T) {
assert.Equal(t, desiredIPs, actualIPs, "Actual IPs don't match")

// NewIPAssigner should load existing IPs correctly.
newIPAssigner, err := ipassigner.NewIPAssigner(nodeIPAddr, dummyDeviceName)
newIPAssigner, err := ipassigner.NewIPAssigner(nodeLinkName, dummyDeviceName)
require.NoError(t, err, "Initializing new IP assigner failed")
assert.Equal(t, desiredIPs, newIPAssigner.AssignedIPs(), "Assigned IPs don't match")

Expand Down
3 changes: 2 additions & 1 deletion test/integration/agent/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/vishvananda/netlink"
"golang.org/x/net/nettest"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"

"antrea.io/antrea/pkg/agent/config"
Expand All @@ -56,7 +57,7 @@ var (
conn, _ := net.Dial("udp", "8.8.8.8:80")
defer conn.Close()
return &utilip.DualStackIPs{IPv4: conn.LocalAddr().(*net.UDPAddr).IP}
}())
}(), sets.NewString())
nodeLink, _ = netlink.LinkByName(nodeIntf.Name)
localPeerIP = ip.NextIP(nodeIPv4.IP)
remotePeerIP = net.ParseIP("50.50.50.1")
Expand Down

0 comments on commit b25ba04

Please sign in to comment.