Skip to content

Commit

Permalink
wireguard, linuxnodehandler: untangle wg from lnh
Browse files Browse the repository at this point in the history
The reason the WireGuard agent node event handling was contained within
the linuxNodeHandler code was routing, which is no longer the case. In
addition, entangling the two leads to a deadlock, as diagnosed in GitHub
issue cilium#24574.

This patch thus implements NodeHandler for the WireGuard agent, and
subscribes to the NodeManager itself. That way, the wait cycle of the
deadlock is broken, as the linuxNodeHandler doesn't acquire the IPCache
lock while holding its lock.

From the perspective of the agent, the invocations of the callbacks
change insofar as, previously, the linuxNodeHandler would only forward
node events once it considered itself "initialised".
Specifically, this excluded the initial sync of nodes performed on
subscribe. However, I didn't see a reason to specifically replicate this
behaviour.

Suggested-by: Sebastian Wicki <[email protected]>
Signed-off-by: David Bimmler <[email protected]>
  • Loading branch information
bimmlerd authored and christarazi committed May 25, 2023
1 parent 57c2064 commit c8598f8
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 34 deletions.
2 changes: 2 additions & 0 deletions daemon/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,8 @@ func newDaemon(ctx context.Context, cleaner *daemonCleanup, params *daemonParams
log.WithError(err).Error("failed to initialize wireguard agent")
return nil, nil, fmt.Errorf("failed to initialize wireguard agent: %w", err)
}

params.NodeManager.Subscribe(params.WGAgent)
}

// Perform an early probe on the underlying kernel on whether BandwidthManager
Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/linux/datapath.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func NewDatapath(cfg DatapathConfiguration, ruleManager datapath.IptablesManager
lbmap: lbmap.New(),
}

