Skip to content

Commit

Permalink
BGP CP: Replaces LocalNodeStore with Local CiliumNode
Browse files Browse the repository at this point in the history
- Replaces the LocalNodeStore with the existing local CiliumNode
  throughout the BGP CP.
- Updates unit and integration tests accordingly.
- Adds the GetIP() method for returning the CiliumNode IP.

Signed-off-by: Daneyon Hansen <[email protected]>
  • Loading branch information
danehans authored and pjablonski123 committed Dec 15, 2023
1 parent 76871f3 commit 1af6842
Show file tree
Hide file tree
Showing 17 changed files with 280 additions and 254 deletions.
21 changes: 5 additions & 16 deletions pkg/bgpv1/agent/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
slimmetav1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/cilium/pkg/option"
)

Expand Down Expand Up @@ -51,7 +50,6 @@ func (plf policyListerFunc) List() ([]*v2alpha1api.CiliumBGPPeeringPolicy, error
// Controller listens for events and drives BGP related sub-systems
// to maintain a desired state.
type Controller struct {
LocalNodeStore *node.LocalNodeStore
// CiliumNodeResource provides a stream of events for changes to the local CiliumNode resource.
CiliumNodeResource daemon_k8s.LocalCiliumNodeResource
// LocalCiliumNode is the CiliumNode object for the local node.
Expand Down Expand Up @@ -88,7 +86,6 @@ type ControllerParams struct {
PolicyResource resource.Resource[*v2alpha1api.CiliumBGPPeeringPolicy]
DaemonConfig *option.DaemonConfig
LocalCiliumNodeResource daemon_k8s.LocalCiliumNodeResource
LocalNodeStore *node.LocalNodeStore
}

// NewController constructs a new BGP Control Plane Controller.
Expand All @@ -110,7 +107,6 @@ func NewController(params ControllerParams) (*Controller, error) {
Sig: params.Sig,
BGPMgr: params.RouteMgr,
PolicyResource: params.PolicyResource,
LocalNodeStore: params.LocalNodeStore,
CiliumNodeResource: params.LocalCiliumNodeResource,
}

Expand Down Expand Up @@ -195,12 +191,6 @@ func (c *Controller) Run(ctx context.Context) {
"component": "Controller.Run",
})
)
l.Info("Starting LocalNodeStore Observer")

// setup a reconciliation trigger on LocalNodeStore changes
c.LocalNodeStore.Observe(ctx, func(node node.LocalNode) { c.Sig.Event(struct{}{}) }, func(err error) {
l.WithError(err).Info("LocalNodeStore observe has yielded. Reconciliation will no longer be triggered for LocalNode changes")
})

// add an initial signal to kick things off
c.Sig.Event(struct{}{})
Expand Down Expand Up @@ -298,9 +288,8 @@ func (c *Controller) Reconcile(ctx context.Context) error {
})
)

