Skip to content

Commit

Permalink
Add multicast support.
Browse files Browse the repository at this point in the history
Enable IGMP snooping (when supported) and IGMP relay to allow multicast
connectivity across nodes.

Enforce the following network policies for IP multicast traffic:
- a default deny-all network policy is applied to all IP multicast
  traffic. This is implemented with two ACLs:
  a) one ACL dropping egress multicast traffic from all pods:
     this is to protect OVN controller from processing IP multicast
     reports from nodes that are not allowed to receive multicast
     traffic.
  b) one ACL dropping ingress multicast traffic to all pods.
- when multicast is explicitly enabled in the namespace, IP multicast
  traffic is forwarded only to pods in the same namespace. This is done
  by adding:
  a) a port group containing all logical ports associated with the
     namespace.
  b) one "from-lport" ACL allowing egress multicast traffic from the
     pods in the namespace.
  c) one "to-lport" ACL allowing ingress multicast traffic to pods in
     the namespace. This matches only traffic originated by pods in
     the same namespace (based on the namespace address set).

Add a new namespace annotation to allow enabling of multicast:
"k8s.ovn.org/multicast-enabled".

Signed-off-by: Dumitru Ceara <[email protected]>
  • Loading branch information
dceara committed Nov 1, 2019
1 parent 4830aa1 commit c3def15
Show file tree
Hide file tree
Showing 8 changed files with 351 additions and 47 deletions.
49 changes: 46 additions & 3 deletions go-controller/pkg/ovn/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,22 @@ func (oc *Controller) StartClusterMaster(masterNodeName string) error {
}
oc.masterSubnetAllocatorList = masterSubnetAllocatorList

if _, _, err := util.RunOVNNbctl("--columns=_uuid", "list", "port_group"); err == nil {
oc.portGroupSupport = true
}

// Multicast support requires portGroupSupport
if oc.portGroupSupport {
if _, _, err := util.RunOVNSbctl("--columns=_uuid", "list", "IGMP_Group"); err == nil {
oc.multicastSupport = true
}
}

if err := oc.SetupMaster(masterNodeName); err != nil {
logrus.Errorf("Failed to setup master (%v)", err)
return err
}

if _, _, err := util.RunOVNNbctl("--columns=_uuid", "list", "port_group"); err == nil {
oc.portGroupSupport = true
}
return nil
}

Expand Down Expand Up @@ -106,6 +114,27 @@ func (oc *Controller) SetupMaster(masterNodeName string) error {
return err
}

// If supported, enable IGMP relay on the router to forward multicast
// traffic between nodes.
if oc.multicastSupport {
stdout, stderr, err = util.RunOVNNbctl("--", "set", "logical_router",
OvnClusterRouter, "options:mcast_relay=\"true\"")
if err != nil {
logrus.Errorf("Failed to enable IGMP relay on the cluster router, "+
"stdout: %q, stderr: %q, error: %v", stdout, stderr, err)
return err
}

// Drop IP multicast globally. Multicast is allowed only if explicitly
// enabled in a namespace.
err = oc.createDefaultDenyMulticastPolicy()
if err != nil {
logrus.Errorf("Failed to create default deny multicast policy, error: %v",
err)
return err
}
}

