Skip to content

Commit

Permalink
BGP: Introduce new cli to get bgp state
Browse files Browse the repository at this point in the history
This change introduces new BGP status CLI : 'cilium bgp peers'.
Status of BGP peers is returned from all or specific node in json or summary formats.

Signed-off-by: harsimran pabla <[email protected]>
  • Loading branch information
harsimran-pabla authored and tklauser committed Apr 13, 2023
1 parent 44807f1 commit ac9bcfa
Show file tree
Hide file tree
Showing 14 changed files with 3,979 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# @cilium/cli Commandline interfaces
# @cilium/contributing Developer documentation & tools
# @cilium/github-sec GitHub security (handling of secrets, consequences of pull_request_target, etc.)
# @cilium/sig-bgp BGP integration
# @cilium/sig-clustermesh Clustermesh and external workloads
# @cilium/sig-hubble Hubble integration
# @cilium/sig-k8s K8s integration, K8s CNI plugin
Expand All @@ -22,6 +23,7 @@
/.github/kind-config*.yaml @cilium/ci-structure
/.github/tools/ @cilium/ci-structure
/.github/workflows/ @cilium/github-sec @cilium/ci-structure
/bgp/ @cilium/sig-bgp
/cmd/ @cilium/cli
/clustermesh/ @cilium/sig-clustermesh
/connectivity/ @cilium/cli
Expand Down
61 changes: 61 additions & 0 deletions bgp/bgp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package bgp

import (
"context"
"fmt"
"io"
"time"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/cilium/cilium-cli/k8s"
)

// Parameters contains options for CLI
type Parameters struct {
CiliumNamespace string
AgentPodSelector string
NodeName string
Writer io.Writer
WaitDuration time.Duration
Output string
}

// Status is used to get bgp state from cilium agents
type Status struct {
client *k8s.Client
params Parameters
ciliumPods []*corev1.Pod
}

// NewStatus returns new bgp.Status struct
func NewStatus(client *k8s.Client, p Parameters) *Status {
return &Status{
client: client,
params: p,
}
}

// initTargetCiliumPods stores cilium agent pods in the status.ciliumPods.
// If node selector option is specified then only that nodes' cilium-agent
// pod is stored else all cilium-agents in the cluster are stored.
func (s *Status) initTargetCiliumPods(ctx context.Context) error {
opts := metav1.ListOptions{LabelSelector: s.params.AgentPodSelector}
if s.params.NodeName != "" {
opts.FieldSelector = fmt.Sprintf("spec.nodeName=%s", s.params.NodeName)
}

ciliumPods, err := s.client.ListPods(ctx, s.params.CiliumNamespace, opts)
if err != nil {
return fmt.Errorf("unable to list Cilium pods: %w", err)
}

for _, ciliumPod := range ciliumPods.Items {
s.ciliumPods = append(s.ciliumPods, ciliumPod.DeepCopy())
}
return nil
}
183 changes: 183 additions & 0 deletions bgp/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package bgp

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"

"github.com/cilium/cilium/api/v1/models"
corev1 "k8s.io/api/core/v1"

"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/status"
)

const (
padding = 3
minWidth = 5
paddingChar = ' '
)

// GetPeeringState gets peering state from all/specific cilium agent pods.
func (s *Status) GetPeeringState(ctx context.Context) error {
ctx, cancelFn := context.WithTimeout(ctx, s.params.WaitDuration)
defer cancelFn()

err := s.initTargetCiliumPods(ctx)
if err != nil {
return err
}

res, err := s.fetchPeeringStateConcurrently(ctx)
if err != nil {
return err
}

return s.writeStatus(res)
}