localNode, err := c.LocalNodeStore.Get(ctx)
if err != nil {
return fmt.Errorf("failed to retrieve local node: %w", err)
if c.LocalCiliumNode == nil {
return fmt.Errorf("attempted reconciliation with nil local CiliumNode")
}

// retrieve all CiliumBGPPeeringPolicies
Expand All @@ -311,7 +300,7 @@ func (c *Controller) Reconcile(ctx context.Context) error {
l.WithField("count", len(policies)).Debug("Successfully listed CiliumBGPPeeringPolicies")

// perform policy selection based on node.
labels := localNode.Labels
labels := c.LocalCiliumNode.Labels
policy, err := PolicySelection(ctx, labels, policies)
if err != nil {
l.WithError(err).Error("Policy selection failed")
Expand All @@ -337,7 +326,7 @@ func (c *Controller) Reconcile(ctx context.Context) error {

// call bgp sub-systems required to apply this policy's BGP topology.
l.Debug("Asking configured BGPRouterManager to configure peering")
if err := c.BGPMgr.ConfigurePeers(ctx, policy, &localNode, c.LocalCiliumNode); err != nil {
if err := c.BGPMgr.ConfigurePeers(ctx, policy, c.LocalCiliumNode); err != nil {
return fmt.Errorf("failed to configure BGP peers, cannot apply BGP peering policy: %w", err)
}

Expand All @@ -347,7 +336,7 @@ func (c *Controller) Reconcile(ctx context.Context) error {
// FullWithdrawal will instruct the configured BGPRouterManager to withdraw all
// BGP servers and peers.
func (c *Controller) FullWithdrawal(ctx context.Context) {
_ = c.BGPMgr.ConfigurePeers(ctx, nil, nil, nil) // cannot fail, no need for error handling
_ = c.BGPMgr.ConfigurePeers(ctx, nil, nil) // cannot fail, no need for error handling
}

// validatePolicy validates the CiliumBGPPeeringPolicy.
Expand Down
78 changes: 38 additions & 40 deletions pkg/bgpv1/agent/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"errors"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"

"github.com/cilium/cilium/pkg/bgpv1/agent"
"github.com/cilium/cilium/pkg/bgpv1/mock"
v2api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
v1 "github.com/cilium/cilium/pkg/k8s/slim/k8s/apis/meta/v1"
"github.com/cilium/cilium/pkg/node"
"github.com/cilium/cilium/pkg/node/types"
)

// TestControllerSanity ensures that the controller calls the correct methods,
Expand All @@ -40,7 +39,7 @@ func TestControllerSanity(t *testing.T) {
// a mock List method for the controller's PolicyLister
plist func() ([]*v2alpha1api.CiliumBGPPeeringPolicy, error)
// a mock ConfigurePeers method for the controller's BGPRouterManager
configurePeers func(context.Context, *v2alpha1api.CiliumBGPPeeringPolicy, *node.LocalNode, *v2api.CiliumNode) error
configurePeers func(context.Context, *v2alpha1api.CiliumBGPPeeringPolicy, *v2api.CiliumNode) error
// error nil or not
err error
}{
Expand All @@ -54,7 +53,7 @@ func TestControllerSanity(t *testing.T) {
plist: func() ([]*v2alpha1api.CiliumBGPPeeringPolicy, error) {
return []*v2alpha1api.CiliumBGPPeeringPolicy{wantPolicy}, nil
},
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, node *node.LocalNode, ciliumNode *v2api.CiliumNode) error {
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, ciliumNode *v2api.CiliumNode) error {
if !p.DeepEqual(wantPolicy) {
t.Fatalf("got: %+v, want: %+v", p, wantPolicy)
}
Expand Down Expand Up @@ -87,7 +86,7 @@ func TestControllerSanity(t *testing.T) {
}
return []*v2alpha1api.CiliumBGPPeeringPolicy{p}, nil
},
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, _ *node.LocalNode, _ *v2api.CiliumNode) error {
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, _ *v2api.CiliumNode) error {
for _, r := range p.Spec.VirtualRouters {
for _, n := range r.Neighbors {
if n.PeerPort == nil ||
Expand All @@ -113,7 +112,7 @@ func TestControllerSanity(t *testing.T) {
"bgp-policy": "a",
},
annotations: map[string]string{},
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, _ *node.LocalNode, _ *v2api.CiliumNode) error {
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, _ *v2api.CiliumNode) error {
return errors.New("")
},
err: errors.New(""),
Expand Down Expand Up @@ -142,7 +141,7 @@ func TestControllerSanity(t *testing.T) {
"bgp-policy": "a",
},
annotations: map[string]string{},
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, _ *node.LocalNode, _ *v2api.CiliumNode) error {
configurePeers: func(_ context.Context, p *v2alpha1api.CiliumBGPPeeringPolicy, _ *v2api.CiliumNode) error {
return nil
},
err: errors.New(""),
Expand All @@ -157,18 +156,19 @@ func TestControllerSanity(t *testing.T) {
ConfigurePeers_: tt.configurePeers,
}

nodeStore := node.NewTestLocalNodeStore(node.LocalNode{
Node: types.Node{
// create test cilium node
node := &v2api.CiliumNode{
ObjectMeta: metav1.ObjectMeta{
Name: "Test Node",
Annotations: tt.annotations,
Labels: tt.labels,
},
})
}

c := agent.Controller{
PolicyLister: policyLister,
BGPMgr: rtmgr,
LocalNodeStore: nodeStore,
PolicyLister: policyLister,
BGPMgr: rtmgr,
LocalCiliumNode: node,
}

err := c.Reconcile(context.Background())
Expand Down Expand Up @@ -405,37 +405,35 @@ func TestPolicySelection(t *testing.T) {
}
for _, tt := range table {
t.Run(tt.name, func(t *testing.T) {
node.WithTestLocalNodeStore(func() {
// expand anon policies into CiliumBGPPeeringPolicy, make note of wanted
var policies []*v2alpha1api.CiliumBGPPeeringPolicy
var want *v2alpha1api.CiliumBGPPeeringPolicy
for _, p := range tt.policies {
policy := &v2alpha1api.CiliumBGPPeeringPolicy{
Spec: v2alpha1api.CiliumBGPPeeringPolicySpec{
NodeSelector: p.selector,
},
}
policies = append(policies, policy)
if p.want {
want = policy
}
// expand policies into CiliumBGPPeeringPolicies, make note of wanted
var policies []*v2alpha1api.CiliumBGPPeeringPolicy
var want *v2alpha1api.CiliumBGPPeeringPolicy
for _, p := range tt.policies {
policy := &v2alpha1api.CiliumBGPPeeringPolicy{
Spec: v2alpha1api.CiliumBGPPeeringPolicySpec{
NodeSelector: p.selector,
},
}
// call function under test
policy, err := agent.PolicySelection(context.Background(), tt.nodeLabels, policies)
if (tt.err == nil) != (err == nil) {
t.Fatalf("expected err: %v", (tt.err == nil))
policies = append(policies, policy)
if p.want {
want = policy
}
}
// call function under test
policy, err := agent.PolicySelection(context.Background(), tt.nodeLabels, policies)
if (tt.err == nil) != (err == nil) {
t.Fatalf("expected err: %v", (tt.err == nil))
}
if want != nil {
if policy == nil {
t.Fatalf("got: <nil>, want: %+v", *want)
}
if want != nil {
if policy == nil {
t.Fatalf("got: <nil>, want: %+v", *want)
}

// pointer comparison, not a deep equal.
if policy != want {
t.Fatalf("got: %+v, want: %+v", *policy, *want)
}
// pointer comparison, not a deep equal.
if policy != want {
t.Fatalf("got: %+v, want: %+v", *policy, *want)
}
})
}
})
}
}
3 changes: 1 addition & 2 deletions pkg/bgpv1/agent/routermgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
restapi "github.com/cilium/cilium/api/v1/server/restapi/bgp"
v2api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
v2alpha1api "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/node"
)

// BGPRouterManager provides a declarative API for defining
Expand All @@ -34,7 +33,7 @@ type BGPRouterManager interface {
//
// Providing a nil policy to ConfigurePeers will withdrawal all routes
// and disconnect from the peers.
ConfigurePeers(ctx context.Context, policy *v2alpha1api.CiliumBGPPeeringPolicy, node *node.LocalNode, ciliumNode *v2api.CiliumNode) error
ConfigurePeers(ctx context.Context, policy *v2alpha1api.CiliumBGPPeeringPolicy, ciliumNode *v2api.CiliumNode) error

// GetPeers fetches BGP peering state from underlying routing daemon.
//
Expand Down
23 changes: 9 additions & 14 deletions pkg/bgpv1/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/node"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -131,7 +130,6 @@ func NewBGPRouterManager(params bgpRouterManagerParams) agent.BGPRouterManager {
// This method is not thread safe and does not intend to be called concurrently.
func (m *BGPRouterManager) ConfigurePeers(ctx context.Context,
policy *v2alpha1api.CiliumBGPPeeringPolicy,
node *node.LocalNode,
ciliumNode *v2api.CiliumNode) error {
m.Lock()
defer m.Unlock()
Expand All @@ -144,7 +142,7 @@ func (m *BGPRouterManager) ConfigurePeers(ctx context.Context,

// use a reconcileDiff to compute which BgpServers must be created, removed
// and reconciled.
rd := newReconcileDiff(node, ciliumNode)
rd := newReconcileDiff(ciliumNode)

if policy == nil {
return m.withdrawAll(ctx, rd)
Expand Down Expand Up @@ -191,7 +189,7 @@ func (m *BGPRouterManager) register(ctx context.Context, rd *reconcileDiff) erro
l.Errorf("Work diff (add) contains unseen ASN %v, skipping", asn)
continue
}
if err := m.registerBGPServer(ctx, config, rd.node, rd.ciliumNode); err != nil {
if err := m.registerBGPServer(ctx, config, rd.ciliumNode); err != nil {
// we'll just log the error and attempt to register the next BgpServer.
l.WithError(err).Errorf("Error while registering new BGP server for local ASN %v.", config.LocalASN)
}
Expand All @@ -207,7 +205,6 @@ func (m *BGPRouterManager) register(ctx context.Context, rd *reconcileDiff) erro
// and deleted from our manager (if it was added).
func (m *BGPRouterManager) registerBGPServer(ctx context.Context,
c *v2alpha1api.CiliumBGPVirtualRouter,
node *node.LocalNode,
ciliumNode *v2api.CiliumNode) error {
l := log.WithFields(
logrus.Fields{
Expand All @@ -231,7 +228,7 @@ func (m *BGPRouterManager) registerBGPServer(ctx context.Context,
}
}()

annoMap, err := agent.NewAnnotationMap(node.Annotations)
annoMap, err := agent.NewAnnotationMap(ciliumNode.Annotations)
if err != nil {
return fmt.Errorf("unable to parse local node's annotations: %v", err)
}
Expand All @@ -247,11 +244,11 @@ func (m *BGPRouterManager) registerBGPServer(ctx context.Context,

routerID, err := annoMap.ResolveRouterID(c.LocalASN)
if err != nil {
nodeIP := node.GetNodeIP(false)
if nodeIP.IsUnspecified() {
return fmt.Errorf("failed to resolve router id: %w", err)
if nodeIP := ciliumNode.GetIP(false); nodeIP == nil {
return fmt.Errorf("failed to get ciliumnode IP %v: %w", nodeIP, err)
} else {
routerID = nodeIP.String()
}
routerID = nodeIP.String()
}

globalConfig := types.ServerParameters{
Expand All @@ -269,7 +266,7 @@ func (m *BGPRouterManager) registerBGPServer(ctx context.Context,
return fmt.Errorf("failed to start BGP server for config with local ASN %v: %w", c.LocalASN, err)
}

if err = m.reconcileBGPConfig(ctx, s, c, node, ciliumNode); err != nil {
if err = m.reconcileBGPConfig(ctx, s, c, ciliumNode); err != nil {
return fmt.Errorf("failed initial reconciliation for peer config with local ASN %v: %w", c.LocalASN, err)
}

Expand Down Expand Up @@ -340,7 +337,7 @@ func (m *BGPRouterManager) reconcile(ctx context.Context, rd *reconcileDiff) err
continue
}

if err := m.reconcileBGPConfig(ctx, sc, newc, rd.node, rd.ciliumNode); err != nil {
if err := m.reconcileBGPConfig(ctx, sc, newc, rd.ciliumNode); err != nil {
l.WithError(err).Errorf("Encountered error reconciling virtual router with local ASN %v, shutting down this server", newc.LocalASN)
sc.Server.Stop()
delete(m.Servers, asn)
Expand Down Expand Up @@ -368,7 +365,6 @@ func (m *BGPRouterManager) reconcile(ctx context.Context, rd *reconcileDiff) err
func (m *BGPRouterManager) reconcileBGPConfig(ctx context.Context,
sc *ServerWithConfig,
newc *v2alpha1api.CiliumBGPVirtualRouter,
node *node.LocalNode,
ciliumNode *v2api.CiliumNode) error {
if sc.Config != nil {
if sc.Config.LocalASN != newc.LocalASN {
Expand All @@ -379,7 +375,6 @@ func (m *BGPRouterManager) reconcileBGPConfig(ctx context.Context,
if err := r.Reconcile(ctx, ReconcileParams{
CurrentServer: sc,
DesiredConfig: newc,
Node: node,
CiliumNode: ciliumNode,
}); err != nil {
return fmt.Errorf("reconciliation of virtual router with local ASN %v failed: %w", newc.LocalASN, err)
Expand Down
Loading

0 comments on commit 1af6842

Please sign in to comment.