BGP: Introduce new cli to get bgp peering state #1500

Merged 1 commit on Apr 13, 2023
2 changes: 2 additions & 0 deletions CODEOWNERS
@@ -4,6 +4,7 @@
# @cilium/cli Commandline interfaces
# @cilium/contributing Developer documentation & tools
# @cilium/github-sec GitHub security (handling of secrets, consequences of pull_request_target, etc.)
# @cilium/sig-bgp BGP integration
# @cilium/sig-clustermesh Clustermesh and external workloads
# @cilium/sig-hubble Hubble integration
# @cilium/sig-k8s K8s integration, K8s CNI plugin
@@ -22,6 +23,7 @@
/.github/kind-config*.yaml @cilium/ci-structure
/.github/tools/ @cilium/ci-structure
/.github/workflows/ @cilium/github-sec @cilium/ci-structure
/bgp/ @cilium/sig-bgp
/cmd/ @cilium/cli
/clustermesh/ @cilium/sig-clustermesh
/connectivity/ @cilium/cli
61 changes: 61 additions & 0 deletions bgp/bgp.go
@@ -0,0 +1,61 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package bgp

import (
    "context"
    "fmt"
    "io"
    "time"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    "github.com/cilium/cilium-cli/k8s"
)

// Parameters contains options for the CLI.
type Parameters struct {
    CiliumNamespace  string
    AgentPodSelector string
    NodeName         string
    Writer           io.Writer
    WaitDuration     time.Duration
    Output           string
}

// Status is used to get BGP state from Cilium agents.
type Status struct {
    client     *k8s.Client
    params     Parameters
    ciliumPods []*corev1.Pod
}

// NewStatus returns a new bgp.Status struct.
func NewStatus(client *k8s.Client, p Parameters) *Status {
    return &Status{
        client: client,
        params: p,
    }
}

// initTargetCiliumPods stores cilium agent pods in status.ciliumPods.
// If the node name option is specified, only that node's cilium-agent
// pod is stored; otherwise the cilium-agents of all nodes in the
// cluster are stored.
func (s *Status) initTargetCiliumPods(ctx context.Context) error {
    opts := metav1.ListOptions{LabelSelector: s.params.AgentPodSelector}
    if s.params.NodeName != "" {
        opts.FieldSelector = fmt.Sprintf("spec.nodeName=%s", s.params.NodeName)
    }

    ciliumPods, err := s.client.ListPods(ctx, s.params.CiliumNamespace, opts)
    if err != nil {
        return fmt.Errorf("unable to list Cilium pods: %w", err)
    }

    for _, ciliumPod := range ciliumPods.Items {
        s.ciliumPods = append(s.ciliumPods, ciliumPod.DeepCopy())
    }
    return nil
}
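
For context, a minimal sketch of how a caller might drive this package via the GetPeeringState entry point added in bgp/peers.go below. The flag values, the run helper, and the way the *k8s.Client is obtained are illustrative assumptions, not part of this diff:

    package main

    import (
        "context"
        "os"
        "time"

        "github.com/cilium/cilium-cli/bgp"
        "github.com/cilium/cilium-cli/k8s"
    )

    // run wires up bgp.Status; constructing the *k8s.Client is elided
    // here, since that lives outside this PR.
    func run(client *k8s.Client) error {
        params := bgp.Parameters{
            CiliumNamespace:  "kube-system",    // assumed namespace
            AgentPodSelector: "k8s-app=cilium", // assumed label selector
            NodeName:         "",               // empty: query every node
            Writer:           os.Stdout,
            WaitDuration:     30 * time.Second,
            Output:           "", // anything other than "json" prints the summary table
        }
        return bgp.NewStatus(client, params).GetPeeringState(context.Background())
    }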
183 changes: 183 additions & 0 deletions bgp/peers.go
@@ -0,0 +1,183 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package bgp

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "io"
    "os"
    "sort"
    "strings"
    "sync"
    "text/tabwriter"
    "time"

    "github.com/cilium/cilium/api/v1/models"
    corev1 "k8s.io/api/core/v1"

    "github.com/cilium/cilium-cli/defaults"
    "github.com/cilium/cilium-cli/status"
)

const (
    padding     = 3
    minWidth    = 5
    paddingChar = ' '
)

// GetPeeringState gets the peering state from all (or one specific)
// Cilium agent pod.
func (s *Status) GetPeeringState(ctx context.Context) error {
    ctx, cancelFn := context.WithTimeout(ctx, s.params.WaitDuration)
    defer cancelFn()

    err := s.initTargetCiliumPods(ctx)
    if err != nil {
        return err
    }

    res, err := s.fetchPeeringStateConcurrently(ctx)
    if err != nil {
        return err
    }

    return s.writeStatus(res)
}

func (s *Status) fetchPeeringStateConcurrently(ctx context.Context) (map[string][]*models.BgpPeer, error) {
    allFetchedData := make(map[string][]*models.BgpPeer)

    // res contains the data returned from a single Cilium pod
    type res struct {
        nodeName string
        data     []*models.BgpPeer
        err      error
    }
    resCh := make(chan res)

    var wg sync.WaitGroup

    // the maximum number of concurrent goroutines equals the number of
    // Cilium agent pods
    wg.Add(len(s.ciliumPods))

    // concurrently fetch state from each Cilium pod
    for _, pod := range s.ciliumPods {
        go func(ctx context.Context, pod *corev1.Pod) {
            defer wg.Done()

            peers, err := s.fetchPeeringStateFromPod(ctx, pod)
            resCh <- res{
                nodeName: pod.Spec.NodeName,
                data:     peers,
                err:      err,
            }
        }(ctx, pod)
    }

    // close resCh once data from all nodes has been collected
    go func() {
        wg.Wait()
        close(resCh)
    }()

    // read from the channel until it is closed; on error, accumulate the
    // error and continue with the next node.
    var err error
    for fetchedData := range resCh {
        if fetchedData.err != nil {
            err = errors.Join(err, fetchedData.err)
        } else {
            allFetchedData[fetchedData.nodeName] = fetchedData.data
        }
    }

    return allFetchedData, err
}
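
One detail worth noting in the fan-in loop above: errors.Join (Go 1.20+) discards nil arguments, so the accumulator stays nil until a node actually fails, and a joined error reports every failure rather than just the last one. A minimal standalone illustration:

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        var err error
        err = errors.Join(err, nil) // joining nil leaves err nil
        fmt.Println(err)            // <nil>

        // hypothetical per-node failures, for illustration only
        err = errors.Join(err, errors.New("node-a: fetch failed"))
        err = errors.Join(err, errors.New("node-b: fetch failed"))
        fmt.Println(err) // prints both messages, one per line
    }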

func (s *Status) fetchPeeringStateFromPod(ctx context.Context, pod *corev1.Pod) ([]*models.BgpPeer, error) {
    cmd := []string{"cilium", "bgp", "peers", "-o", "json"}
    output, err := s.client.ExecInPod(ctx, pod.Namespace, pod.Name, defaults.AgentContainerName, cmd)
    if err != nil {
        return nil, fmt.Errorf("failed to fetch bgp state from %s: %v", pod.Name, err)
    }

    bgpPeers := make([]*models.BgpPeer, 0)

    err = json.Unmarshal(output.Bytes(), &bgpPeers)
    if err != nil {
        return nil, fmt.Errorf("failed to unmarshal bgp state from %s: %v", pod.Name, err)
    }

    return bgpPeers, nil
}
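
The unmarshal target implies that `cilium bgp peers -o json` emits a JSON array of peer objects. A self-contained sketch of the same decode step, using a placeholder type with invented JSON keys purely for illustration (the real type and its JSON tags are defined in github.com/cilium/cilium/api/v1/models):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // peer is a stand-in for models.BgpPeer; the field names and JSON
    // tags here are assumptions for the sake of the example.
    type peer struct {
        PeerAddress  string `json:"peer-address"`
        SessionState string `json:"session-state"`
    }

    func main() {
        // raw mimics the shape implied by the code: a JSON array of peers.
        raw := []byte(`[{"peer-address":"10.0.0.1","session-state":"established"}]`)

        peers := make([]*peer, 0)
        if err := json.Unmarshal(raw, &peers); err != nil {
            fmt.Println("decode failed:", err)
            return
        }
        fmt.Println(peers[0].PeerAddress, peers[0].SessionState) // 10.0.0.1 established
    }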

func (s *Status) writeStatus(res map[string][]*models.BgpPeer) error {
    if s.params.Output == status.OutputJSON {
        jsonStatus, err := json.MarshalIndent(res, "", " ")
        if err != nil {
            return err
        }
        fmt.Println(string(jsonStatus))
    } else {
        printSummary(os.Stdout, res)
    }

    return nil
}

func printSummary(out io.Writer, peersPerNode map[string][]*models.BgpPeer) {
    // sort by node name
    var nodes []string
    for node := range peersPerNode {
        nodes = append(nodes, node)
    }
    sort.Strings(nodes)

    // sort peers per node
    for _, peers := range peersPerNode {
        // sort by local AS; for peers with the same local AS, sort by
        // peer address. The two-step comparison keeps this a strict weak
        // ordering, which sort.Slice requires.
        sort.Slice(peers, func(i, j int) bool {
            if peers[i].LocalAsn != peers[j].LocalAsn {
                return peers[i].LocalAsn < peers[j].LocalAsn
            }
            return peers[i].PeerAddress < peers[j].PeerAddress
        })
    }

    // tab writer with minimum cell width 5 and padding 3
    w := tabwriter.NewWriter(out, minWidth, 0, padding, paddingChar, 0)
    fmt.Fprintln(w, "Node\tLocal AS\tPeer AS\tPeer Address\tSession State\tUptime\tFamily\tReceived\tAdvertised")

    for _, node := range nodes {
        peers := peersPerNode[node]

        for i, peer := range peers {
            if i == 0 {
                // print the node name only for the first row of peers
                fmt.Fprintf(w, "%s\t", node)
            } else {
                // skip the name column for the remaining peers
                fmt.Fprint(w, "\t")
            }

            fmt.Fprintf(w, "%d\t", peer.LocalAsn)
            fmt.Fprintf(w, "%d\t", peer.PeerAsn)
            fmt.Fprintf(w, "%s\t", peer.PeerAddress)
            fmt.Fprintf(w, "%s\t", peer.SessionState)
            fmt.Fprintf(w, "%s\t", time.Duration(peer.UptimeNanoseconds).Round(time.Second).String())

            for j, afisafi := range peer.Families {
                if j > 0 {
                    // for additional address families, skip the six
                    // session-level columns so the family columns line up
                    fmt.Fprint(w, strings.Repeat("\t", 6))
                }

                fmt.Fprintf(w, "%s/%s\t", afisafi.Afi, afisafi.Safi)
                fmt.Fprintf(w, "%d\t", afisafi.Received)
                fmt.Fprintf(w, "%d\t", afisafi.Advertised)
                fmt.Fprintf(w, "\n")
            }
            if len(peer.Families) == 0 {
                // a peer without address families still needs its row
                // terminated
                fmt.Fprintln(w)
            }
        }
    }
    w.Flush()
}
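
With these tabwriter settings, the summary renders along these lines; the nodes, ASNs, and counters below are invented purely to illustrate the layout, including how a second address family lines up under the Family column of its session row:

    Node     Local AS   Peer AS   Peer Address   Session State   Uptime    Family         Received   Advertised
    node-1   65001      65000     10.0.0.1       established     2h30m0s   ipv4/unicast   10         5
                                                                           ipv6/unicast   2          1
    node-2   65001      65000     10.0.0.2       active          0s        ipv4/unicast   0          0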