// Create 2 load-balancers for east-west traffic. One handles UDP and another handles TCP.
oc.TCPLoadBalancerUUID, stderr, err = util.RunOVNNbctl("--data=bare", "--no-heading", "--columns=_uuid", "find", "load_balancer", "external_ids:k8s-cluster-lb-tcp=yes")
if err != nil {
Expand Down Expand Up @@ -275,6 +304,20 @@ func (oc *Controller) ensureNodeLogicalNetwork(nodeName string, hostsubnet *net.
return err
}

// If supported, enable IGMP snooping and querier on the node.
if oc.multicastSupport {
stdout, stderr, err = util.RunOVNNbctl("set", "logical_switch",
nodeName, "other-config:mcast_snoop=\"true\"",
"other-config:mcast_querier=\"true\"",
"other-config:mcast_eth_src=\""+nodeLRPMac+"\"",
"other-config:mcast_ip4_src=\""+firstIP.IP.String()+"\"")
if err != nil {
logrus.Errorf("Failed to enable IGMP on logical switch %v, stdout: %q, stderr: %q, error: %v",
nodeName, stdout, stderr, err)
return err
}
}

// Connect the switch to the router.
stdout, stderr, err = util.RunOVNNbctl("--", "--may-exist", "lsp-add", nodeName, "stor-"+nodeName,
"--", "set", "logical_switch_port", "stor-"+nodeName, "type=router", "options:router-port=rtos-"+nodeName, "addresses="+"\""+nodeLRPMac+"\"")
Expand Down
12 changes: 11 additions & 1 deletion go-controller/pkg/ovn/master_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st

fexec := ovntest.NewFakeExec()
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --columns=_uuid list port_group",
"ovn-sbctl --timeout=15 --columns=_uuid list IGMP_Group",
"ovn-nbctl --timeout=15 -- --may-exist lr-add ovn_cluster_router -- set logical_router ovn_cluster_router external_ids:k8s-cluster-router=yes",
"ovn-nbctl --timeout=15 -- set logical_router ovn_cluster_router options:mcast_relay=\"true\"",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find port_group name=mcastPortGroupDeny",
"ovn-nbctl --timeout=15 create port_group name=mcastPortGroupDeny external-ids:name=mcastPortGroupDeny",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"inport == @mcastPortGroupDeny && ip4.mcast\" action=drop external-ids:default-deny-policy-type=Egress",
"ovn-nbctl --timeout=15 --id=@acl create acl priority=1011 direction=from-lport match=\"inport == @mcastPortGroupDeny && ip4.mcast\" action=drop external-ids:default-deny-policy-type=Egress -- add port_group acls @acl",
"ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find ACL match=\"outport == @mcastPortGroupDeny && ip4.mcast\" action=drop external-ids:default-deny-policy-type=Ingress",
"ovn-nbctl --timeout=15 --id=@acl create acl priority=1011 direction=to-lport match=\"outport == @mcastPortGroupDeny && ip4.mcast\" action=drop external-ids:default-deny-policy-type=Ingress -- add port_group acls @acl",
})
fexec.AddFakeCmd(&ovntest.ExpectedCmd{
Cmd: "ovn-nbctl --timeout=15 --data=bare --no-heading --columns=_uuid find load_balancer external_ids:k8s-cluster-lb-tcp=yes",
Expand All @@ -58,14 +67,14 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st
})
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 -- --may-exist lsp-add join jtor-ovn_cluster_router -- set logical_switch_port jtor-ovn_cluster_router type=router options:router-port=rtoj-ovn_cluster_router addresses=\"" + joinLRPMAC + "\"",
"ovn-nbctl --timeout=15 --columns=_uuid list port_group",
})

// Node-related logical network stuff
ip, cidr, err := net.ParseCIDR(nodeSubnet)
Expect(err).NotTo(HaveOccurred())
cidr.IP = util.NextIP(ip)
gwCIDR := cidr.String()
gwIP := cidr.IP.String()
nodeMgmtPortIP := util.NextIP(cidr.IP).String()

