clustermesh: Extend status command with connectivity status
Signed-off-by: Thomas Graf <[email protected]>
tgraf committed Jan 13, 2021
1 parent 6de22f4 commit 8c6b059
Showing 3 changed files with 322 additions and 39 deletions.
284 changes: 247 additions & 37 deletions clustermesh/clustermesh.go
@@ -27,7 +27,9 @@ import (
"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/certs"
"github.com/cilium/cilium-cli/internal/k8s"
"github.com/cilium/cilium-cli/status"

"github.com/cilium/cilium/api/v1/models"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
@@ -50,6 +52,8 @@ var (
deploymentMaxSurge = intstr.FromInt(1)
deploymentMaxUnavailable = intstr.FromInt(1)
secretDefaultMode = int32(420)

retryInterval = 2 * time.Second
)

var clusterRole = &rbacv1.ClusterRole{
@@ -404,26 +408,30 @@ type k8sClusterMeshImplementation interface {
DeleteService(ctx context.Context, namespace, name string, opts metav1.DeleteOptions) error
GetService(ctx context.Context, namespace, name string, opts metav1.GetOptions) (*corev1.Service, error)
PatchDaemonSet(ctx context.Context, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*appsv1.DaemonSet, error)
GetDaemonSet(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.DaemonSet, error)
ListNodes(ctx context.Context, options metav1.ListOptions) (*corev1.NodeList, error)
ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error)
AutodetectFlavor(ctx context.Context) (k8s.Flavor, error)
CiliumStatus(ctx context.Context, namespace, pod string) (*models.StatusResponse, error)
ClusterName() string
}

type K8sClusterMesh struct {
client k8sClusterMeshImplementation
certManager *certs.CertManager
flavor k8s.Flavor
params Parameters
clusterName string
clusterID string
client k8sClusterMeshImplementation
certManager *certs.CertManager
statusCollector *status.K8sStatusCollector
flavor k8s.Flavor
params Parameters
clusterName string
clusterID string
}

type Parameters struct {
Namespace string
ServiceType string
DestinationContext string
Wait bool
WaitTime time.Duration
WaitDuration time.Duration
DestinationEndpoints []string
SourceEndpoints []string
SkipServiceCheck bool
@@ -432,8 +440,8 @@ type Parameters struct {
}

func (p Parameters) waitTimeout() time.Duration {
if p.WaitTime != time.Duration(0) {
return p.WaitTime
if p.WaitDuration != time.Duration(0) {
return p.WaitDuration
}

return time.Minute * 15
@@ -865,61 +873,263 @@ func (k *K8sClusterMesh) Disconnect(ctx context.Context) error {
return nil
}

type status struct {
accessInformation *accessInformation
service *corev1.Service
type Status struct {
AccessInformation *accessInformation
Service *corev1.Service
Connectivity *ConnectivityStatus
}

func (k *K8sClusterMesh) status(ctx context.Context) (*status, error) {
var (
err error
s = &status{}
)
func (k *K8sClusterMesh) statusAccessInformation(ctx context.Context) (*accessInformation, error) {
retry:
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

s.accessInformation, err = k.extractAccessInformation(ctx, k.client, []string{}, false)
if err != nil {
return nil, err
ai, err := k.extractAccessInformation(ctx, k.client, []string{}, false)
if err != nil && k.params.Wait {
time.Sleep(retryInterval)
goto retry
}

return ai, err
}

func (k *K8sClusterMesh) statusService(ctx context.Context) (*corev1.Service, error) {
retry:
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

svc, err := k.client.GetService(ctx, k.params.Namespace, defaults.ClusterMeshServiceName, metav1.GetOptions{})
if err != nil {
if k.params.Wait {
time.Sleep(retryInterval)
goto retry
}

return nil, fmt.Errorf("clustermesh-apiserver cannot be found: %w", err)
}

s.service = svc
return svc, nil
}

if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
if len(s.accessInformation.ServiceIPs) == 0 {
return nil, fmt.Errorf("No IP available to reach cluster")
type StatisticalStatus struct {
Min int64
Avg float64
Max int64
}

type ClusterStats struct {
Configured int
Connected int
}

type ConnectivityStatus struct {
GlobalServices StatisticalStatus
Connected StatisticalStatus
Clusters map[string]*ClusterStats
Total int64
NotReady int64
Errors status.ErrorCountMapMap
}

func (c *ConnectivityStatus) addError(pod, cluster string, err error) {
m := c.Errors[pod]
if m == nil {
m = status.ErrorCountMap{}
c.Errors[pod] = m
}

if m[cluster] == nil {
m[cluster] = &status.ErrorCount{}
}

m[cluster].Errors = append(m[cluster].Errors, err)
}

func (c *ConnectivityStatus) parseAgentStatus(name string, s *status.ClusterMeshAgentConnectivityStatus) {
if c.GlobalServices.Min < 0 || c.GlobalServices.Min > s.GlobalServices {
c.GlobalServices.Min = s.GlobalServices
}

if c.GlobalServices.Max < s.GlobalServices {
c.GlobalServices.Max = s.GlobalServices
}

c.GlobalServices.Avg += float64(s.GlobalServices)
c.Total++

ready := int64(0)
for _, cluster := range s.Clusters {
stats, ok := c.Clusters[cluster.Name]
if !ok {
stats = &ClusterStats{}
c.Clusters[cluster.Name] = stats
}

stats.Configured++

if cluster.Ready {
ready++
stats.Connected++
} else {
c.addError(name, cluster.Name, fmt.Errorf("cluster is not ready: %s", cluster.Status))
}
}

return s, nil
if ready != int64(len(s.Clusters)) {
c.NotReady++
}

if c.Connected.Min < 0 || c.Connected.Min > ready {
c.Connected.Min = ready
}

if c.Connected.Max < ready {
c.Connected.Max = ready
}

c.Connected.Avg += float64(ready)
}

func (k *K8sClusterMesh) statusConnectivity(ctx context.Context) (*ConnectivityStatus, error) {
status := &ConnectivityStatus{
GlobalServices: StatisticalStatus{Min: -1},
Connected: StatisticalStatus{Min: -1},
Errors: status.ErrorCountMapMap{},
Clusters: map[string]*ClusterStats{},
}
retry:
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

pods, err := k.client.ListPods(ctx, k.params.Namespace, metav1.ListOptions{LabelSelector: "k8s-app=cilium"})
if err != nil {
if k.params.Wait {
time.Sleep(retryInterval)
goto retry
}

return nil, fmt.Errorf("unable to list cilium pods: %w", err)
}

for _, pod := range pods.Items {
s, err := k.statusCollector.ClusterMeshConnectivity(ctx, pod.Name)
if err != nil {
return nil, fmt.Errorf("unable to determine status of cilium pod %q: %w", pod.Name, err)
}

status.parseAgentStatus(pod.Name, s)
}

status.GlobalServices.Avg /= float64(len(pods.Items))
status.Connected.Avg /= float64(len(pods.Items))

return status, nil
}

func (k *K8sClusterMesh) Status(ctx context.Context) error {
func (k *K8sClusterMesh) Status(ctx context.Context, log bool) (*Status, error) {
var (
s *status
err error
s = &Status{}
)

collector, err := status.NewK8sStatusCollector(ctx, k.client, status.K8sStatusParameters{
Namespace: k.params.Namespace,
Wait: k.params.Wait,
WaitDuration: k.params.WaitDuration,
})
if err != nil {
return nil, fmt.Errorf("unable to create client to collect status: %w", err)
}

k.statusCollector = collector

ctx, cancel := context.WithTimeout(ctx, k.params.waitTimeout())
defer cancel()
retry:
s, err = k.status(ctx)

s.AccessInformation, err = k.statusAccessInformation(ctx)
if err != nil {
if k.params.Wait {
time.Sleep(2 * time.Second)
goto retry
return nil, err
}

if log {
k.Log("✅ Cluster access information is available:")
for _, ip := range s.AccessInformation.ServiceIPs {
k.Log(" - %s:%d", ip, s.AccessInformation.ServicePort)
}
}

return err
s.Service, err = k.statusService(ctx)
if err != nil {
return nil, err
}

k.Log("✅ Cluster can be connected to on endpoints:")
for _, ip := range s.accessInformation.ServiceIPs {
k.Log(" - %s:%d", ip, s.accessInformation.ServicePort)
if log {
k.Log("✅ Service %q of type %q found", defaults.ClusterMeshServiceName, s.Service.Spec.Type)
}

return nil
if s.Service.Spec.Type == corev1.ServiceTypeLoadBalancer {
if len(s.AccessInformation.ServiceIPs) == 0 {
if log {
k.Log("❌ Service is of type LoadBalancer but has no IPs assigned")
}
return nil, fmt.Errorf("No IP available to reach cluster")
}
}

s.Connectivity, err = k.statusConnectivity(ctx)
if err != nil {
return nil, err
}

if log {
if s.Connectivity.NotReady > 0 {
k.Log("⚠️ %d/%d nodes are not connected to all clusters [min:%d / avg:%.1f / max:%d]",
s.Connectivity.NotReady,
s.Connectivity.Total,
s.Connectivity.Connected.Min,
s.Connectivity.Connected.Avg,
s.Connectivity.Connected.Max)
} else {
k.Log("✅ All %d nodes are connected to all clusters [min:%d / avg:%.1f / max:%d]",
s.Connectivity.Total,
s.Connectivity.Connected.Min,
s.Connectivity.Connected.Avg,
s.Connectivity.Connected.Max)
}

k.Log("🔌 Cluster Connections:")
for cluster, stats := range s.Connectivity.Clusters {
k.Log("- %s: %d/%d configured, %d/%d connected",
cluster, stats.Configured, s.Connectivity.Total,
stats.Connected, s.Connectivity.Total)
}

k.Log("🔀 Global services: [ min:%d / avg:%.1f / max:%d ]",
s.Connectivity.GlobalServices.Min,
s.Connectivity.GlobalServices.Avg,
s.Connectivity.GlobalServices.Max)

if len(s.Connectivity.Errors) > 0 {
k.Log("❌ %d Errors:", len(s.Connectivity.Errors))

for podName, clusters := range s.Connectivity.Errors {
for clusterName, a := range clusters {
for _, err := range a.Errors {
k.Log("❌ %s is not connected to cluster %s: %s", podName, clusterName, err)
}
}
}
}
}

return s, nil
}
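
For illustration only (not part of this commit): a minimal sketch of how the *Status value returned by the new Status(ctx, log) API might be consumed programmatically. The clustermesh package, the K8sClusterMesh type, and the Connectivity fields are taken from the diff above; the summarizeClusterMesh helper, its output format, and the assumption that cm was built via clustermesh.NewK8sClusterMesh with a Kubernetes client and Parameters (as in the command wiring below) are illustrative.

package example

import (
	"context"
	"fmt"

	"github.com/cilium/cilium-cli/clustermesh"
)

// summarizeClusterMesh prints a compact connectivity summary derived from the
// Status struct introduced in this commit. Sketch only; the output format is
// an assumption.
func summarizeClusterMesh(ctx context.Context, cm *clustermesh.K8sClusterMesh) error {
	// log=false gathers the status without emitting the command's own log lines.
	s, err := cm.Status(ctx, false)
	if err != nil {
		return err
	}

	// Connectivity aggregates per-agent clustermesh state across all Cilium pods.
	fmt.Printf("%d/%d agents connected to all clusters\n",
		s.Connectivity.Total-s.Connectivity.NotReady, s.Connectivity.Total)
	for name, stats := range s.Connectivity.Clusters {
		fmt.Printf("  %s: %d/%d agents connected\n", name, stats.Connected, stats.Configured)
	}
	fmt.Printf("global services: min %d / avg %.1f / max %d\n",
		s.Connectivity.GlobalServices.Min,
		s.Connectivity.GlobalServices.Avg,
		s.Connectivity.GlobalServices.Max)
	return nil
}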
5 changes: 3 additions & 2 deletions internal/cli/cmd/clustermesh.go
@@ -141,14 +141,15 @@ func newCmdClusterMeshStatus() *cobra.Command {
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
cm := clustermesh.NewK8sClusterMesh(k8sClient, params)
return cm.Status(context.Background())
_, err := cm.Status(context.Background(), true)
return err
},
}

cmd.Flags().StringVar(&params.Namespace, "namespace", "kube-system", "Namespace Cilium is running in")
cmd.Flags().StringVar(&contextName, "context", "", "Kubernetes configuration context")
cmd.Flags().BoolVar(&params.Wait, "wait", false, "Wait until status is successful")
cmd.Flags().DurationVar(&params.WaitTime, "wait-duration", 15*time.Minute, "Maximum time to wait")
cmd.Flags().DurationVar(&params.WaitDuration, "wait-duration", 15*time.Minute, "Maximum time to wait")
cmd.Flags().BoolVar(&params.SkipServiceCheck, "skip-service-check", false, "Do not require service IP of remote cluster to be available")

return cmd
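
For illustration only (not part of this commit): with the flags registered above, the extended connectivity status could be requested with an invocation along the lines of

cilium clustermesh status --context cluster1 --wait --wait-duration 5m

where the flag names come from the diff, while the context name and the duration are placeholder values. With --wait set, the status helpers above retry at the 2-second retryInterval until the timeout derived from --wait-duration expires.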