diff --git a/clustermesh/clustermesh.go b/clustermesh/clustermesh.go
index b338d97b1f..c6f4e26d5d 100644
--- a/clustermesh/clustermesh.go
+++ b/clustermesh/clustermesh.go
@@ -27,7 +27,9 @@ import (
 	"github.com/cilium/cilium-cli/defaults"
 	"github.com/cilium/cilium-cli/internal/certs"
 	"github.com/cilium/cilium-cli/internal/k8s"
+	"github.com/cilium/cilium-cli/status"
+
+	"github.com/cilium/cilium/api/v1/models"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	rbacv1 "k8s.io/api/rbac/v1"
@@ -50,6 +52,8 @@
 	deploymentMaxSurge       = intstr.FromInt(1)
 	deploymentMaxUnavailable = intstr.FromInt(1)
 	secretDefaultMode        = int32(420)
+
+	retryInterval = 2 * time.Second
 )
 
 var clusterRole = &rbacv1.ClusterRole{
@@ -404,18 +408,22 @@ type k8sClusterMeshImplementation interface {
 	DeleteService(ctx context.Context, namespace, name string, opts metav1.DeleteOptions) error
 	GetService(ctx context.Context, namespace, name string, opts metav1.GetOptions) (*corev1.Service, error)
 	PatchDaemonSet(ctx context.Context, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*appsv1.DaemonSet, error)
+	GetDaemonSet(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.DaemonSet, error)
 	ListNodes(ctx context.Context, options metav1.ListOptions) (*corev1.NodeList, error)
+	ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error)
 	AutodetectFlavor(ctx context.Context) (k8s.Flavor, error)
+	CiliumStatus(ctx context.Context, namespace, pod string) (*models.StatusResponse, error)
 	ClusterName() string
 }
 
 type K8sClusterMesh struct {
-	client      k8sClusterMeshImplementation
-	certManager *certs.CertManager
-	flavor      k8s.Flavor
-	params      Parameters
-	clusterName string
-	clusterID   string
+	client          k8sClusterMeshImplementation
+	certManager     *certs.CertManager
+	statusCollector *status.K8sStatusCollector
+	flavor          k8s.Flavor
+	params          Parameters
+	clusterName     string
+	clusterID       string
 }
 
 type Parameters struct {
@@ -423,7 +431,7 @@
 	ServiceType          string
 	DestinationContext   string
 	Wait                 bool
-	WaitTime             time.Duration
+	WaitDuration         time.Duration
 	DestinationEndpoints []string
 	SourceEndpoints      []string
 	SkipServiceCheck     bool
@@ -432,8 +440,8 @@
 }
 
 func (p Parameters) waitTimeout() time.Duration {
-	if p.WaitTime != time.Duration(0) {
-		return p.WaitTime
+	if p.WaitDuration != time.Duration(0) {
+		return p.WaitDuration
 	}
 
 	return time.Minute * 15
@@ -865,61 +873,263 @@ func (k *K8sClusterMesh) Disconnect(ctx context.Context) error {
 	return nil
 }
 
-type status struct {
-	accessInformation *accessInformation
-	service           *corev1.Service
+type Status struct {
+	AccessInformation *accessInformation
+	Service           *corev1.Service
+	Connectivity      *ConnectivityStatus
 }
 
-func (k *K8sClusterMesh) status(ctx context.Context) (*status, error) {
-	var (
-		err error
-		s   = &status{}
-	)
+func (k *K8sClusterMesh) statusAccessInformation(ctx context.Context) (*accessInformation, error) {
+retry:
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
 
-	s.accessInformation, err = k.extractAccessInformation(ctx, k.client, []string{}, false)
-	if err != nil {
-		return nil, err
+	ai, err := k.extractAccessInformation(ctx, k.client, []string{}, false)
+	if err != nil && k.params.Wait {
+		time.Sleep(retryInterval)
+		goto retry
+	}
+
+	return ai, err
+}
+
+func (k *K8sClusterMesh) statusService(ctx context.Context) (*corev1.Service, error) {
+retry:
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
 	}
 
 	svc, err := k.client.GetService(ctx, k.params.Namespace, defaults.ClusterMeshServiceName, metav1.GetOptions{})
 	if err != nil {
+		if k.params.Wait {
+			time.Sleep(retryInterval)
+			goto retry
+		}
+
 		return nil, fmt.Errorf("clustermesh-apiserver cannot be found: %w", err)
 	}
 
-	s.service = svc
+	return svc, nil
+}
 
-	if svc.Spec.Type == corev1.ServiceTypeLoadBalancer {
-		if len(s.accessInformation.ServiceIPs) == 0 {
-			return nil, fmt.Errorf("No IP available to reach cluster")
+type StatisticalStatus struct {
+	Min int64
+	Avg float64
+	Max int64
+}
+
+type ClusterStats struct {
+	Configured int
+	Connected  int
+}
+
+type ConnectivityStatus struct {
+	GlobalServices StatisticalStatus
+	Connected      StatisticalStatus
+	Clusters       map[string]*ClusterStats
+	Total          int64
+	NotReady       int64
+	Errors         status.ErrorCountMapMap
+}
+
+func (c *ConnectivityStatus) addError(pod, cluster string, err error) {
+	m := c.Errors[pod]
+	if m == nil {
+		m = status.ErrorCountMap{}
+		c.Errors[pod] = m
+	}
+
+	if m[cluster] == nil {
+		m[cluster] = &status.ErrorCount{}
+	}
+
+	m[cluster].Errors = append(m[cluster].Errors, err)
+}
+
+func (c *ConnectivityStatus) parseAgentStatus(name string, s *status.ClusterMeshAgentConnectivityStatus) {
+	if c.GlobalServices.Min < 0 || c.GlobalServices.Min > s.GlobalServices {
+		c.GlobalServices.Min = s.GlobalServices
+	}
+
+	if c.GlobalServices.Max < s.GlobalServices {
+		c.GlobalServices.Max = s.GlobalServices
+	}
+
+	c.GlobalServices.Avg += float64(s.GlobalServices)
+	c.Total++
+
+	ready := int64(0)
+	for _, cluster := range s.Clusters {
+		stats, ok := c.Clusters[cluster.Name]
+		if !ok {
+			stats = &ClusterStats{}
+			c.Clusters[cluster.Name] = stats
+		}
+
+		stats.Configured++
+
+		if cluster.Ready {
+			ready++
+			stats.Connected++
+		} else {
+			c.addError(name, cluster.Name, fmt.Errorf("cluster is not ready: %s", cluster.Status))
 		}
 	}
 
-	return s, nil
+	if ready != int64(len(s.Clusters)) {
+		c.NotReady++
+	}
+
+	if c.Connected.Min < 0 || c.Connected.Min > ready {
+		c.Connected.Min = ready
+	}
+
+	if c.Connected.Max < ready {
+		c.Connected.Max = ready
+	}
+
+	c.Connected.Avg += float64(ready)
+}
+
+func (k *K8sClusterMesh) statusConnectivity(ctx context.Context) (*ConnectivityStatus, error) {
+	status := &ConnectivityStatus{
+		GlobalServices: StatisticalStatus{Min: -1},
+		Connected:      StatisticalStatus{Min: -1},
+		Errors:         status.ErrorCountMapMap{},
+		Clusters:       map[string]*ClusterStats{},
+	}
+retry:
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	pods, err := k.client.ListPods(ctx, k.params.Namespace, metav1.ListOptions{LabelSelector: "k8s-app=cilium"})
+	if err != nil {
+		if k.params.Wait {
+			time.Sleep(retryInterval)
+			goto retry
+		}
+
+		return nil, fmt.Errorf("unable to list cilium pods: %w", err)
+	}
+
+	for _, pod := range pods.Items {
+		s, err := k.statusCollector.ClusterMeshConnectivity(ctx, pod.Name)
+		if err != nil {
+			return nil, fmt.Errorf("unable to determine status of cilium pod %q: %w", pod.Name, err)
+		}
+
+		status.parseAgentStatus(pod.Name, s)
+	}
+
+	status.GlobalServices.Avg /= float64(len(pods.Items))
+	status.Connected.Avg /= float64(len(pods.Items))
+
+	return status, nil
 }
 
-func (k *K8sClusterMesh) Status(ctx context.Context) error {
+func (k *K8sClusterMesh) Status(ctx context.Context, log bool) (*Status, error) {
 	var (
-		s   *status
 		err error
+		s   = &Status{}
 	)
 
+	collector, err := status.NewK8sStatusCollector(ctx, k.client, status.K8sStatusParameters{
+		Namespace:    k.params.Namespace,
+		Wait:         k.params.Wait,
+		WaitDuration: k.params.WaitDuration,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("unable to create client to collect status: %w", err)
+	}
+
+	k.statusCollector = collector
+
 	ctx, cancel := context.WithTimeout(ctx, k.params.waitTimeout())
 	defer cancel()
-retry:
-	s, err = k.status(ctx)
+
+	s.AccessInformation, err = k.statusAccessInformation(ctx)
 	if err != nil {
-		if k.params.Wait {
-			time.Sleep(2 * time.Second)
-			goto retry
+		return nil, err
+	}
+
+	if log {
+		k.Log("✅ Cluster access information is available:")
+		for _, ip := range s.AccessInformation.ServiceIPs {
+			k.Log(" - %s:%d", ip, s.AccessInformation.ServicePort)
		}
+	}
 
-		return err
+	s.Service, err = k.statusService(ctx)
+	if err != nil {
+		return nil, err
 	}
 
-	k.Log("✅ Cluster can be connected to on endpoints:")
-	for _, ip := range s.accessInformation.ServiceIPs {
-		k.Log(" - %s:%d", ip, s.accessInformation.ServicePort)
+	if log {
+		k.Log("✅ Service %q of type %q found", defaults.ClusterMeshServiceName, s.Service.Spec.Type)
 	}
 
-	return nil
+	if s.Service.Spec.Type == corev1.ServiceTypeLoadBalancer {
+		if len(s.AccessInformation.ServiceIPs) == 0 {
+			if log {
+				k.Log("❌ Service is of type LoadBalancer but has no IPs assigned")
+			}
+			return nil, fmt.Errorf("No IP available to reach cluster")
+		}
+	}
+
+	s.Connectivity, err = k.statusConnectivity(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	if log {
+		if s.Connectivity.NotReady > 0 {
+			k.Log("⚠️ %d/%d nodes are not connected to all clusters [min:%d / avg:%.1f / max:%d]",
+				s.Connectivity.NotReady,
+				s.Connectivity.Total,
+				s.Connectivity.Connected.Min,
+				s.Connectivity.Connected.Avg,
+				s.Connectivity.Connected.Max)
+		} else {
+			k.Log("✅ All %d nodes are connected to all clusters [min:%d / avg:%.1f / max:%d]",
+				s.Connectivity.Total,
+				s.Connectivity.Connected.Min,
+				s.Connectivity.Connected.Avg,
+				s.Connectivity.Connected.Max)
+		}
+
+		k.Log("🔌 Cluster Connections:")
+		for cluster, stats := range s.Connectivity.Clusters {
+			k.Log("- %s: %d/%d configured, %d/%d connected",
+				cluster, stats.Configured, s.Connectivity.Total,
+				stats.Connected, s.Connectivity.Total)
+		}
+
+		k.Log("🔀 Global services: [ min:%d / avg:%.1f / max:%d ]",
+			s.Connectivity.GlobalServices.Min,
+			s.Connectivity.GlobalServices.Avg,
+			s.Connectivity.GlobalServices.Max)
+
+		if len(s.Connectivity.Errors) > 0 {
+			k.Log("❌ %d Errors:", len(s.Connectivity.Errors))
+
+			for podName, clusters := range s.Connectivity.Errors {
+				for clusterName, a := range clusters {
+					for _, err := range a.Errors {
+						k.Log("❌ %s is not connected to cluster %s: %s", podName, clusterName, err)
+					}
+				}
+			}
+		}
+	}
+
+	return s, nil
 }
diff --git a/internal/cli/cmd/clustermesh.go b/internal/cli/cmd/clustermesh.go
index cf929bc0fc..83b4205499 100644
--- a/internal/cli/cmd/clustermesh.go
+++ b/internal/cli/cmd/clustermesh.go
@@ -141,14 +141,15 @@ func newCmdClusterMeshStatus() *cobra.Command {
 		Long:  ``,
 		RunE: func(cmd *cobra.Command, args []string) error {
 			cm := clustermesh.NewK8sClusterMesh(k8sClient, params)
-			return cm.Status(context.Background())
+			_, err := cm.Status(context.Background(), true)
+			return err
 		},
 	}
 
 	cmd.Flags().StringVar(&params.Namespace, "namespace", "kube-system", "Namespace Cilium is running in")
 	cmd.Flags().StringVar(&contextName, "context", "", "Kubernetes configuration context")
 	cmd.Flags().BoolVar(&params.Wait, "wait", false, "Wait until status is successful")
-	cmd.Flags().DurationVar(&params.WaitTime, "wait-duration", 15*time.Minute, "Maximum time to wait")
+	cmd.Flags().DurationVar(&params.WaitDuration, "wait-duration", 15*time.Minute, "Maximum time to wait")
 	cmd.Flags().BoolVar(&params.SkipServiceCheck, "skip-service-check", false, "Do not require service IP of remote cluster to be available")
 
 	return cmd
diff --git a/status/k8s.go b/status/k8s.go
index 57cb244ad0..c3b56d2e2b 100644
--- a/status/k8s.go
+++ b/status/k8s.go
@@ -22,6 +22,7 @@ import (
 	"github.com/cilium/cilium-cli/defaults"
 
 	"github.com/cilium/cilium/api/v1/models"
+	"github.com/go-openapi/strfmt"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,6 +34,8 @@ const (
 	relayDeploymentName = "hubble-relay"
 )
 
+var retryInterval = 2 * time.Second
+
 type K8sStatusParameters struct {
 	Namespace string
 	Wait      bool
@@ -59,6 +62,75 @@ func NewK8sStatusCollector(ctx context.Context, client k8sImplementation, params K8sStatusParameters) (*K8sStatusCollector, error) {
 	}, nil
 }
 
+type ClusterConnectivityInfo struct {
+	Ready          bool
+	SharedServices int64
+	Nodes          int64
+	Identities     int64
+	LastFailure    strfmt.DateTime
+}
+
+type ClusterMeshAgentConnectivityStatus struct {
+	GlobalServices int64
+	Clusters       map[string]*models.RemoteCluster
+	Errors         ErrorCountMap
+}
+
+func (k *K8sStatusCollector) ClusterMeshConnectivity(ctx context.Context, ciliumPod string) (*ClusterMeshAgentConnectivityStatus, error) {
+	ctx, cancel := context.WithTimeout(ctx, k.params.waitTimeout())
+	defer cancel()
+
+retry:
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	default:
+	}
+
+	s, err := k.clusterMeshConnectivity(ctx, ciliumPod)
+	if err != nil {
+		if k.params.Wait {
+			time.Sleep(retryInterval)
+			goto retry
+		}
+	}
+
+	// If we are waiting for a successful status then all clusters need to
+	// be ready
+	if k.params.Wait {
+		for _, cluster := range s.Clusters {
+			if !cluster.Ready {
+				time.Sleep(retryInterval)
+				goto retry
+			}
+		}
+	}
+
+	return s, err
+}
+
+func (k *K8sStatusCollector) clusterMeshConnectivity(ctx context.Context, ciliumPod string) (*ClusterMeshAgentConnectivityStatus, error) {
+	c := &ClusterMeshAgentConnectivityStatus{
+		Clusters: map[string]*models.RemoteCluster{},
+	}
+
+	status, err := k.client.CiliumStatus(ctx, k.params.Namespace, ciliumPod)
+	if err != nil {
+		return nil, fmt.Errorf("unable to determine cilium status: %w", err)
+	}
+
+	if status.ClusterMesh == nil {
+		return nil, fmt.Errorf("ClusterMesh status is not available")
+	}
+
+	c.GlobalServices = status.ClusterMesh.NumGlobalServices
+	for _, cluster := range status.ClusterMesh.Clusters {
+		c.Clusters[cluster.Name] = cluster
+	}
+
+	return c, nil
+}
+
 func (k *K8sStatusCollector) deploymentStatus(ctx context.Context, status *Status, name string) error {
 	d, err := k.client.GetDeployment(ctx, k.params.Namespace, name, metav1.GetOptions{})
 	if err != nil {
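
For illustration only (not part of the patch): because Status() now returns the exported *Status value instead of only logging, other code in internal/cli/cmd could consume the connectivity data directly. A minimal sketch under that assumption follows; the helper name and output format are hypothetical, and it relies on the package's existing k8sClient variable plus the standard "context" and "fmt" imports.

// Hypothetical helper in internal/cli/cmd: call Status() with log=false and
// print a custom per-cluster summary instead of the built-in log lines.
func clusterMeshSummary(ctx context.Context, params clustermesh.Parameters) error {
	cm := clustermesh.NewK8sClusterMesh(k8sClient, params)

	s, err := cm.Status(ctx, false) // false suppresses the k.Log output
	if err != nil {
		return err
	}

	for cluster, stats := range s.Connectivity.Clusters {
		fmt.Printf("%s: %d/%d connected, %d/%d configured\n",
			cluster, stats.Connected, s.Connectivity.Total,
			stats.Configured, s.Connectivity.Total)
	}

	return nil
}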