fexec.AddFakeCmdsNoOutputNoError([]string{
Expand All @@ -79,6 +88,7 @@ func defaultFakeExec(nodeSubnet, nodeName string) (*ovntest.FakeExec, string, st
fexec.AddFakeCmdsNoOutputNoError([]string{
"ovn-nbctl --timeout=15 --may-exist lrp-add ovn_cluster_router rtos-" + nodeName + " " + lrpMAC + " " + gwCIDR,
"ovn-nbctl --timeout=15 -- --may-exist ls-add " + nodeName + " -- set logical_switch " + nodeName + " other-config:subnet=" + nodeSubnet + " other-config:exclude_ips=" + nodeMgmtPortIP + " external-ids:gateway_ip=" + gwCIDR,
"ovn-nbctl --timeout=15 set logical_switch " + nodeName + " other-config:mcast_snoop=\"true\" other-config:mcast_querier=\"true\" other-config:mcast_eth_src=\"" + lrpMAC + "\" other-config:mcast_ip4_src=\"" + gwIP + "\"",
"ovn-nbctl --timeout=15 -- --may-exist lsp-add " + nodeName + " stor-" + nodeName + " -- set logical_switch_port stor-" + nodeName + " type=router options:router-port=rtos-" + nodeName + " addresses=\"" + lrpMAC + "\"",
"ovn-nbctl --timeout=15 set logical_switch " + nodeName + " load_balancer=" + tcpLBUUID,
"ovn-nbctl --timeout=15 add logical_switch " + nodeName + " load_balancer " + udpLBUUID,
Expand Down
89 changes: 82 additions & 7 deletions go-controller/pkg/ovn/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (
kapi "k8s.io/api/core/v1"
)

const (
// nsMulticastAnnotation is the namespace annotation used to enable/disable
// multicast in the namespace. Multicast is treated as enabled only when
// the annotation value is exactly "true"; any other value, or a missing
// annotation, leaves it disabled.
nsMulticastAnnotation = "k8s.ovn.org/multicast-enabled"
)

func (oc *Controller) syncNamespaces(namespaces []interface{}) {
expectedNs := make(map[string]bool)
for _, nsInterface := range namespaces {
Expand Down Expand Up @@ -49,29 +54,40 @@ func (oc *Controller) waitForNamespaceEvent(namespace string) error {
return nil
}

func (oc *Controller) addPodToNamespaceAddressSet(ns string, ip net.IP) {
func (oc *Controller) addPodToNamespace(ns string, ip net.IP,
logicalPort string) {
mutex := oc.getNamespaceLock(ns)
if mutex == nil {
return
}
defer mutex.Unlock()

if oc.namespacePolicies[ns] == nil {
return
}

// If pod has already been added, nothing to do.
address := ip.String()
if oc.namespaceAddressSet[ns][address] {
if oc.namespaceAddressSet[ns][address] != "" {
return
}

oc.namespaceAddressSet[ns][address] = true
oc.namespaceAddressSet[ns][address] = logicalPort
addresses := make([]string, 0)
for address := range oc.namespaceAddressSet[ns] {
addresses = append(addresses, address)
}

oc.setAddressSet(hashedAddressSet(ns), addresses)

// Enforce the default deny multicast policy
if oc.multicastSupport {
oc.podAddDefaultDenyMulticastPolicy(logicalPort)
}
}

func (oc *Controller) deletePodFromNamespaceAddressSet(ns string, ip net.IP) {
func (oc *Controller) deletePodFromNamespace(ns string, ip net.IP,
logicalPort string) {
if ip == nil {
return
}
Expand All @@ -83,7 +99,7 @@ func (oc *Controller) deletePodFromNamespaceAddressSet(ns string, ip net.IP) {
defer mutex.Unlock()

address := ip.String()
if !oc.namespaceAddressSet[ns][address] {
if oc.namespaceAddressSet[ns][address] == "" {
return
}

Expand All @@ -94,6 +110,51 @@ func (oc *Controller) deletePodFromNamespaceAddressSet(ns string, ip net.IP) {
}

oc.setAddressSet(hashedAddressSet(ns), addresses)

//Remove the port from the default deny multicast policy
if oc.multicastSupport {
oc.podDeleteDefaultDenyMulticastPolicy(logicalPort)
}
}

// multicastUpdateNamespace reconciles the multicast "allow" policy of a
// namespace with the namespace's nsMulticastAnnotation annotation.
//
// If the annotation is "true" and multicast was not previously enabled for the
// namespace, an explicit "allow" policy is created. If the annotation is
// anything else and multicast was previously enabled, the "allow" policy is
// removed, so that traffic is dropped again by the default multicast deny ACL.
// Nothing is done when the controller has no multicast support or when the
// requested state matches the recorded one.
//
// On failure the error is logged and the recorded state is left untouched, so
// the next update of the namespace retries the operation.
func (oc *Controller) multicastUpdateNamespace(ns *kapi.Namespace) {
	if !oc.multicastSupport {
		return
	}

	enabled := ns.Annotations[nsMulticastAnnotation] == "true"
	if oc.multicastEnabled[ns.Name] == enabled {
		// Already in the requested state.
		return
	}

	var err error
	if enabled {
		err = oc.createMulticastAllowPolicy(ns.Name)
	} else {
		err = oc.deleteMulticastAllowPolicy(ns.Name)
	}
	if err != nil {
		// Log the error as a value, never as a format string: a '%' in the
		// error text would otherwise be interpreted as a formatting verb.
		logrus.Error(err)
		return
	}

	oc.multicastEnabled[ns.Name] = enabled
}

// multicastDeleteNamespace cleans up the multicast state of a namespace that
// is being deleted: the multicast "allow" policy is removed if multicast was
// previously enabled for the namespace, and the namespace's entry in
// oc.multicastEnabled is always dropped.
//
// A failure to remove the policy is logged but does not prevent the entry
// from being removed.
func (oc *Controller) multicastDeleteNamespace(ns *kapi.Namespace) {
	if oc.multicastEnabled[ns.Name] {
		if err := oc.deleteMulticastAllowPolicy(ns.Name); err != nil {
			// Log the error as a value, never as a format string: a '%' in
			// the error text would otherwise be interpreted as a verb.
			logrus.Error(err)
		}
	}
	delete(oc.multicastEnabled, ns.Name)
}

// AddNamespace creates corresponding addressset in ovn db
Expand All @@ -110,7 +171,7 @@ func (oc *Controller) AddNamespace(ns *kapi.Namespace) {
defer oc.namespaceMutex[ns.Name].Unlock()
oc.namespaceMutexMutex.Unlock()

oc.namespaceAddressSet[ns.Name] = make(map[string]bool)
oc.namespaceAddressSet[ns.Name] = make(map[string]string)

// Get all the pods in the namespace and append their IP to the
// address_set
Expand All @@ -120,7 +181,8 @@ func (oc *Controller) AddNamespace(ns *kapi.Namespace) {
} else {
for _, pod := range existingPods.Items {
if pod.Status.PodIP != "" {
oc.namespaceAddressSet[ns.Name][pod.Status.PodIP] = true
portName := podLogicalPortName(&pod)
oc.namespaceAddressSet[ns.Name][pod.Status.PodIP] = portName
}
}
}
Expand All @@ -136,6 +198,18 @@ func (oc *Controller) AddNamespace(ns *kapi.Namespace) {
addresses)

oc.namespacePolicies[ns.Name] = make(map[string]*namespacePolicy)
oc.multicastUpdateNamespace(ns)
}

// updateNamespace handles an update event for a namespace by re-evaluating
// its multicast configuration.
//
// old is the previous version of the namespace object (used only for
// logging); newer is the current version. If the namespace has no lock
// registered, the update is ignored.
func (oc *Controller) updateNamespace(old, newer *kapi.Namespace) {
	logrus.Debugf("Updating namespace: old %s new %s", old.Name, newer.Name)

	// A big fat lock per namespace to prevent race conditions
	// with namespace resources like address sets and deny acls.
	//
	// Acquire it through getNamespaceLock, as the pod add/delete paths do,
	// instead of indexing oc.namespaceMutex directly: for a namespace that
	// is not in the map the index expression yields a nil *sync.Mutex and
	// calling Lock on it panics.
	mutex := oc.getNamespaceLock(newer.Name)
	if mutex == nil {
		return
	}
	defer mutex.Unlock()

	oc.multicastUpdateNamespace(newer)
}

func (oc *Controller) deleteNamespace(ns *kapi.Namespace) {
Expand All @@ -151,6 +225,7 @@ func (oc *Controller) deleteNamespace(ns *kapi.Namespace) {
defer mutex.Unlock()

oc.deleteAddressSet(hashedAddressSet(ns.Name))
oc.multicastDeleteNamespace(ns)
delete(oc.namespacePolicies, ns.Name)
delete(oc.namespaceAddressSet, ns.Name)
delete(oc.namespaceMutex, ns.Name)
Expand Down
20 changes: 14 additions & 6 deletions go-controller/pkg/ovn/ovn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ type Controller struct {
// A cache of all logical ports and its corresponding uuids.
logicalPortUUIDCache map[string]string

// For each namespace, an address_set that has all the pod IP
// address in that namespace
namespaceAddressSet map[string]map[string]bool
// For each namespace, a map from pod IP address to logical port name
// for all pods in that namespace.
namespaceAddressSet map[string]map[string]string

// For each namespace, a lock to protect critical regions
namespaceMutex map[string]*sync.Mutex
Expand Down Expand Up @@ -97,9 +97,15 @@ type Controller struct {
// logicalSwitch information
lsMutex *sync.Mutex

// supports port_group?
// Per namespace multicast enabled?
multicastEnabled map[string]bool

// Supports port_group?
portGroupSupport bool

// Supports multicast?
multicastSupport bool

// Map of load balancers to service namespace
serviceVIPToName map[ServiceVIPKey]types.NamespacedName

Expand All @@ -123,7 +129,7 @@ func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory)
logicalSwitchCache: make(map[string]bool),
logicalPortCache: make(map[string]string),
logicalPortUUIDCache: make(map[string]string),
namespaceAddressSet: make(map[string]map[string]bool),
namespaceAddressSet: make(map[string]map[string]string),
namespacePolicies: make(map[string]map[string]*namespacePolicy),
namespaceMutex: make(map[string]*sync.Mutex),
namespaceMutexMutex: sync.Mutex{},
Expand All @@ -134,6 +140,7 @@ func NewOvnController(kubeClient kubernetes.Interface, wf *factory.WatchFactory)
gatewayCache: make(map[string]string),
loadbalancerClusterCache: make(map[string]string),
loadbalancerGWCache: make(map[string]string),
multicastEnabled: make(map[string]bool),
serviceVIPToName: make(map[ServiceVIPKey]types.NamespacedName),
serviceVIPToNameLock: sync.Mutex{},
}
Expand Down Expand Up @@ -431,7 +438,8 @@ func (oc *Controller) WatchNamespaces() error {
return
},
UpdateFunc: func(old, newer interface{}) {
// We only use namespace's name and that does not get updated.
oldNs, newNs := old.(*kapi.Namespace), newer.(*kapi.Namespace)
oc.updateNamespace(oldNs, newNs)
return
},
DeleteFunc: func(obj interface{}) {
Expand Down
1 change: 1 addition & 0 deletions go-controller/pkg/ovn/ovn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ func (o *FakeOVN) init() {

o.controller = NewOvnController(o.fakeClient, o.watcher)
o.controller.portGroupSupport = true
o.controller.multicastSupport = true
}
15 changes: 10 additions & 5 deletions go-controller/pkg/ovn/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

// podLogicalPortName returns the name of the logical switch port that
// corresponds to the given pod, in the form "<namespace>_<name>".
func podLogicalPortName(pod *kapi.Pod) string {
	namespace, name := pod.Namespace, pod.Name
	return namespace + "_" + name
}

func (oc *Controller) syncPods(pods []interface{}) {
// get the list of logical switch ports (equivalent to pods)
expectedLogicalPorts := make(map[string]bool)
Expand All @@ -21,7 +26,7 @@ func (oc *Controller) syncPods(pods []interface{}) {
logrus.Errorf("Spurious object in syncPods: %v", podInterface)
continue
}
logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name)
logicalPort := podLogicalPortName(pod)
expectedLogicalPorts[logicalPort] = true
}

Expand Down Expand Up @@ -155,7 +160,7 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) {
}

logrus.Infof("Deleting pod: %s", pod.Name)
logicalPort := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name)
logicalPort := podLogicalPortName(pod)
out, stderr, err := util.RunOVNNbctl("--if-exists", "lsp-del",
logicalPort)
if err != nil {
Expand Down Expand Up @@ -187,7 +192,7 @@ func (oc *Controller) deleteLogicalPort(pod *kapi.Pod) {
oc.deleteACLDenyOld(pod.Namespace, pod.Spec.NodeName, logicalPort,
"Egress")
}
oc.deletePodFromNamespaceAddressSet(pod.Namespace, podIP)
oc.deletePodFromNamespace(pod.Namespace, podIP, logicalPort)
return
}

Expand Down Expand Up @@ -242,7 +247,7 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
return
}

portName := fmt.Sprintf("%s_%s", pod.Namespace, pod.Name)
portName := podLogicalPortName(pod)
logrus.Debugf("Creating logical port for %s on switch %s", portName, logicalSwitch)

annotation, err := util.UnmarshalPodAnnotation(pod.Annotations["ovn"])
Expand Down Expand Up @@ -339,7 +344,7 @@ func (oc *Controller) addLogicalPort(pod *kapi.Pod) {
if err != nil {
logrus.Errorf("Failed to set annotation on pod %s - %v", pod.Name, err)
}
oc.addPodToNamespaceAddressSet(pod.Namespace, podIP)
oc.addPodToNamespace(pod.Namespace, podIP, portName)

return
}
Loading

0 comments on commit c3def15

Please sign in to comment.