From fa7e30e1a5dbd1e67cd4fe7fac5a2d10d99535a0 Mon Sep 17 00:00:00 2001 From: Marco Iorio Date: Wed, 8 May 2024 14:37:31 +0200 Subject: [PATCH] clustermesh: output kvstoremesh status as part of clustermesh status Extend the cilium connectivity status output to additionally report the kvstoremesh status when such component is enabled. This simplifies determining whether everything is connected correctly and, in case of issues, where the problem actually is. Indeed, errors reported at the node level may be a consequence of kvstoremesh being unable to connect to the remote clusters. Signed-off-by: Marco Iorio --- clustermesh/clustermesh.go | 210 +++++++++++++++++++++++++++---------- defaults/defaults.go | 1 + k8s/client.go | 24 +++++ status/k8s.go | 21 ++++ status/k8s_test.go | 5 + 5 files changed, 208 insertions(+), 53 deletions(-) diff --git a/clustermesh/clustermesh.go b/clustermesh/clustermesh.go index dff551b0da..4cc8f8f796 100644 --- a/clustermesh/clustermesh.go +++ b/clustermesh/clustermesh.go @@ -5,6 +5,7 @@ package clustermesh import ( "bytes" + "cmp" "context" "crypto/x509" "encoding/base64" @@ -67,6 +68,7 @@ type k8sClusterMeshImplementation interface { ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error) AutodetectFlavor(ctx context.Context) k8s.Flavor CiliumStatus(ctx context.Context, namespace, pod string) (*models.StatusResponse, error) + KVStoreMeshStatus(ctx context.Context, namespace, pod string) ([]*models.RemoteCluster, error) CiliumDbgEndpoints(ctx context.Context, namespace, pod string) ([]*models.Endpoint, error) ClusterName() string ListCiliumExternalWorkloads(ctx context.Context, opts metav1.ListOptions) (*ciliumv2.CiliumExternalWorkloadList, error) @@ -572,6 +574,10 @@ func (k *K8sClusterMesh) validateInfoForConnect(aiLocal, aiRemote *accessInforma type Status struct { AccessInformation *accessInformation `json:"access_information,omitempty"` Connectivity *ConnectivityStatus `json:"connectivity,omitempty"` + KVStoreMesh struct { + Enabled bool `json:"enabled"` + Status *ConnectivityStatus `json:"status,omitempty"` + } `json:"kvstoremesh,omitempty"` } func (k *K8sClusterMesh) statusAccessInformation(ctx context.Context, log bool, getExternalWorkloadSecret bool) (*accessInformation, error) { @@ -619,6 +625,21 @@ func (k *K8sClusterMesh) statusDeployment(ctx context.Context) (err error) { } } +func (k *K8sClusterMesh) kvstoremeshEnabled(ctx context.Context) (bool, error) { + deploy, err := k.client.GetDeployment(ctx, k.params.Namespace, defaults.ClusterMeshDeploymentName, metav1.GetOptions{}) + if err != nil { + return false, fmt.Errorf("failed checking whether KVStoreMesh is enabled: %w", err) + } + + for _, container := range deploy.Spec.Template.Spec.Containers { + if container.Name == defaults.ClusterMeshKVStoreMeshContainerName { + return true, nil + } + } + + return false, nil +} + type StatisticalStatus struct { Min int64 `json:"min,omitempty"` Avg float64 `json:"avg,omitempty"` @@ -737,34 +758,67 @@ func (c *ConnectivityStatus) parseAgentStatus(name string, expected []string, s c.Connected.Avg += float64(ready) } -func (k *K8sClusterMesh) statusConnectivity(ctx context.Context) (*ConnectivityStatus, error) { +func (k *K8sClusterMesh) statusConnectivity(ctx context.Context, checkKVStoreMesh bool) (agents, kvstoremesh *ConnectivityStatus, err error) { + var err1, err2 error + var checkAgents = true + w := wait.NewObserver(ctx, wait.Parameters{Log: func(err error, wait string) { k.Log("⌛ Waiting (%s) for clusters to be connected: %s", wait, err) }}) defer w.Cancel() for { - status, err := k.determineStatusConnectivity(ctx) + if checkAgents { + agents, err1 = k.determineStatusConnectivity( + ctx, defaults.ClusterMeshSecretName, defaults.AgentPodSelector, k.statusCollector.ClusterMeshConnectivity, + ) + } + + if checkKVStoreMesh { + kvstoremesh, err2 = k.determineStatusConnectivity( + ctx, defaults.ClusterMeshKVStoreMeshSecretName, defaults.ClusterMeshPodSelector, k.statusCollector.KVStoreMeshConnectivity, + ) + + if errors.Is(err2, k8s.ErrKVStoreMeshStatusNotImplemented) { + k.Log("⚠️ KVStoreMesh status retrieval is not supported by this Cilium version") + err2 = nil + } + } + if k.params.Wait { - if err == nil { - if status.NotReady > 0 { - err = fmt.Errorf("%d nodes are not ready", status.NotReady) + notReadyCheck := func(status *ConnectivityStatus, err error, component string) error { + if err == nil && status != nil { + if status.NotReady > 0 { + err = fmt.Errorf("%d %s are not ready", status.NotReady, component) + } } + return err } - if err != nil { + err1 = notReadyCheck(agents, err1, "nodes") + err2 = notReadyCheck(kvstoremesh, err2, "KVStoreMesh replicas") + + checkAgents = checkAgents && err1 != nil + checkKVStoreMesh = checkKVStoreMesh && err2 != nil + + if err := cmp.Or(err2, err1); err != nil { if err := w.Retry(err); err != nil { - return status, err + return agents, kvstoremesh, err } continue } } - return status, err + // Return only one error rather than both, as joined errors are displayed poorly. + // And Prefer the KVStoreMesh one, given that it implies the agents one as well + // in most cases. + return agents, kvstoremesh, cmp.Or(err2, err1) } } -func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context) (*ConnectivityStatus, error) { +func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context, secretName, selector string, + collector func(ctx context.Context, ciliumPod string) (*status.ClusterMeshAgentConnectivityStatus, error), +) (*ConnectivityStatus, error) { stats := &ConnectivityStatus{ GlobalServices: StatisticalStatus{Min: -1}, Connected: StatisticalStatus{Min: -1}, @@ -776,9 +830,9 @@ func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context) (*Conn // as there's no guarantee that the secret has already propagated into the agents. // Don't fail in case the secret is not found, as it is legitimate if no cluster // has been connected yet. - config, err := k.client.GetSecret(ctx, k.params.Namespace, defaults.ClusterMeshSecretName, metav1.GetOptions{}) + config, err := k.client.GetSecret(ctx, k.params.Namespace, secretName, metav1.GetOptions{}) if err != nil && !k8serrors.IsNotFound(err) { - return nil, fmt.Errorf("unable to retrieve clustermesh configuration: %w", err) + return nil, fmt.Errorf("unable to retrieve configuration: %w", err) } var expected []string @@ -791,18 +845,18 @@ func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context) (*Conn } } - pods, err := k.client.ListPods(ctx, k.params.Namespace, metav1.ListOptions{LabelSelector: defaults.AgentPodSelector}) + pods, err := k.client.ListPods(ctx, k.params.Namespace, metav1.ListOptions{LabelSelector: selector}) if err != nil { - return nil, fmt.Errorf("unable to list cilium pods: %w", err) + return nil, fmt.Errorf("unable to list pods: %w", err) } for _, pod := range pods.Items { - s, err := k.statusCollector.ClusterMeshConnectivity(ctx, pod.Name) + s, err := collector(ctx, pod.Name) if err != nil { if len(expected) == 0 && errors.Is(err, status.ErrClusterMeshStatusNotAvailable) { continue } - return nil, fmt.Errorf("unable to determine status of cilium pod %q: %w", pod.Name, err) + return nil, fmt.Errorf("unable to determine status of pod %q: %w", pod.Name, err) } stats.parseAgentStatus(pod.Name, expected, s) @@ -855,9 +909,20 @@ func (k *K8sClusterMesh) Status(ctx context.Context) (*Status, error) { return nil, err } k.Log("✅ Deployment %s is ready", defaults.ClusterMeshDeploymentName) + + s.KVStoreMesh.Enabled, err = k.kvstoremeshEnabled(ctx) + if err != nil { + return nil, err + } + + status := "disabled" + if s.KVStoreMesh.Enabled { + status = "enabled" + } + k.Log("ℹ️ KVStoreMesh is %s", status) } - s.Connectivity, err = k.statusConnectivity(ctx) + s.Connectivity, s.KVStoreMesh.Status, err = k.statusConnectivity(ctx, s.KVStoreMesh.Enabled) if k.params.Output == status.OutputJSON { jsonStatus, err := json.MarshalIndent(s, "", " ") @@ -869,47 +934,87 @@ func (k *K8sClusterMesh) Status(ctx context.Context) (*Status, error) { } if s.Connectivity != nil { - if s.Connectivity.NotReady > 0 { - k.Log("⚠️ %d/%d nodes are not connected to all clusters [min:%d / avg:%.1f / max:%d]", - s.Connectivity.NotReady, - s.Connectivity.Total, - s.Connectivity.Connected.Min, - s.Connectivity.Connected.Avg, - s.Connectivity.Connected.Max) - } else if len(s.Connectivity.Clusters) > 0 { - k.Log("✅ All %d nodes are connected to all clusters [min:%d / avg:%.1f / max:%d]", - s.Connectivity.Total, - s.Connectivity.Connected.Min, - s.Connectivity.Connected.Avg, - s.Connectivity.Connected.Max) + k.outputConnectivityStatus(s.Connectivity, s.KVStoreMesh.Status) + } + + return s, err +} + +func (k *K8sClusterMesh) outputConnectivityStatus(agents, kvstoremesh *ConnectivityStatus) { + outputStatusReady := func(status *ConnectivityStatus, component string) { + if status.NotReady > 0 { + k.Log("⚠️ %d/%d %s are not connected to all clusters [min:%d / avg:%.1f / max:%d]", + status.NotReady, + status.Total, + component, + status.Connected.Min, + status.Connected.Avg, + status.Connected.Max) + } else if len(status.Clusters) > 0 { + k.Log("✅ All %d %s are connected to all clusters [min:%d / avg:%.1f / max:%d]", + status.Total, + component, + status.Connected.Min, + status.Connected.Avg, + status.Connected.Max) } + } + + k.Log("") + outputStatusReady(agents, "nodes") + if kvstoremesh != nil { + outputStatusReady(kvstoremesh, "KVStoreMesh replicas") + } - if len(s.Connectivity.Clusters) > 0 { - k.Log("🔌 Cluster Connections:") - clusters := maps.Keys(s.Connectivity.Clusters) - sort.Strings(clusters) - for _, cluster := range clusters { - stats := s.Connectivity.Clusters[cluster] - k.Log(" - %s: %d/%d configured, %d/%d connected", - cluster, stats.Configured, s.Connectivity.Total, - stats.Connected, s.Connectivity.Total) + k.Log("") + if len(agents.Clusters) > 0 { + k.Log("🔌 Cluster Connections:") + clusters := maps.Keys(agents.Clusters) + sort.Strings(clusters) + for _, cluster := range clusters { + stats := agents.Clusters[cluster] + + line := fmt.Sprintf(" - %s: %d/%d configured, %d/%d connected", + cluster, stats.Configured, agents.Total, + stats.Connected, agents.Total) + + if kvstoremesh != nil { + stats, ok := kvstoremesh.Clusters[cluster] + if !ok { + line += " - KVStoreMesh: status not available" + } else { + line += fmt.Sprintf(" - KVStoreMesh: %d/%d configured, %d/%d connected", + stats.Configured, kvstoremesh.Total, + stats.Connected, kvstoremesh.Total) + } } - } else { - k.Log("🔌 No cluster connected") + + k.Log(line) } + } else { + k.Log("🔌 No cluster connected") + } + + k.Log("") + k.Log("🔀 Global services: [ min:%d / avg:%.1f / max:%d ]", + agents.GlobalServices.Min, + agents.GlobalServices.Avg, + agents.GlobalServices.Max) - k.Log("🔀 Global services: [ min:%d / avg:%.1f / max:%d ]", - s.Connectivity.GlobalServices.Min, - s.Connectivity.GlobalServices.Avg, - s.Connectivity.GlobalServices.Max) + k.Log("") + errCount := len(agents.Errors) + if kvstoremesh != nil { + errCount += len(kvstoremesh.Errors) + } - if len(s.Connectivity.Errors) > 0 { - k.Log("❌ %d Errors:", len(s.Connectivity.Errors)) + if errCount > 0 { + k.Log("❌ %d Errors:", errCount) - podNames := maps.Keys(s.Connectivity.Errors) + outputErrors := func(errs status.ErrorCountMapMap) { + podNames := maps.Keys(errs) sort.Strings(podNames) for _, podName := range podNames { - clusters := s.Connectivity.Errors[podName] + clusters := errs[podName] for clusterName, a := range clusters { for _, err := range a.Errors { k.Log(" ❌ %s is not connected to cluster %s: %s", podName, clusterName, err) @@ -917,13 +1022,12 @@ func (k *K8sClusterMesh) Status(ctx context.Context) (*Status, error) { } } } - } - if err != nil { - return nil, err + outputErrors(agents.Errors) + if kvstoremesh != nil { + outputErrors(kvstoremesh.Errors) + } } - - return s, nil } func (k *K8sClusterMesh) CreateExternalWorkload(ctx context.Context, names []string) error { diff --git a/defaults/defaults.go b/defaults/defaults.go index d5f493324a..c011bd1a4e 100644 --- a/defaults/defaults.go +++ b/defaults/defaults.go @@ -47,6 +47,7 @@ const ( ClusterMeshEtcdMetricsPortName = "etcd-metrics" ClusterMeshServiceName = "clustermesh-apiserver" ClusterMeshSecretName = "cilium-clustermesh" // Secret which contains the clustermesh configuration + ClusterMeshKVStoreMeshSecretName = "cilium-kvstoremesh" // Secret which contains the kvstoremesh configuration ClusterMeshServerSecretName = "clustermesh-apiserver-server-cert" ClusterMeshAdminSecretName = "clustermesh-apiserver-admin-cert" ClusterMeshClientSecretName = "clustermesh-apiserver-client-cert" diff --git a/k8s/client.go b/k8s/client.go index 1d01ae9fcc..385146fc20 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -392,6 +392,30 @@ func (c *Client) CiliumStatus(ctx context.Context, namespace, pod string) (*mode return &statusResponse, nil } +// KVStoreMeshStatusNotImplemented is a sentinel error to signal that the status command is not implemented. +var ErrKVStoreMeshStatusNotImplemented = errors.New("kvstoremesh-dbg status is not available") + +func (c *Client) KVStoreMeshStatus(ctx context.Context, namespace, pod string) ([]*models.RemoteCluster, error) { + stdout, stderr, err := c.ExecInPodWithStderr(ctx, namespace, pod, defaults.ClusterMeshKVStoreMeshContainerName, + []string{defaults.ClusterMeshBinaryName, "kvstoremesh-dbg", "status", "-o", "json"}) + if err != nil { + // Try to figure out if the status command is not yet supported in this version + stderrStr := stderr.String() + if strings.Contains(stderrStr, "Usage:") || strings.Contains(stderrStr, "unknown command") { + return nil, ErrKVStoreMeshStatusNotImplemented + } + + return nil, err + } + + statusResponse := make([]*models.RemoteCluster, 0) + if err := json.Unmarshal(stdout.Bytes(), &statusResponse); err != nil { + return nil, fmt.Errorf("unable to unmarshal response of kvstoremesh-dbg status: %w", err) + } + + return statusResponse, nil +} + func (c *Client) CiliumDbgEndpoints(ctx context.Context, namespace, pod string) ([]*models.Endpoint, error) { stdout, err := c.ExecInPod(ctx, namespace, pod, defaults.AgentContainerName, []string{"cilium", "endpoint", "list", "-o", "json"}) if err != nil { diff --git a/status/k8s.go b/status/k8s.go index 01b63fba0a..0df728f259 100644 --- a/status/k8s.go +++ b/status/k8s.go @@ -64,6 +64,7 @@ type K8sStatusCollector struct { type k8sImplementation interface { CiliumStatus(ctx context.Context, namespace, pod string) (*models.StatusResponse, error) + KVStoreMeshStatus(ctx context.Context, namespace, pod string) ([]*models.RemoteCluster, error) CiliumDbgEndpoints(ctx context.Context, namespace, pod string) ([]*models.Endpoint, error) GetDaemonSet(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.DaemonSet, error) GetDeployment(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.Deployment, error) @@ -113,6 +114,26 @@ func (k *K8sStatusCollector) ClusterMeshConnectivity(ctx context.Context, cilium return c, nil } +func (k *K8sStatusCollector) KVStoreMeshConnectivity(ctx context.Context, pod string) (*ClusterMeshAgentConnectivityStatus, error) { + ctx, cancel := context.WithTimeout(ctx, k.params.waitTimeout()) + defer cancel() + + c := &ClusterMeshAgentConnectivityStatus{ + Clusters: map[string]*models.RemoteCluster{}, + } + + status, err := k.client.KVStoreMeshStatus(ctx, k.params.Namespace, pod) + if err != nil { + return nil, err + } + + for _, cluster := range status { + c.Clusters[cluster.Name] = cluster + } + + return c, nil +} + func (k *K8sStatusCollector) deploymentStatus(ctx context.Context, status *Status, name string) (bool, error) { d, err := k.client.GetDeployment(ctx, k.params.Namespace, name, metav1.GetOptions{}) if k8serrors.IsNotFound(err) { diff --git a/status/k8s_test.go b/status/k8s_test.go index 0b66a98998..cf16ade543 100644 --- a/status/k8s_test.go +++ b/status/k8s_test.go @@ -5,6 +5,7 @@ package status import ( "context" + "errors" "fmt" "testing" "time" @@ -155,6 +156,10 @@ func (c *k8sStatusMockClient) CiliumStatus(_ context.Context, _, pod string) (*m return s, nil } +func (c *k8sStatusMockClient) KVStoreMeshStatus(_ context.Context, _, _ string) ([]*models.RemoteCluster, error) { + return nil, errors.New("not implemented") +} + func (c *k8sStatusMockClient) CiliumDbgEndpoints(_ context.Context, _, _ string) ([]*models.Endpoint, error) { return nil, nil }