func (s *Status) fetchPeeringStateConcurrently(ctx context.Context) (map[string][]*models.BgpPeer, error) {
allFetchedData := make(map[string][]*models.BgpPeer)

// res contains data returned from cilium pod
type res struct {
nodeName string
data []*models.BgpPeer
err error
}
resCh := make(chan res)

var wg sync.WaitGroup

// max number of concurrent go routines will be number of cilium agent pods
wg.Add(len(s.ciliumPods))

// concurrently fetch state from each cilium pod
for _, pod := range s.ciliumPods {
go func(ctx context.Context, pod *corev1.Pod) {
defer wg.Done()

peers, err := s.fetchPeeringStateFromPod(ctx, pod)
resCh <- res{
nodeName: pod.Spec.NodeName,
data: peers,
err: err,
}
}(ctx, pod)
}

// close resCh when data from all nodes is collected
go func() {
wg.Wait()
close(resCh)
}()

// read from the channel till it is closed.
// on error, store error and continue to next node.
var err error
for fetchedData := range resCh {
if fetchedData.err != nil {
err = errors.Join(err, fetchedData.err)
} else {
allFetchedData[fetchedData.nodeName] = fetchedData.data
}
}

return allFetchedData, err
}

func (s *Status) fetchPeeringStateFromPod(ctx context.Context, pod *corev1.Pod) ([]*models.BgpPeer, error) {
cmd := []string{"cilium", "bgp", "peers", "-o", "json"}
output, err := s.client.ExecInPod(ctx, pod.Namespace, pod.Name, defaults.AgentContainerName, cmd)
if err != nil {
return nil, fmt.Errorf("failed to fetch bgp state from %s: %v", pod.Name, err)
}

bgpPeers := make([]*models.BgpPeer, 0)

err = json.Unmarshal(output.Bytes(), &bgpPeers)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal bgp state from %s: %v", pod.Name, err)
}

return bgpPeers, nil
}

func (s *Status) writeStatus(res map[string][]*models.BgpPeer) error {
if s.params.Output == status.OutputJSON {
jsonStatus, err := json.MarshalIndent(res, "", " ")
if err != nil {
return err
}
fmt.Println(string(jsonStatus))
} else {
printSummary(os.Stdout, res)
}

return nil
}

func printSummary(out io.Writer, peersPerNode map[string][]*models.BgpPeer) {
// sort by node names
var nodes []string
for node := range peersPerNode {
nodes = append(nodes, node)
}
sort.Strings(nodes)

// sort peers per node
for _, peers := range peersPerNode {
// sort by local AS, if peers from same AS then sort by peer address.
sort.Slice(peers, func(i, j int) bool {
return peers[i].LocalAsn < peers[j].LocalAsn || peers[i].PeerAddress < peers[j].PeerAddress
})
}

// tab writer with min width 5 and padding 3
w := tabwriter.NewWriter(out, minWidth, 0, padding, paddingChar, 0)
fmt.Fprintln(w, "Node\tLocal AS\tPeer AS\tPeer Address\tSession State\tUptime\tFamily\tReceived\tAdvertised")

for _, node := range nodes {
peers := peersPerNode[node]

for i, peer := range peers {
if i == 0 {
// print name for first row of peers
fmt.Fprintf(w, "%s\t", node)
} else {
// skip name for all rest of the peers
fmt.Fprint(w, "\t")
}

fmt.Fprintf(w, "%d\t", peer.LocalAsn)
fmt.Fprintf(w, "%d\t", peer.PeerAsn)
fmt.Fprintf(w, "%s\t", peer.PeerAddress)
fmt.Fprintf(w, "%s\t", peer.SessionState)
fmt.Fprintf(w, "%s\t", time.Duration(peer.UptimeNanoseconds).Round(time.Second).String())

for j, afisafi := range peer.Families {
if j > 0 {
// skip space for session info
fmt.Fprint(w, strings.Repeat("\t", 6))
}

fmt.Fprintf(w, "%s/%s\t", afisafi.Afi, afisafi.Safi)
fmt.Fprintf(w, "%d\t", afisafi.Received)
fmt.Fprintf(w, "%d\t", afisafi.Advertised)
fmt.Fprintf(w, "\n")
}
}
}
w.Flush()
}
Loading

0 comments on commit ac9bcfa

Please sign in to comment.