dp.node = NewNodeHandler(cfg, dp.nodeAddressing, wgAgent)
dp.node = NewNodeHandler(cfg, dp.nodeAddressing)
return dp
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/linux/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func FuzzNodeHandler(f *testing.F) {
}
dpConfig := DatapathConfiguration{HostDevice: "veth0"}
fakeNodeAddressing := fake.NewNodeAddressing()
linuxNodeHandler := NewNodeHandler(dpConfig, fakeNodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, fakeNodeAddressing)
if linuxNodeHandler == nil {
panic("Should not be nil")
}
Expand Down
18 changes: 1 addition & 17 deletions pkg/datapath/linux/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ type linuxNodeHandler struct {
neighNextHopRefCount counter.StringCounter
neighByNextHop map[string]*netlink.Neigh // key = string(net.IP)
neighLastPingByNextHop map[string]time.Time // key = string(net.IP)
wgAgent datapath.WireguardAgent

// Pool of available IDs for nodes.
nodeIDs idpool.IDPool
Expand All @@ -88,7 +87,7 @@ var (

// NewNodeHandler returns a new node handler to handle node events and
// implement the implications in the Linux datapath
func NewNodeHandler(datapathConfig DatapathConfiguration, nodeAddressing datapath.NodeAddressing, wgAgent datapath.WireguardAgent) *linuxNodeHandler {
func NewNodeHandler(datapathConfig DatapathConfiguration, nodeAddressing datapath.NodeAddressing) *linuxNodeHandler {
return &linuxNodeHandler{
nodeAddressing: nodeAddressing,
datapathConfig: datapathConfig,
Expand All @@ -98,7 +97,6 @@ func NewNodeHandler(datapathConfig DatapathConfiguration, nodeAddressing datapat
neighNextHopRefCount: counter.StringCounter{},
neighByNextHop: map[string]*netlink.Neigh{},
neighLastPingByNextHop: map[string]time.Time{},
wgAgent: wgAgent,
nodeIDs: idpool.NewIDPool(minNodeID, maxNodeID),
nodeIDsByIPs: map[string]uint16{},
nodeIPsByIDs: map[uint16]string{},
Expand Down Expand Up @@ -1123,14 +1121,6 @@ func (n *linuxNodeHandler) nodeUpdate(oldNode, newNode *nodeTypes.Node, firstAdd
return nil
}

if option.Config.EnableWireguard && newNode.WireguardPubKey != "" {
if err := n.wgAgent.UpdatePeer(newNode.Fullname(), newNode.WireguardPubKey, newIP4, newIP6); err != nil {
log.WithError(err).
WithField(logfields.NodeName, newNode.Fullname()).
Warning("Failed to update wireguard configuration for peer")
}
}

if n.nodeConfig.EnableAutoDirectRouting {
n.updateDirectRoutes(oldAllIP4AllocCidrs, newAllIP4AllocCidrs, oldIP4, newIP4, firstAddition, n.nodeConfig.EnableIPv4)
n.updateDirectRoutes(oldAllIP6AllocCidrs, newAllIP6AllocCidrs, oldIP6, newIP6, firstAddition, n.nodeConfig.EnableIPv6)
Expand Down Expand Up @@ -1238,12 +1228,6 @@ func (n *linuxNodeHandler) nodeDelete(oldNode *nodeTypes.Node) error {
n.deleteIPsec(oldNode)
}

if option.Config.EnableWireguard {
if err := n.wgAgent.DeletePeer(oldNode.Fullname()); err != nil {
return err
}
}

n.deallocateIDForNode(oldNode)

return nil
Expand Down
28 changes: 14 additions & 14 deletions pkg/datapath/linux/node_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestUpdateNodeRoute(c *check.C) {

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}

linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))
nodeConfig := datapath.LocalNodeConfiguration{
EnableIPv4: s.enableIPv4,
Expand Down Expand Up @@ -249,7 +249,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestUpdateNodeRoute(c *check.C) {
func (s *linuxPrivilegedBaseTestSuite) TestSingleClusterPrefix(c *check.C) {
dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}

linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

// enable as per test definition
Expand Down Expand Up @@ -315,7 +315,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestAuxiliaryPrefixes(c *check.C) {
net2 := cidr.MustParseCIDR("cafe:f00d::/112")

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))
nodeConfig := datapath.LocalNodeConfiguration{
EnableIPv4: s.enableIPv4,
Expand Down Expand Up @@ -389,7 +389,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestNodeUpdateEncapsulation(c *check.C) {
externalNodeIP2 := net.ParseIP("8.8.8.8")

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))
nodeConfig := datapath.LocalNodeConfiguration{
EnableIPv4: s.enableIPv4,
Expand Down Expand Up @@ -679,7 +679,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestNodeUpdateDirectRouting(c *check.C) {
defer removeDevice(externalNode2Device)

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))
nodeConfig := datapath.LocalNodeConfiguration{
EnableIPv4: s.enableIPv4,
Expand Down Expand Up @@ -894,7 +894,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestAgentRestartOptionChanges(c *check.C)
underlayIP := net.ParseIP("4.4.4.4")

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))
nodeConfig := datapath.LocalNodeConfiguration{
EnableIPv4: s.enableIPv4,
Expand Down Expand Up @@ -1001,7 +1001,7 @@ func (s *linuxPrivilegedBaseTestSuite) TestNodeValidationDirectRouting(c *check.
ip4Alloc1 := cidr.MustParseCIDR("5.5.5.0/24")
ip6Alloc1 := cidr.MustParseCIDR("2001:aaaa::/96")
dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

if s.enableIPv4 {
Expand Down Expand Up @@ -1166,7 +1166,7 @@ func (s *linuxPrivilegedIPv6OnlyTestSuite) TestArpPingHandling(c *check.C) {
defer func() { option.Config.ARPPingRefreshPeriod = prevARPPeriod }()
option.Config.ARPPingRefreshPeriod = time.Duration(1 * time.Nanosecond)

linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err = linuxNodeHandler.NodeConfigurationChanged(datapath.LocalNodeConfiguration{
Expand Down Expand Up @@ -2055,7 +2055,7 @@ func (s *linuxPrivilegedIPv6OnlyTestSuite) TestArpPingHandlingForMultiDevice(c *
defer func() { option.Config.ARPPingRefreshPeriod = prevARPPeriod }()
option.Config.ARPPingRefreshPeriod = 1 * time.Nanosecond

linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err = linuxNodeHandler.NodeConfigurationChanged(datapath.LocalNodeConfiguration{
Expand Down Expand Up @@ -2336,7 +2336,7 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandling(c *check.C) {
defer func() { option.Config.ARPPingRefreshPeriod = prevARPPeriod }()
option.Config.ARPPingRefreshPeriod = time.Duration(1 * time.Nanosecond)

linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err = linuxNodeHandler.NodeConfigurationChanged(datapath.LocalNodeConfiguration{
Expand Down Expand Up @@ -3226,7 +3226,7 @@ func (s *linuxPrivilegedIPv4OnlyTestSuite) TestArpPingHandlingForMultiDevice(c *
defer func() { option.Config.ARPPingRefreshPeriod = prevARPPeriod }()
option.Config.ARPPingRefreshPeriod = 1 * time.Nanosecond

linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err = linuxNodeHandler.NodeConfigurationChanged(datapath.LocalNodeConfiguration{
Expand Down Expand Up @@ -3425,7 +3425,7 @@ func (s *linuxPrivilegedBaseTestSuite) benchmarkNodeUpdate(c *check.C, config da
ip6Alloc2 := cidr.MustParseCIDR("2001:bbbb::/96")

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err := linuxNodeHandler.NodeConfigurationChanged(config)
Expand Down Expand Up @@ -3531,7 +3531,7 @@ func (s *linuxPrivilegedBaseTestSuite) benchmarkNodeUpdateNOP(c *check.C, config
ip6Alloc1 := cidr.MustParseCIDR("2001:aaaa::/96")

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err := linuxNodeHandler.NodeConfigurationChanged(config)
Expand Down Expand Up @@ -3600,7 +3600,7 @@ func (s *linuxPrivilegedBaseTestSuite) benchmarkNodeValidateImplementation(c *ch
ip6Alloc1 := cidr.MustParseCIDR("2001:aaaa::/96")

dpConfig := DatapathConfiguration{HostDevice: dummyHostDeviceName}
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing, nil)
linuxNodeHandler := NewNodeHandler(dpConfig, s.nodeAddressing)
c.Assert(linuxNodeHandler, check.Not(check.IsNil))

err := linuxNodeHandler.NodeConfigurationChanged(config)
Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/linux/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *linuxTestSuite) TestCreateNodeRoute(c *check.C) {

fakeNodeAddressing := fake.NewNodeAddressing()

nodeHandler := NewNodeHandler(dpConfig, fakeNodeAddressing, nil)
nodeHandler := NewNodeHandler(dpConfig, fakeNodeAddressing)

c1 := cidr.MustParseCIDR("10.10.0.0/16")
generatedRoute, err := nodeHandler.createNodeRouteSpec(c1, false)
Expand Down
59 changes: 59 additions & 0 deletions pkg/wireguard/agent/node_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package agent

import (
datapath "github.com/cilium/cilium/pkg/datapath/types"
"github.com/cilium/cilium/pkg/logging/logfields"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
)

// NodeAdd is called when a node is discovered for the first time. It
// delegates to nodeUpsert, so a newly discovered node is handled exactly
// like an updated one, and returns whatever nodeUpsert returns.
func (a *Agent) NodeAdd(newNode nodeTypes.Node) error {
return a.nodeUpsert(newNode)
}

// NodeUpdate is called when a node definition changes. Both the old and
// the new node definitions are provided, but the old one is ignored here:
// only the new definition is passed on to nodeUpsert. NodeUpdate() is never
// called before NodeAdd() is called for a particular node.
func (a *Agent) NodeUpdate(_, newNode nodeTypes.Node) error {
return a.nodeUpsert(newNode)
}

// NodeDelete is called after a node has been deleted. Deletion of the local
// node is ignored. For any other node, DeletePeer is invoked with the node's
// full name and its result is returned unchanged.
func (a *Agent) NodeDelete(node nodeTypes.Node) error {
	if !node.IsLocal() {
		return a.DeletePeer(node.Fullname())
	}

	return nil
}

// NodeValidateImplementation is called to validate the implementation of
// the node in the datapath. This function is intended to be run on an
// interval to ensure that the datapath is consistently converged.
// Validation is done by simply re-running nodeUpsert for the given node.
func (a *Agent) NodeValidateImplementation(node nodeTypes.Node) error {
return a.nodeUpsert(node)
}

// nodeUpsert pushes the given node's WireGuard peer configuration to the
// agent via UpdatePeer. The local node and nodes that do not advertise a
// WireGuard public key are skipped.
//
// A failing UpdatePeer is only logged as a warning; nodeUpsert itself
// always returns nil.
func (a *Agent) nodeUpsert(node nodeTypes.Node) error {
	if node.IsLocal() {
		return nil
	}
	if node.WireguardPubKey == "" {
		return nil
	}

	ipv4 := node.GetNodeIP(false)
	ipv6 := node.GetNodeIP(true)
	nodeName := node.Fullname()

	err := a.UpdatePeer(nodeName, node.WireguardPubKey, ipv4, ipv6)
	if err == nil {
		return nil
	}

	// Best effort: report the failure, but do not propagate it to the caller.
	log.WithError(err).
		WithField(logfields.NodeName, nodeName).
		Warning("Failed to update wireguard configuration for peer")

	return nil
}

// NodeConfigurationChanged is called when the local node configuration
// has changed. This implementation ignores the new configuration: it is a
// no-op that always returns nil.
func (a *Agent) NodeConfigurationChanged(config datapath.LocalNodeConfiguration) error { return nil }

0 comments on commit c8598f8

Please sign in to comment.