Skip to content

Commit

Permalink
helm mode: add implementation for clustermesh connect
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Sauber <[email protected]>
  • Loading branch information
asauber committed May 13, 2023
1 parent 845efc1 commit 406d567
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 9 deletions.
95 changes: 93 additions & 2 deletions clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,7 +1777,7 @@ func EnableWithHelm(ctx context.Context, k8sClient *k8s.Client, params Parameter
"clustermesh.useAPIServer=true",
fmt.Sprintf("clustermesh.apiserver.service.type=%s", params.ServiceType),
}
vals, err := helm.ParseVals(helmStrValues)
vals, err := helm.ParseStrVals(helmStrValues)
if err != nil {
return err
}
Expand All @@ -1796,7 +1796,7 @@ func DisableWithHelm(ctx context.Context, k8sClient *k8s.Client, params Paramete
helmStrValues := []string{
"clustermesh.useAPIServer=false",
}
vals, err := helm.ParseVals(helmStrValues)
vals, err := helm.ParseStrVals(helmStrValues)
if err != nil {
return err
}
Expand All @@ -1810,3 +1810,94 @@ func DisableWithHelm(ctx context.Context, k8sClient *k8s.Client, params Paramete
_, err = helm.Upgrade(ctx, k8sClient.RESTClientGetter, upgradeParams)
return err
}

func (k *K8sClusterMesh) ConnectWithHelm(ctx context.Context) error {
remoteCluster, err := k8s.NewClient(k.params.DestinationContext, "")
if err != nil {
return fmt.Errorf("unable to create Kubernetes client to access remote cluster %q: %w", k.params.DestinationContext, err)
}

aiRemote, err := k.extractAccessInformation(ctx, remoteCluster, k.params.DestinationEndpoints, true, false)
if err != nil {
k.Log("❌ Unable to retrieve access information of remote cluster %q: %s", remoteCluster.ClusterName(), err)
return err
}

if !aiRemote.validate() {
return fmt.Errorf("remote cluster has non-unique name (%s) and/or ID (%s)", aiRemote.ClusterName, aiRemote.ClusterID)
}

aiLocal, err := k.extractAccessInformation(ctx, k.client, k.params.SourceEndpoints, true, false)
if err != nil {
k.Log("❌ Unable to retrieve access information of local cluster %q: %s", k.client.ClusterName(), err)
return err
}

if !aiLocal.validate() {
return fmt.Errorf("local cluster has the default name (cluster name: %s) and/or ID 0 (cluster ID: %s)",
aiLocal.ClusterName, aiLocal.ClusterID)
}

cid, err := strconv.Atoi(aiRemote.ClusterID)
if err != nil {
return fmt.Errorf("remote cluster has non-numeric cluster ID %s. Only numeric values 1-255 are allowed", aiRemote.ClusterID)
}
if cid < 1 || cid > 255 {
return fmt.Errorf("remote cluster has cluster ID %d out of acceptable range (1-255)", cid)
}

if aiRemote.ClusterName == aiLocal.ClusterName {
return fmt.Errorf("remote and local cluster have the same, non-unique name: %s", aiLocal.ClusterName)
}

if aiRemote.ClusterID == aiLocal.ClusterID {
return fmt.Errorf("remote and local cluster have the same, non-unique ID: %s", aiLocal.ClusterID)
}

// TODO (ajs): Support more than two clusters (dynamically append to config)
helmValues := map[string]interface{}{
"clustermesh": map[string]interface{}{
"config": map[string]interface{}{
"enabled": true,
"clusters": []map[string]interface{}{
map[string]interface{}{
"name": aiLocal.ClusterName,
// TODO (ajs): Support hostname-based endpoints
// include logic from patchConfig
"ips": []string{aiLocal.ServiceIPs[0]},
"port": aiLocal.ServicePort,
},
map[string]interface{}{
"name": aiRemote.ClusterName,
"ips": []string{aiRemote.ServiceIPs[0]},
"port": aiRemote.ServicePort,
},
},
},
},
}

// Enable clustermesh using a Helm Upgrade command
upgradeParams := helm.UpgradeParameters{
Namespace: k.params.Namespace,
Name: defaults.HelmReleaseName,
Values: helmValues,
ResetValues: false,
ReuseValues: true,
}

// TODO (ajs): After classic mode removal, use a k8s.Client for k.client
_, err = helm.Upgrade(ctx, k.client.(*k8s.Client).RESTClientGetter, upgradeParams)
if err != nil {
return err
}

// TODO (ajs): After classic mode removal, use a k8s.Client for k.client
_, err = helm.Upgrade(ctx, remoteCluster.RESTClientGetter, upgradeParams)
if err != nil {
return err
}

k.Log("✅ Connected cluster %s and %s!", k.client.ClusterName(), remoteCluster.ClusterName())
return nil
}
29 changes: 28 additions & 1 deletion internal/cli/cmd/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,20 @@ func newCmdClusterMesh() *cobra.Command {
}

cmd.AddCommand(
newCmdClusterMeshConnect(),
newCmdClusterMeshDisconnect(),
newCmdClusterMeshStatus(),
newCmdClusterMeshExternalWorkload(),
)

if utils.IsInHelmMode() {
cmd.AddCommand(
newCmdClusterMeshConnectWithHelm(),
newCmdClusterMeshEnableWithHelm(),
newCmdClusterMeshDisableWithHelm(),
)
} else {
cmd.AddCommand(
newCmdClusterMeshConnect(),
newCmdClusterMeshEnable(),
newCmdClusterMeshDisable(),
)
Expand Down Expand Up @@ -393,3 +394,29 @@ func newCmdClusterMeshDisableWithHelm() *cobra.Command {

return cmd
}

func newCmdClusterMeshConnectWithHelm() *cobra.Command {
var params = clustermesh.Parameters{
Writer: os.Stdout,
}

cmd := &cobra.Command{
Use: "connect",
Short: "Connect to a remote cluster using Helm",
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
params.Namespace = namespace
cm := clustermesh.NewK8sClusterMesh(k8sClient, params)
if err := cm.ConnectWithHelm(context.Background()); err != nil {
fatalf("Unable to connect cluster: %s", err)
}
return nil
},
}

cmd.Flags().StringVar(&params.DestinationContext, "destination-context", "", "Kubernetes configuration context of destination cluster")
cmd.Flags().StringSliceVar(&params.DestinationEndpoints, "destination-endpoint", []string{}, "IP of ClusterMesh service of destination cluster")
cmd.Flags().StringSliceVar(&params.SourceEndpoints, "source-endpoint", []string{}, "IP of ClusterMesh service of source cluster")

return cmd
}
4 changes: 2 additions & 2 deletions internal/helm/helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,11 +374,11 @@ func MergeVals(
return vals, nil
}

// ParseVals takes a slice of Helm values of the form
// ParseStrVals takes a slice of Helm values of the form
// ["some.chart.value=val1", "some.other.value=val2"]
// and returns a deeply nested map of Values of the form
// expected by Helm actions.
func ParseVals(helmStrValues []string) (map[string]interface{}, error) {
func ParseStrVals(helmStrValues []string) (map[string]interface{}, error) {
helmValStr := strings.Join(helmStrValues, ",")
helmValues := map[string]interface{}{}
err := strvals.ParseInto(helmValStr, helmValues)
Expand Down
8 changes: 4 additions & 4 deletions internal/helm/helm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestResolveHelmChartVersion(t *testing.T) {
}
}

func TestParseVals(t *testing.T) {
func TestParseStrVals(t *testing.T) {
tests := []struct {
name string
input []string
Expand Down Expand Up @@ -124,13 +124,13 @@ func TestParseVals(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ParseVals(tt.input)
got, err := ParseStrVals(tt.input)
if (err != nil) != tt.wantErr {
t.Errorf("ParseVals() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("ParseStrVals() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseVals() got = %v, want %v", got, tt.want)
t.Errorf("ParseStrVals() got = %v, want %v", got, tt.want)
}
})
}
Expand Down

0 comments on commit 406d567

Please sign in to comment.