helm mode: add implementation for clustermesh connect
Signed-off-by: Andrew Sauber <[email protected]>
asauber committed Jun 5, 2023
1 parent 7b3f21d commit ec41572
Showing 3 changed files with 333 additions and 30 deletions.
296 changes: 275 additions & 21 deletions clustermesh/clustermesh.go
@@ -6,8 +6,10 @@ package clustermesh
import (
"bytes"
"context"
"crypto/x509"
"encoding/base64"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"io"
@@ -23,13 +25,15 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/blang/semver/v4"
"github.com/cilium/cilium/api/v1/models"
ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
"github.com/cilium/cilium/pkg/versioncheck"
"helm.sh/helm/v3/pkg/release"

"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/certs"
@@ -953,36 +957,56 @@ func (k *K8sClusterMesh) patchConfig(ctx context.Context, client k8sClusterMeshI
return nil
}

-func (k *K8sClusterMesh) Connect(ctx context.Context) error {
-remoteCluster, err := k8s.NewClient(k.params.DestinationContext, "")
// getClientsForConnect returns a k8s.Client for the local and remote cluster, respectively
func (k *K8sClusterMesh) getClientsForConnect() (*k8s.Client, *k8s.Client, error) {
remoteClient, err := k8s.NewClient(k.params.DestinationContext, "")
if err != nil {
return fmt.Errorf("unable to create Kubernetes client to access remote cluster %q: %w", k.params.DestinationContext, err)
return nil, nil, fmt.Errorf(
"unable to create Kubernetes client to access remote cluster %q: %w",
k.params.DestinationContext, err)
}
return k.client.(*k8s.Client), remoteClient, nil
}

-aiRemote, err := k.extractAccessInformation(ctx, remoteCluster, k.params.DestinationEndpoints, true, false)
// getAccessInfoForConnect retrieves the access information of the local and remote
// cluster that is needed to connect the two clusters via clustermesh
func (k *K8sClusterMesh) getAccessInfoForConnect(
ctx context.Context, localClient, remoteClient *k8s.Client,
) (*accessInformation, *accessInformation, error) {
aiRemote, err := k.extractAccessInformation(ctx, remoteClient, k.params.DestinationEndpoints, true, false)
if err != nil {
k.Log("❌ Unable to retrieve access information of remote cluster %q: %s", remoteCluster.ClusterName(), err)
return err
k.Log("❌ Unable to retrieve access information of remote cluster %q: %s", remoteClient.ClusterName(), err)
return nil, nil, err
}

-if !aiRemote.validate() {
-return fmt.Errorf("remote cluster has non-unique name (%s) and/or ID (%s)", aiRemote.ClusterName, aiRemote.ClusterID)
-}

-aiLocal, err := k.extractAccessInformation(ctx, k.client, k.params.SourceEndpoints, true, false)
aiLocal, err := k.extractAccessInformation(ctx, localClient, k.params.SourceEndpoints, true, false)
if err != nil {
k.Log("❌ Unable to retrieve access information of local cluster %q: %s", k.client.ClusterName(), err)
-return err
return nil, nil, err
}

return aiLocal, aiRemote, nil
}

// validateInfoForConnect validates the access information of the local and remote
// cluster, ensuring that the two clusters can be connected via clustermesh
func (k *K8sClusterMesh) validateInfoForConnect(aiLocal, aiRemote *accessInformation) error {
if !aiRemote.validate() {
return fmt.Errorf("remote cluster has non-unique name (%s) and/or ID (%s)",
aiRemote.ClusterName, aiRemote.ClusterID)
}

if !aiLocal.validate() {
return fmt.Errorf("local cluster has the default name (cluster name: %s) and/or ID 0 (cluster ID: %s)",
return fmt.Errorf(
"local cluster has the default name (cluster name: %s) and/or ID 0 (cluster ID: %s)",
aiLocal.ClusterName, aiLocal.ClusterID)
}

cid, err := strconv.Atoi(aiRemote.ClusterID)
if err != nil {
return fmt.Errorf("remote cluster has non-numeric cluster ID %s. Only numeric values 1-255 are allowed", aiRemote.ClusterID)
return fmt.Errorf(
"remote cluster has non-numeric cluster ID %s. Only numeric values 1-255 are allowed",
aiRemote.ClusterID)
}
if cid < 1 || cid > 255 {
return fmt.Errorf("remote cluster has cluster ID %d out of acceptable range (1-255)", cid)
@@ -996,17 +1020,36 @@ func (k *K8sClusterMesh) Connect(ctx context.Context) error {
return fmt.Errorf("remote and local cluster have the same, non-unique ID: %s", aiLocal.ClusterID)
}

k.Log("✨ Connecting cluster %s -> %s...", k.client.ClusterName(), remoteCluster.ClusterName())
return nil
}

func (k *K8sClusterMesh) Connect(ctx context.Context) error {
localClient, remoteClient, err := k.getClientsForConnect()
if err != nil {
return err
}

aiLocal, aiRemote, err := k.getAccessInfoForConnect(ctx, localClient, remoteClient)
if err != nil {
return err
}

err = k.validateInfoForConnect(aiLocal, aiRemote)
if err != nil {
return err
}

k.Log("✨ Connecting cluster %s -> %s...", k.client.ClusterName(), remoteClient.ClusterName())
if err := k.patchConfig(ctx, k.client, aiRemote); err != nil {
return err
}

k.Log("✨ Connecting cluster %s -> %s...", remoteCluster.ClusterName(), k.client.ClusterName())
if err := k.patchConfig(ctx, remoteCluster, aiLocal); err != nil {
k.Log("✨ Connecting cluster %s -> %s...", remoteClient.ClusterName(), k.client.ClusterName())
if err := k.patchConfig(ctx, remoteClient, aiLocal); err != nil {
return err
}

k.Log("✅ Connected cluster %s and %s!", k.client.ClusterName(), remoteCluster.ClusterName())
k.Log("✅ Connected cluster %s and %s!", localClient.ClusterName(), remoteClient.ClusterName())

return nil
}
@@ -1757,7 +1800,7 @@ func (k *K8sClusterMesh) ExternalWorkloadStatus(ctx context.Context, names []str
}
cews = cewList.Items
if len(cews) == 0 {
k.Log("⚠️ No external workloads found.")
k.Log("⚠️ No external workloads found.")
return nil
}
} else {
@@ -1847,18 +1890,29 @@ func generateEnableHelmValues(params Parameters, flavor k8s.Flavor) (map[string]
}
}

helmVals["clustermesh"].(map[string]interface{})["apiserver"].(map[string]interface{})["tls"] =
// default to using certgen, so that certificates are renewed automatically
map[string]interface{}{
"auto": map[string]interface{}{
"enabled": true,
"method": "cronJob",
// run the renewal every 4 months on the 1st of the month
"schedule": "0 0 1 */4 *",
},
}

return helmVals, nil
}
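As an aside, here is a standalone sketch (not part of this commit) that renders the certgen TLS settings set above as JSON, to make the resulting Helm values structure visible; the nested keys mirror the map literal in generateEnableHelmValues:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Mirrors the clustermesh.apiserver.tls values set by generateEnableHelmValues.
	vals := map[string]interface{}{
		"clustermesh": map[string]interface{}{
			"apiserver": map[string]interface{}{
				"tls": map[string]interface{}{
					"auto": map[string]interface{}{
						"enabled": true,
						"method":  "cronJob",
						// run the renewal every 4 months on the 1st of the month
						"schedule": "0 0 1 */4 *",
					},
				},
			},
		},
	}
	out, err := json.MarshalIndent(vals, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}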

func EnableWithHelm(ctx context.Context, k8sClient *k8s.Client, params Parameters) error {
-vals, err := generateEnableHelmValues(params, k8sClient.AutodetectFlavor(ctx))
helmVals, err := generateEnableHelmValues(params, k8sClient.AutodetectFlavor(ctx))
if err != nil {
return err
}
upgradeParams := helm.UpgradeParameters{
Namespace: params.Namespace,
Name: defaults.HelmReleaseName,
-Values: vals,
Values: helmVals,
ResetValues: false,
ReuseValues: true,
}
@@ -1885,3 +1939,203 @@ func DisableWithHelm(ctx context.Context, k8sClient *k8s.Client, params Paramete
_, err = helm.Upgrade(ctx, k8sClient.RESTClientGetter, upgradeParams)
return err
}

func getRelease(kc *k8s.Client, namespace string) (*release.Release, error) {
rel, err := helm.GetCurrentRelease(kc.RESTClientGetter, namespace, defaults.HelmReleaseName)
if err != nil {
return nil, err
}
return rel, nil
}

func (k *K8sClusterMesh) validateCAMatch(aiLocal, aiRemote *accessInformation) error {
caLocal := aiLocal.CA
block, _ := pem.Decode(caLocal)
if block == nil {
panic("failed to parse certificate PEM")
}
localCert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return err
}

caRemote := aiRemote.CA
block, _ = pem.Decode(caRemote)
if block == nil {
panic("failed to parse certificate PEM")
}
remoteCert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return err
}

// Compare the x509 key identifier of each certificate
if !bytes.Equal(localCert.SubjectKeyId, remoteCert.SubjectKeyId) {
return fmt.Errorf("Local and Remote cluster cilium-ca certificate keys do not match")
}
return nil
}
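For illustration, a minimal standalone sketch (not part of this commit) that prints the x509 Subject Key Identifier that validateCAMatch compares across the two clusters; the certificate path is a placeholder:

package main

import (
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"os"
)

func main() {
	// Hypothetical path to a cluster's cilium-ca certificate.
	caPEM, err := os.ReadFile("cilium-ca.crt")
	if err != nil {
		panic(err)
	}
	block, _ := pem.Decode(caPEM)
	if block == nil {
		panic("failed to decode CA certificate PEM")
	}
	cert, err := x509.ParseCertificate(block.Bytes)
	if err != nil {
		panic(err)
	}
	// validateCAMatch requires this value to be equal in both clusters.
	fmt.Printf("SubjectKeyId: %x\n", cert.SubjectKeyId)
}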

// ConnectWithHelm enables clustermesh using a Helm Upgrade
// action. Certificates are generated via the Helm chart's cronJob
// (certgen) mode. As with classic mode, only autodetected IP-based
// clustermesh-apiserver Service endpoints are currently supported.
func (k *K8sClusterMesh) ConnectWithHelm(ctx context.Context) error {
localRelease, err := getRelease(k.client.(*k8s.Client), k.params.Namespace)
if err != nil {
k.Log("❌ Unable to find Helm release for the target cluster")
return err
}
version := localRelease.Chart.AppVersion()
semv, err := utils.ParseCiliumVersion(version)
if err != nil {
return fmt.Errorf("failed to parse Cilium version: %w", err)
}

v := fmt.Sprintf("%d.%d.%d", semv.Major, semv.Minor, semv.Patch)
k.Log("✅ Detected Helm release with Cilium version %s", v)

if v < "1.14.0" {
// Helm-based clustermesh enable is only supported on Cilium v1.14+ due to a lack of support in earlier versions
// for autoconfigured certificates (tls.{crt,key}) for cluster members when running in certgen (cronJob) PKI
// mode
k.Log("⚠️ Cilium Version is less than 1.14.0. Continuing in classic mode.")
return k.Connect(ctx)
}

localClient, remoteClient, err := k.getClientsForConnect()
if err != nil {
return err
}

aiLocal, aiRemote, err := k.getAccessInfoForConnect(ctx, localClient, remoteClient)
if err != nil {
return err
}

err = k.validateInfoForConnect(aiLocal, aiRemote)
if err != nil {
return err
}

// Validate that CA certificates match between the two clusters
err = k.validateCAMatch(aiLocal, aiRemote)
if err != nil {
return err
}

// Get existing helm values for the local cluster
localHelmValues := localRelease.Config
// Expand those values to include the clustermesh configuration
localHelmValues, err = updateClustermeshConfig(localHelmValues, aiRemote)
if err != nil {
return err
}

// Get existing helm values for the remote cluster
remoteRelease, err := getRelease(remoteClient, k.params.Namespace)
if err != nil {
k.Log("❌ Unable to find Helm release for the remote cluster")
return err
}
remoteHelmValues := remoteRelease.Config
// Expand those values to include the clustermesh configuration
remoteHelmValues, err = updateClustermeshConfig(remoteHelmValues, aiLocal)
if err != nil {
return err
}

upgradeParams := helm.UpgradeParameters{
Namespace: k.params.Namespace,
Name: defaults.HelmReleaseName,
Values: localHelmValues,
ResetValues: false,
ReuseValues: true,
}

// Enable clustermesh using a Helm Upgrade command against our target cluster
k.Log("⚠️ Configuring Cilium in cluster '%s' to connect to cluster '%s'",
localClient.ClusterName(), remoteClient.ClusterName())
_, err = helm.Upgrade(ctx, localClient.RESTClientGetter, upgradeParams)
if err != nil {
return err
}

// Enable clustermesh using a Helm Upgrade command against the remote cluster
k.Log("⚠️ Configuring Cilium in cluster '%s' to connect to cluster '%s'",
remoteClient.ClusterName(), localClient.ClusterName())
upgradeParams.Values = remoteHelmValues
_, err = helm.Upgrade(ctx, remoteClient.RESTClientGetter, upgradeParams)
if err != nil {
return err
}

k.Log("✅ Connected cluster %s and %s!", localClient.ClusterName(), remoteClient.ClusterName())
return nil
}
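A hypothetical, package-internal usage sketch (not part of this commit), assuming the package's NewK8sClusterMesh constructor and the Parameters fields referenced elsewhere in this file, with os imported; the kubecontext names are placeholders:

func exampleConnectWithHelm(ctx context.Context) error {
	// Hypothetical kubecontext names for the local and remote cluster.
	localClient, err := k8s.NewClient("kind-cluster1", "")
	if err != nil {
		return err
	}
	cm := NewK8sClusterMesh(localClient, Parameters{
		Namespace:          "kube-system",
		DestinationContext: "kind-cluster2",
		Writer:             os.Stdout, // assumed Parameters field used by k.Log
	})
	return cm.ConnectWithHelm(ctx)
}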

func updateClustermeshConfig(
values map[string]interface{}, aiRemote *accessInformation,
) (map[string]interface{}, error) {
// get current clusters config slice, if it exists
c, found, err := unstructured.NestedFieldCopy(values, "clustermesh", "config", "clusters")
if err != nil {
return nil, fmt.Errorf("existing clustermesh.config is invalid")
}
if !found {
c = []interface{}{}
}

// parse the existing config slice
oldClusters := make([]map[string]interface{}, 0)
cs, ok := c.([]interface{})
if !ok {
return nil, fmt.Errorf("existing clustermesh.config.clusters array is invalid")
}
for _, m := range cs {
cluster, ok := m.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("existing clustermesh.config.clusters array is invalid")
}
oldClusters = append(oldClusters, cluster)
}

// allocate new cluster entries
newClusters := []map[string]interface{}{
{
"name": aiRemote.ClusterName,
"ips": []string{aiRemote.ServiceIPs[0]},
"port": aiRemote.ServicePort,
},
}

// merge new clusters on top of old clusters
clusters := map[string]map[string]interface{}{}
for _, c := range oldClusters {
name, ok := c["name"].(string)
if !ok {
return nil, fmt.Errorf("existing clustermesh.config.clusters array is invalid")
}
clusters[name] = c
}
for _, c := range newClusters {
clusters[c["name"].(string)] = c
}

outputClusters := make([]map[string]interface{}, 0)
for _, v := range clusters {
outputClusters = append(outputClusters, v)
}

newValues := map[string]interface{}{
"clustermesh": map[string]interface{}{
"config": map[string]interface{}{
"enabled": true,
"clusters": outputClusters,
},
},
}

return newValues, nil
}
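To make the merge behavior concrete, a test-style sketch for a hypothetical _test.go file in the same package (not part of this commit), assuming the accessInformation field names used above and ServicePort being an int:

func TestUpdateClustermeshConfigSketch(t *testing.T) {
	// Peer cluster access info; values are placeholders.
	ai := &accessInformation{
		ClusterName: "cluster2",
		ServiceIPs:  []string{"192.0.2.10"},
		ServicePort: 2379,
	}
	// Start from empty Helm values, as on a fresh install.
	vals, err := updateClustermeshConfig(map[string]interface{}{}, ai)
	if err != nil {
		t.Fatal(err)
	}
	// Expect clustermesh.config.enabled=true and a single "cluster2" entry.
	t.Logf("%#v", vals)
}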
