Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Encapsulate all commands #289

Merged
merged 4 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions go-chaos/backend/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
v1 "k8s.io/api/core/v1"
)

func ConnectBrokers() error {
k8Client, err := internal.CreateK8Client()
func ConnectBrokers(kubeConfigPath string, namespace string) error {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -55,8 +55,8 @@ func ConnectBrokers() error {
return nil
}

func ConnectGateway() error {
k8Client, err := internal.CreateK8Client()
func ConnectGateway(kubeConfigPath string, namespace string) error {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -103,8 +103,8 @@ type DisconnectBrokerCfg struct {
OneDirection bool
}

func DisconnectBroker(disconnectBrokerCfg DisconnectBrokerCfg) error {
k8Client, err := prepareBrokerDisconnect()
func DisconnectBroker(kubeConfigPath string, namespace string, disconnectBrokerCfg DisconnectBrokerCfg) error {
k8Client, err := prepareBrokerDisconnect(kubeConfigPath, namespace)

zbClient, closeFn, err := ConnectToZeebeCluster(k8Client)
if err != nil {
Expand Down Expand Up @@ -138,8 +138,8 @@ type DisconnectGatewayCfg struct {
BrokerCfg Broker
}

func DisconnectGateway(disconnectGatewayCfg DisconnectGatewayCfg) error {
k8Client, zbClient, closeFn, err := prepareGatewayDisconnect()
func DisconnectGateway(kubeConfigPath string, namespace string, disconnectGatewayCfg DisconnectGatewayCfg) error {
k8Client, zbClient, closeFn, err := prepareGatewayDisconnect(kubeConfigPath, namespace)
if err != nil {
return err
}
Expand Down Expand Up @@ -173,8 +173,8 @@ func DisconnectGateway(disconnectGatewayCfg DisconnectGatewayCfg) error {
return nil
}

func prepareGatewayDisconnect() (internal.K8Client, zbc.Client, func(), error) {
k8Client, err := prepareBrokerDisconnect()
func prepareGatewayDisconnect(kubeConfigPath string, namespace string) (internal.K8Client, zbc.Client, func(), error) {
k8Client, err := prepareBrokerDisconnect(kubeConfigPath, namespace)
if err != nil {
return k8Client, nil, nil, err
}
Expand All @@ -199,8 +199,8 @@ func prepareGatewayDisconnect() (internal.K8Client, zbc.Client, func(), error) {
return k8Client, zbClient, closeFn, nil
}

func prepareBrokerDisconnect() (internal.K8Client, error) {
k8Client, err := internal.CreateK8Client()
func prepareBrokerDisconnect(kubeConfigPath string, namespace string) (internal.K8Client, error) {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
return internal.K8Client{}, err
}
Expand Down
132 changes: 69 additions & 63 deletions go-chaos/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package cmd

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -33,78 +34,83 @@ import (
"github.com/zeebe-io/zeebe-chaos/go-chaos/internal"
)

var (
backupId string
)
func AddBackupCommand(rootCmd *cobra.Command, flags Flags) {

func init() {
rootCmd.AddCommand(backupCommand)
backupCommand.AddCommand(setupBackupCommand)
backupCommand.AddCommand(takeBackupCommand)
takeBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(waitForBackupCommand)
waitForBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(restoreBackupCommand)
restoreBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
}
var backupCommand = &cobra.Command{
Use: "backup",
Short: "Controls Zeebe backups",
Long: "Can be used to take backups and query their status",
}

var backupCommand = &cobra.Command{
Use: "backup",
Short: "Controls Zeebe backups",
Long: "Can be used to take backups and query their status",
}
var setupBackupCommand = &cobra.Command{
Use: "setup",
Short: "Configures a zeebe cluster's backup settings",
RunE: func(cmd *cobra.Command, args []string) error {
return setupBackup(flags.kubeConfigPath, flags.namespace)
},
}

var setupBackupCommand = &cobra.Command{
Use: "setup",
Short: "Configures a zeebe cluster's backup settings",
RunE: setupBackup,
}
var takeBackupCommand = &cobra.Command{
Use: "take",
Short: "Trigger a backup",
RunE: func(cmd *cobra.Command, args []string) error {
return takeBackup(flags)
},
}

var takeBackupCommand = &cobra.Command{
Use: "take",
Short: "Trigger a backup",
RunE: takeBackup,
}
var waitForBackupCommand = &cobra.Command{
Use: "wait",
Short: "Wait for a backup to complete or fail",
RunE: func(cmd *cobra.Command, args []string) error {
return waitForBackup(flags)
},
}

var waitForBackupCommand = &cobra.Command{
Use: "wait",
Short: "Wait for a backup to complete or fail",
RunE: waitForBackup,
}
var restoreBackupCommand = &cobra.Command{
Use: "restore",
Short: "Restore from a given backup id",
RunE: func(cmd *cobra.Command, args []string) error {
return restoreFromBackup(flags)
},
}

var restoreBackupCommand = &cobra.Command{
Use: "restore",
Short: "Restore from a given backup id",
RunE: restoreFromBackup,
rootCmd.AddCommand(backupCommand)
backupCommand.AddCommand(setupBackupCommand)
backupCommand.AddCommand(takeBackupCommand)
takeBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(waitForBackupCommand)
waitForBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
backupCommand.AddCommand(restoreBackupCommand)
restoreBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default")
}

func setupBackup(cmd *cobra.Command, _ []string) error {
k8Client, err := internal.CreateK8Client()
func setupBackup(kubeConfigPath string, namespace string) error {
k8Client, err := internal.CreateK8Client(kubeConfigPath, namespace)
if err != nil {
panic(err)
}

namespace := k8Client.GetCurrentNamespace()
namespace = k8Client.GetCurrentNamespace()

err = k8Client.PauseReconciliation()
if err != nil {
return err
}

_, err = createBackupSecret(cmd, k8Client, namespace)
_, err = createBackupSecret(k8Client, namespace)
if err != nil {
return err
}

err = setupStatefulSetForBackups(cmd, err, k8Client, namespace)
err = setupStatefulSetForBackups(err, k8Client, namespace)
if err != nil {
return err
}
err = setupGatewayForBackups(cmd, err, k8Client, namespace)
err = setupGatewayForBackups(err, k8Client, namespace)
return err
}

func setupStatefulSetForBackups(cmd *cobra.Command, err error, k8Client internal.K8Client, namespace string) error {
func setupStatefulSetForBackups(err error, k8Client internal.K8Client, namespace string) error {
sfs, err := k8Client.GetZeebeStatefulSet()
if err != nil {
return err
Expand All @@ -122,17 +128,17 @@ func setupStatefulSetForBackups(cmd *cobra.Command, err error, k8Client internal
core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", Value: "true"},
)

_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{})
_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{})
return err
}

func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8Client, namespace string) error {
func setupGatewayForBackups(err error, k8Client internal.K8Client, namespace string) error {
saasGatewayLabels := meta.LabelSelector{
MatchLabels: map[string]string{"app.kubernetes.io/component": "standalone-gateway"},
}
var gatewayDeployments *apps.DeploymentList

gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(cmd.Context(), meta.ListOptions{LabelSelector: labels.Set(saasGatewayLabels.MatchLabels).String()})
gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(context.TODO(), meta.ListOptions{LabelSelector: labels.Set(saasGatewayLabels.MatchLabels).String()})
if err != nil {
return err
}
Expand All @@ -141,7 +147,7 @@ func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8C
MatchLabels: map[string]string{"app.kubernetes.io/component": "zeebe-gateway"},
}
gatewayDeployments, err = k8Client.Clientset.AppsV1().Deployments(namespace).List(
cmd.Context(),
context.TODO(),
meta.ListOptions{LabelSelector: labels.Set(selector.MatchLabels).String()},
)
if err != nil {
Expand All @@ -156,13 +162,13 @@ func setupGatewayForBackups(cmd *cobra.Command, err error, k8Client internal.K8C
core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE", Value: "*"},
core.EnvVar{Name: "MANAGEMENT_ENDPOINTS_BACKUPS_ENABLED", Value: "true"},
)
_, err = k8Client.Clientset.AppsV1().Deployments(namespace).Update(cmd.Context(), &gateway, meta.UpdateOptions{})
_, err = k8Client.Clientset.AppsV1().Deployments(namespace).Update(context.TODO(), &gateway, meta.UpdateOptions{})
return err
}

func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespace string) (*core.Secret, error) {
func createBackupSecret(k8Client internal.K8Client, namespace string) (*core.Secret, error) {
return k8Client.Clientset.CoreV1().Secrets(namespace).Create(
cmd.Context(),
context.TODO(),
&core.Secret{
Type: "Opaque",
ObjectMeta: meta.ObjectMeta{Name: "zeebe-backup-store-s3"},
Expand All @@ -176,8 +182,8 @@ func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespac
)
}

func takeBackup(*cobra.Command, []string) error {
k8Client, err := internal.CreateK8Client()
func takeBackup(flags Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}
Expand All @@ -190,7 +196,7 @@ func takeBackup(*cobra.Command, []string) error {
port := 9600
closePortForward := k8Client.MustGatewayPortForward(port, port)
defer closePortForward()
url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId)
url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, flags.backupId)
resp, err := http.Post(url, "", nil)
if err != nil {
return err
Expand All @@ -205,8 +211,8 @@ func takeBackup(*cobra.Command, []string) error {
return err
}

func waitForBackup(*cobra.Command, []string) error {
k8Client, err := internal.CreateK8Client()
func waitForBackup(flags Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}
Expand All @@ -216,7 +222,7 @@ func waitForBackup(*cobra.Command, []string) error {
defer closePortForward()

for {
backup, err := getBackupStatus(port, backupId)
backup, err := getBackupStatus(port, flags.backupId)
if err != nil {
return err
}
Expand All @@ -234,8 +240,8 @@ func waitForBackup(*cobra.Command, []string) error {

}

func restoreFromBackup(cmd *cobra.Command, _ []string) error {
k8Client, err := internal.CreateK8Client()
func restoreFromBackup(flags Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -272,7 +278,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error {
Name: "restore-from-backup",
Image: sfs.Spec.Template.Spec.Containers[0].Image,
ImagePullPolicy: core.PullAlways,
Env: restoreEnvFromSfs(sfs),
Env: restoreEnvFromSfs(flags, sfs),
EnvFrom: []core.EnvFromSource{{SecretRef: &core.SecretEnvSource{LocalObjectReference: core.LocalObjectReference{Name: "zeebe-backup-store-s3"}}}},
VolumeMounts: []core.VolumeMount{
{
Expand All @@ -284,7 +290,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error {
}
sfs.Spec.Template.Spec.InitContainers = []core.Container{deleteContainer, restoreContainer}

_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{})
_, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{})
if err != nil {
return err
}
Expand All @@ -308,7 +314,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error {
return nil
}

func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar {
func restoreEnvFromSfs(flags Flags, sfs *apps.StatefulSet) []core.EnvVar {
zeebeEnv := sfs.Spec.Template.Spec.Containers[0].Env
restoreEnv := make([]core.EnvVar, 0)
for _, env := range zeebeEnv {
Expand All @@ -326,7 +332,7 @@ func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar {
},
core.EnvVar{
Name: "ZEEBE_RESTORE_FROM_BACKUP_ID",
Value: backupId,
Value: flags.backupId,
})
return restoreEnv
}
Expand Down
54 changes: 27 additions & 27 deletions go-chaos/cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,34 @@ import (
"github.com/zeebe-io/zeebe-chaos/go-chaos/backend"
)

func init() {
rootCmd.AddCommand(connect)
connect.AddCommand(connectBrokers)
connect.AddCommand(connectGateway)
}
func AddConnectCmd(rootCmd *cobra.Command, flags Flags) {
var connect = &cobra.Command{
Use: "connect",
Short: "Connect Zeebe nodes",
Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`,
}

var connect = &cobra.Command{
Use: "connect",
Short: "Connect Zeebe nodes",
Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`,
}
var connectBrokers = &cobra.Command{
Use: "brokers",
Short: "Connect Zeebe Brokers",
Long: `Connect all Zeebe Brokers again, after they have been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectBrokers(flags.kubeConfigPath, flags.namespace)
ensureNoError(err)
},
}

var connectBrokers = &cobra.Command{
Use: "brokers",
Short: "Connect Zeebe Brokers",
Long: `Connect all Zeebe Brokers again, after they have been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectBrokers()
ensureNoError(err)
},
}
var connectGateway = &cobra.Command{
Use: "gateway",
Short: "Connect Zeebe Gateway",
Long: `Connect all Zeebe Gateway again, after it has been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectGateway(flags.kubeConfigPath, flags.namespace)
ensureNoError(err)
},
}

var connectGateway = &cobra.Command{
Use: "gateway",
Short: "Connect Zeebe Gateway",
Long: `Connect all Zeebe Gateway again, after it has been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
err := backend.ConnectGateway()
ensureNoError(err)
},
rootCmd.AddCommand(connect)
connect.AddCommand(connectBrokers)
connect.AddCommand(connectGateway)
}
Loading