Skip to content

Commit

Permalink
clustermesh: output kvstoremesh status as part of clustermesh status
Browse files Browse the repository at this point in the history
Extend the cilium clustermesh status output to additionally report the
kvstoremesh status when that component is enabled. This simplifies
determining whether everything is connected correctly and, in case of
issues, where the problem actually is. Indeed, errors reported at the
node level may be a consequence of kvstoremesh being unable to connect
to the remote clusters.

Signed-off-by: Marco Iorio <[email protected]>
  • Loading branch information
giorio94 authored and michi-covalent committed May 15, 2024
1 parent 1c7d1ac commit fa7e30e
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 53 deletions.
210 changes: 157 additions & 53 deletions clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package clustermesh

import (
"bytes"
"cmp"
"context"
"crypto/x509"
"encoding/base64"
Expand Down Expand Up @@ -67,6 +68,7 @@ type k8sClusterMeshImplementation interface {
ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error)
AutodetectFlavor(ctx context.Context) k8s.Flavor
CiliumStatus(ctx context.Context, namespace, pod string) (*models.StatusResponse, error)
KVStoreMeshStatus(ctx context.Context, namespace, pod string) ([]*models.RemoteCluster, error)
CiliumDbgEndpoints(ctx context.Context, namespace, pod string) ([]*models.Endpoint, error)
ClusterName() string
ListCiliumExternalWorkloads(ctx context.Context, opts metav1.ListOptions) (*ciliumv2.CiliumExternalWorkloadList, error)
Expand Down Expand Up @@ -572,6 +574,10 @@ func (k *K8sClusterMesh) validateInfoForConnect(aiLocal, aiRemote *accessInforma
// Status is the result reported by the clustermesh status command. It is
// also the value serialized when JSON output is requested.
type Status struct {
	// AccessInformation is omitted from the JSON output when nil.
	AccessInformation *accessInformation `json:"access_information,omitempty"`

	// Connectivity is the connectivity status aggregated over the agents.
	// It is omitted from the JSON output when nil.
	Connectivity *ConnectivityStatus `json:"connectivity,omitempty"`

	// KVStoreMesh groups the KVStoreMesh-related information. The field is a
	// struct value, so it is always present in the JSON output: encoding/json
	// never treats a struct as empty, which is why no omitempty option is set.
	KVStoreMesh struct {
		// Enabled reports whether KVStoreMesh was detected as enabled.
		Enabled bool `json:"enabled"`

		// Status is the connectivity status aggregated over the KVStoreMesh
		// replicas. It is omitted from the JSON output when nil.
		Status *ConnectivityStatus `json:"status,omitempty"`
	} `json:"kvstoremesh"`
}

func (k *K8sClusterMesh) statusAccessInformation(ctx context.Context, log bool, getExternalWorkloadSecret bool) (*accessInformation, error) {
Expand Down Expand Up @@ -619,6 +625,21 @@ func (k *K8sClusterMesh) statusDeployment(ctx context.Context) (err error) {
}
}

// kvstoremeshEnabled reports whether the deployment named
// defaults.ClusterMeshDeploymentName, looked up in the configured namespace,
// declares a container named defaults.ClusterMeshKVStoreMeshContainerName.
//
// An error is returned only when the deployment cannot be retrieved.
func (k *K8sClusterMesh) kvstoremeshEnabled(ctx context.Context) (bool, error) {
	deployment, err := k.client.GetDeployment(ctx, k.params.Namespace, defaults.ClusterMeshDeploymentName, metav1.GetOptions{})
	if err != nil {
		return false, fmt.Errorf("failed checking whether KVStoreMesh is enabled: %w", err)
	}

	// KVStoreMesh is considered enabled as soon as one container of the pod
	// template carries the expected name.
	containers := deployment.Spec.Template.Spec.Containers
	enabled := false
	for i := range containers {
		if containers[i].Name == defaults.ClusterMeshKVStoreMeshContainerName {
			enabled = true
			break
		}
	}

	return enabled, nil
}

type StatisticalStatus struct {
Min int64 `json:"min,omitempty"`
Avg float64 `json:"avg,omitempty"`
Expand Down Expand Up @@ -737,34 +758,67 @@ func (c *ConnectivityStatus) parseAgentStatus(name string, expected []string, s
c.Connected.Avg += float64(ready)
}

func (k *K8sClusterMesh) statusConnectivity(ctx context.Context) (*ConnectivityStatus, error) {
func (k *K8sClusterMesh) statusConnectivity(ctx context.Context, checkKVStoreMesh bool) (agents, kvstoremesh *ConnectivityStatus, err error) {
var err1, err2 error
var checkAgents = true

w := wait.NewObserver(ctx, wait.Parameters{Log: func(err error, wait string) {
k.Log("⌛ Waiting (%s) for clusters to be connected: %s", wait, err)
}})
defer w.Cancel()

for {
status, err := k.determineStatusConnectivity(ctx)
if checkAgents {
agents, err1 = k.determineStatusConnectivity(
ctx, defaults.ClusterMeshSecretName, defaults.AgentPodSelector, k.statusCollector.ClusterMeshConnectivity,
)
}

if checkKVStoreMesh {
kvstoremesh, err2 = k.determineStatusConnectivity(
ctx, defaults.ClusterMeshKVStoreMeshSecretName, defaults.ClusterMeshPodSelector, k.statusCollector.KVStoreMeshConnectivity,
)

if errors.Is(err2, k8s.ErrKVStoreMeshStatusNotImplemented) {
k.Log("⚠️ KVStoreMesh status retrieval is not supported by this Cilium version")
err2 = nil
}
}

if k.params.Wait {
if err == nil {
if status.NotReady > 0 {
err = fmt.Errorf("%d nodes are not ready", status.NotReady)
notReadyCheck := func(status *ConnectivityStatus, err error, component string) error {
if err == nil && status != nil {
if status.NotReady > 0 {
err = fmt.Errorf("%d %s are not ready", status.NotReady, component)
}
}
return err
}

if err != nil {
err1 = notReadyCheck(agents, err1, "nodes")
err2 = notReadyCheck(kvstoremesh, err2, "KVStoreMesh replicas")

checkAgents = checkAgents && err1 != nil
checkKVStoreMesh = checkKVStoreMesh && err2 != nil

if err := cmp.Or(err2, err1); err != nil {
if err := w.Retry(err); err != nil {
return status, err
return agents, kvstoremesh, err
}
continue
}
}

return status, err
// Return only one error rather than both, as joined errors are displayed poorly.
// Prefer the KVStoreMesh one, given that it implies the agents one as well
// in most cases.
return agents, kvstoremesh, cmp.Or(err2, err1)
}
}

func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context) (*ConnectivityStatus, error) {
func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context, secretName, selector string,
collector func(ctx context.Context, ciliumPod string) (*status.ClusterMeshAgentConnectivityStatus, error),
) (*ConnectivityStatus, error) {
stats := &ConnectivityStatus{
GlobalServices: StatisticalStatus{Min: -1},
Connected: StatisticalStatus{Min: -1},
Expand All @@ -776,9 +830,9 @@ func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context) (*Conn
// as there's no guarantee that the secret has already propagated into the agents.
// Don't fail in case the secret is not found, as it is legitimate if no cluster
// has been connected yet.
config, err := k.client.GetSecret(ctx, k.params.Namespace, defaults.ClusterMeshSecretName, metav1.GetOptions{})
config, err := k.client.GetSecret(ctx, k.params.Namespace, secretName, metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
return nil, fmt.Errorf("unable to retrieve clustermesh configuration: %w", err)
return nil, fmt.Errorf("unable to retrieve configuration: %w", err)
}

var expected []string
Expand All @@ -791,18 +845,18 @@ func (k *K8sClusterMesh) determineStatusConnectivity(ctx context.Context) (*Conn
}
}

pods, err := k.client.ListPods(ctx, k.params.Namespace, metav1.ListOptions{LabelSelector: defaults.AgentPodSelector})
pods, err := k.client.ListPods(ctx, k.params.Namespace, metav1.ListOptions{LabelSelector: selector})
if err != nil {
return nil, fmt.Errorf("unable to list cilium pods: %w", err)
return nil, fmt.Errorf("unable to list pods: %w", err)
}

for _, pod := range pods.Items {
s, err := k.statusCollector.ClusterMeshConnectivity(ctx, pod.Name)
s, err := collector(ctx, pod.Name)
if err != nil {
if len(expected) == 0 && errors.Is(err, status.ErrClusterMeshStatusNotAvailable) {
continue
}
return nil, fmt.Errorf("unable to determine status of cilium pod %q: %w", pod.Name, err)
return nil, fmt.Errorf("unable to determine status of pod %q: %w", pod.Name, err)
}

stats.parseAgentStatus(pod.Name, expected, s)
Expand Down Expand Up @@ -855,9 +909,20 @@ func (k *K8sClusterMesh) Status(ctx context.Context) (*Status, error) {
return nil, err
}
k.Log("✅ Deployment %s is ready", defaults.ClusterMeshDeploymentName)

s.KVStoreMesh.Enabled, err = k.kvstoremeshEnabled(ctx)
if err != nil {
return nil, err
}

status := "disabled"
if s.KVStoreMesh.Enabled {
status = "enabled"
}
k.Log("ℹ️ KVStoreMesh is %s", status)
}

s.Connectivity, err = k.statusConnectivity(ctx)
s.Connectivity, s.KVStoreMesh.Status, err = k.statusConnectivity(ctx, s.KVStoreMesh.Enabled)

if k.params.Output == status.OutputJSON {
jsonStatus, err := json.MarshalIndent(s, "", " ")
Expand All @@ -869,61 +934,100 @@ func (k *K8sClusterMesh) Status(ctx context.Context) (*Status, error) {
}

if s.Connectivity != nil {
if s.Connectivity.NotReady > 0 {
k.Log("⚠️ %d/%d nodes are not connected to all clusters [min:%d / avg:%.1f / max:%d]",
s.Connectivity.NotReady,
s.Connectivity.Total,
s.Connectivity.Connected.Min,
s.Connectivity.Connected.Avg,
s.Connectivity.Connected.Max)
} else if len(s.Connectivity.Clusters) > 0 {
k.Log("✅ All %d nodes are connected to all clusters [min:%d / avg:%.1f / max:%d]",
s.Connectivity.Total,
s.Connectivity.Connected.Min,
s.Connectivity.Connected.Avg,
s.Connectivity.Connected.Max)
k.outputConnectivityStatus(s.Connectivity, s.KVStoreMesh.Status)
}

return s, err
}

func (k *K8sClusterMesh) outputConnectivityStatus(agents, kvstoremesh *ConnectivityStatus) {
outputStatusReady := func(status *ConnectivityStatus, component string) {
if status.NotReady > 0 {
k.Log("⚠️ %d/%d %s are not connected to all clusters [min:%d / avg:%.1f / max:%d]",
status.NotReady,
status.Total,
component,
status.Connected.Min,
status.Connected.Avg,
status.Connected.Max)
} else if len(status.Clusters) > 0 {
k.Log("✅ All %d %s are connected to all clusters [min:%d / avg:%.1f / max:%d]",
status.Total,
component,
status.Connected.Min,
status.Connected.Avg,
status.Connected.Max)
}
}

k.Log("")
outputStatusReady(agents, "nodes")
if kvstoremesh != nil {
outputStatusReady(kvstoremesh, "KVStoreMesh replicas")
}

if len(s.Connectivity.Clusters) > 0 {
k.Log("🔌 Cluster Connections:")
clusters := maps.Keys(s.Connectivity.Clusters)
sort.Strings(clusters)
for _, cluster := range clusters {
stats := s.Connectivity.Clusters[cluster]
k.Log(" - %s: %d/%d configured, %d/%d connected",
cluster, stats.Configured, s.Connectivity.Total,
stats.Connected, s.Connectivity.Total)
k.Log("")
if len(agents.Clusters) > 0 {
k.Log("🔌 Cluster Connections:")
clusters := maps.Keys(agents.Clusters)
sort.Strings(clusters)
for _, cluster := range clusters {
stats := agents.Clusters[cluster]

line := fmt.Sprintf(" - %s: %d/%d configured, %d/%d connected",
cluster, stats.Configured, agents.Total,
stats.Connected, agents.Total)

if kvstoremesh != nil {
stats, ok := kvstoremesh.Clusters[cluster]
if !ok {
line += " - KVStoreMesh: status not available"
} else {
line += fmt.Sprintf(" - KVStoreMesh: %d/%d configured, %d/%d connected",
stats.Configured, kvstoremesh.Total,
stats.Connected, kvstoremesh.Total)
}
}
} else {
k.Log("🔌 No cluster connected")

k.Log(line)
}
} else {
k.Log("🔌 No cluster connected")
}

k.Log("")
k.Log("🔀 Global services: [ min:%d / avg:%.1f / max:%d ]",
agents.GlobalServices.Min,
agents.GlobalServices.Avg,
agents.GlobalServices.Max)

k.Log("🔀 Global services: [ min:%d / avg:%.1f / max:%d ]",
s.Connectivity.GlobalServices.Min,
s.Connectivity.GlobalServices.Avg,
s.Connectivity.GlobalServices.Max)
k.Log("")
errCount := len(agents.Errors)
if kvstoremesh != nil {
errCount += len(kvstoremesh.Errors)
}

if len(s.Connectivity.Errors) > 0 {
k.Log("❌ %d Errors:", len(s.Connectivity.Errors))
if errCount > 0 {
k.Log("❌ %d Errors:", errCount)

podNames := maps.Keys(s.Connectivity.Errors)
outputErrors := func(errs status.ErrorCountMapMap) {
podNames := maps.Keys(errs)
sort.Strings(podNames)
for _, podName := range podNames {
clusters := s.Connectivity.Errors[podName]
clusters := errs[podName]
for clusterName, a := range clusters {
for _, err := range a.Errors {
k.Log(" ❌ %s is not connected to cluster %s: %s", podName, clusterName, err)
}
}
}
}
}

if err != nil {
return nil, err
outputErrors(agents.Errors)
if kvstoremesh != nil {
outputErrors(kvstoremesh.Errors)
}
}

return s, nil
}

func (k *K8sClusterMesh) CreateExternalWorkload(ctx context.Context, names []string) error {
Expand Down
1 change: 1 addition & 0 deletions defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (
ClusterMeshEtcdMetricsPortName = "etcd-metrics"
ClusterMeshServiceName = "clustermesh-apiserver"
ClusterMeshSecretName = "cilium-clustermesh" // Secret which contains the clustermesh configuration
ClusterMeshKVStoreMeshSecretName = "cilium-kvstoremesh" // Secret which contains the kvstoremesh configuration
ClusterMeshServerSecretName = "clustermesh-apiserver-server-cert"
ClusterMeshAdminSecretName = "clustermesh-apiserver-admin-cert"
ClusterMeshClientSecretName = "clustermesh-apiserver-client-cert"
Expand Down
24 changes: 24 additions & 0 deletions k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,30 @@ func (c *Client) CiliumStatus(ctx context.Context, namespace, pod string) (*mode
return &statusResponse, nil
}

// ErrKVStoreMeshStatusNotImplemented is a sentinel error signalling that the
// kvstoremesh-dbg status command is not implemented. Match it with errors.Is.
var ErrKVStoreMeshStatusNotImplemented = errors.New("kvstoremesh-dbg status is not available")

// KVStoreMeshStatus runs "kvstoremesh-dbg status -o json" in the KVStoreMesh
// container of the given pod and decodes its standard output into a list of
// remote clusters.
//
// If the command fails and its standard error contains "Usage:" or
// "unknown command", ErrKVStoreMeshStatusNotImplemented is returned. Any other
// execution error is returned unchanged. A decoding failure is wrapped.
func (c *Client) KVStoreMeshStatus(ctx context.Context, namespace, pod string) ([]*models.RemoteCluster, error) {
	command := []string{defaults.ClusterMeshBinaryName, "kvstoremesh-dbg", "status", "-o", "json"}

	stdout, stderr, err := c.ExecInPodWithStderr(ctx, namespace, pod, defaults.ClusterMeshKVStoreMeshContainerName, command)
	if err != nil {
		// Heuristic: a usage message or an "unknown command" complaint on
		// stderr is taken to mean that this version lacks the status command.
		output := stderr.String()
		for _, marker := range []string{"Usage:", "unknown command"} {
			if strings.Contains(output, marker) {
				return nil, ErrKVStoreMeshStatusNotImplemented
			}
		}

		return nil, err
	}

	// Start from an empty, non-nil slice, exactly like the original make(..., 0).
	clusters := []*models.RemoteCluster{}
	if decodeErr := json.Unmarshal(stdout.Bytes(), &clusters); decodeErr != nil {
		return nil, fmt.Errorf("unable to unmarshal response of kvstoremesh-dbg status: %w", decodeErr)
	}

	return clusters, nil
}

func (c *Client) CiliumDbgEndpoints(ctx context.Context, namespace, pod string) ([]*models.Endpoint, error) {
stdout, err := c.ExecInPod(ctx, namespace, pod, defaults.AgentContainerName, []string{"cilium", "endpoint", "list", "-o", "json"})
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions status/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type K8sStatusCollector struct {

type k8sImplementation interface {
CiliumStatus(ctx context.Context, namespace, pod string) (*models.StatusResponse, error)
KVStoreMeshStatus(ctx context.Context, namespace, pod string) ([]*models.RemoteCluster, error)
CiliumDbgEndpoints(ctx context.Context, namespace, pod string) ([]*models.Endpoint, error)
GetDaemonSet(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.DaemonSet, error)
GetDeployment(ctx context.Context, namespace, name string, options metav1.GetOptions) (*appsv1.Deployment, error)
Expand Down Expand Up @@ -113,6 +114,26 @@ func (k *K8sStatusCollector) ClusterMeshConnectivity(ctx context.Context, cilium
return c, nil
}

// KVStoreMeshConnectivity queries the KVStoreMesh status of the given pod and
// returns it as a ClusterMeshAgentConnectivityStatus whose Clusters map is
// keyed by remote cluster name.
//
// The query is bounded by the collector's wait timeout. Any error from the
// underlying client is returned as-is, together with a nil status.
func (k *K8sStatusCollector) KVStoreMeshConnectivity(ctx context.Context, pod string) (*ClusterMeshAgentConnectivityStatus, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, k.params.waitTimeout())
	defer cancel()

	remotes, err := k.client.KVStoreMeshStatus(timeoutCtx, k.params.Namespace, pod)
	if err != nil {
		return nil, err
	}

	// Index the reported clusters by name; a later entry with the same name
	// overwrites an earlier one.
	byName := make(map[string]*models.RemoteCluster, len(remotes))
	for _, remote := range remotes {
		byName[remote.Name] = remote
	}

	return &ClusterMeshAgentConnectivityStatus{Clusters: byName}, nil
}

func (k *K8sStatusCollector) deploymentStatus(ctx context.Context, status *Status, name string) (bool, error) {
d, err := k.client.GetDeployment(ctx, k.params.Namespace, name, metav1.GetOptions{})
if k8serrors.IsNotFound(err) {
Expand Down
Loading

0 comments on commit fa7e30e

Please sign in to comment.