From c7b468e44e8b47348f5b03b2dd1c005c8d5caf2e Mon Sep 17 00:00:00 2001 From: Christopher Zell Date: Fri, 9 Dec 2022 09:46:46 +0100 Subject: [PATCH] refactor: encapsulate all commands Add new functions to always create new commands, such that we not reuse commands and not rely on globals --- go-chaos/cmd/backup.go | 82 ++++++++------- go-chaos/cmd/connect.go | 54 +++++----- go-chaos/cmd/dataloss_sim.go | 187 ++++++++++++++++++----------------- go-chaos/cmd/deploy.go | 175 ++++++++++++++++---------------- go-chaos/cmd/disconnect.go | 108 ++++++++++---------- go-chaos/cmd/exporting.go | 38 +++---- go-chaos/cmd/publish.go | 55 ++++++----- go-chaos/cmd/restart.go | 73 +++++++------- go-chaos/cmd/root.go | 98 ++++++++---------- go-chaos/cmd/stress.go | 106 ++++++++++---------- go-chaos/cmd/terminate.go | 77 ++++++++------- go-chaos/cmd/topology.go | 54 +++++----- go-chaos/cmd/verify.go | 117 +++++++++++----------- go-chaos/cmd/version.go | 20 ++-- go-chaos/cmd/worker.go | 21 ++-- go-chaos/cmd/zeebePods.go | 40 ++++---- 16 files changed, 648 insertions(+), 657 deletions(-) diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go index 9db315694..43b099b88 100644 --- a/go-chaos/cmd/backup.go +++ b/go-chaos/cmd/backup.go @@ -15,6 +15,7 @@ package cmd import ( + "context" "encoding/json" "errors" "fmt" @@ -33,7 +34,44 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { +func AddBackupCommand(rootCmd *cobra.Command, flags Flags) { + + var backupCommand = &cobra.Command{ + Use: "backup", + Short: "Controls Zeebe backups", + Long: "Can be used to take backups and query their status", + } + + var setupBackupCommand = &cobra.Command{ + Use: "setup", + Short: "Configures a zeebe cluster's backup settings", + RunE: setupBackup, + } + + var takeBackupCommand = &cobra.Command{ + Use: "take", + Short: "Trigger a backup", + RunE: func(cmd *cobra.Command, args []string) error { + return takeBackup(flags) + }, + } + + var waitForBackupCommand = &cobra.Command{ + Use: "wait", + Short: "Wait for a backup to complete or fail", + RunE: func(cmd *cobra.Command, args []string) error { + return waitForBackup(flags) + }, + } + + var restoreBackupCommand = &cobra.Command{ + Use: "restore", + Short: "Restore from a given backup id", + RunE: func(cmd *cobra.Command, args []string) error { + return restoreFromBackup(flags) + }, + } + rootCmd.AddCommand(backupCommand) backupCommand.AddCommand(setupBackupCommand) backupCommand.AddCommand(takeBackupCommand) @@ -44,36 +82,6 @@ func init() { restoreBackupCommand.Flags().StringVar(&flags.backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") } -var backupCommand = &cobra.Command{ - Use: "backup", - Short: "Controls Zeebe backups", - Long: "Can be used to take backups and query their status", -} - -var setupBackupCommand = &cobra.Command{ - Use: "setup", - Short: "Configures a zeebe cluster's backup settings", - RunE: setupBackup, -} - -var takeBackupCommand = &cobra.Command{ - Use: "take", - Short: "Trigger a backup", - RunE: takeBackup, -} - -var waitForBackupCommand = &cobra.Command{ - Use: "wait", - Short: "Wait for a backup to complete or fail", - RunE: waitForBackup, -} - -var restoreBackupCommand = &cobra.Command{ - Use: "restore", - Short: "Restore from a given backup id", - RunE: restoreFromBackup, -} - func setupBackup(cmd *cobra.Command, _ []string) error { k8Client, err := internal.CreateK8Client() if err != nil { @@ -172,7 +180,7 @@ func createBackupSecret(cmd *cobra.Command, k8Client internal.K8Client, namespac ) } -func takeBackup(*cobra.Command, []string) error { +func takeBackup(flags Flags) error { k8Client, err := internal.CreateK8Client() if err != nil { panic(err) @@ -201,7 +209,7 @@ func takeBackup(*cobra.Command, []string) error { return err } -func waitForBackup(*cobra.Command, []string) error { +func waitForBackup(flags Flags) error { k8Client, err := internal.CreateK8Client() if err != nil { panic(err) @@ -230,7 +238,7 @@ func waitForBackup(*cobra.Command, []string) error { } -func restoreFromBackup(cmd *cobra.Command, _ []string) error { +func restoreFromBackup(flags Flags) error { k8Client, err := internal.CreateK8Client() if err != nil { panic(err) @@ -268,7 +276,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error { Name: "restore-from-backup", Image: sfs.Spec.Template.Spec.Containers[0].Image, ImagePullPolicy: core.PullAlways, - Env: restoreEnvFromSfs(sfs), + Env: restoreEnvFromSfs(flags, sfs), EnvFrom: []core.EnvFromSource{{SecretRef: &core.SecretEnvSource{LocalObjectReference: core.LocalObjectReference{Name: "zeebe-backup-store-s3"}}}}, VolumeMounts: []core.VolumeMount{ { @@ -280,7 +288,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error { } sfs.Spec.Template.Spec.InitContainers = []core.Container{deleteContainer, restoreContainer} - _, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(cmd.Context(), sfs, meta.UpdateOptions{}) + _, err = k8Client.Clientset.AppsV1().StatefulSets(namespace).Update(context.TODO(), sfs, meta.UpdateOptions{}) if err != nil { return err } @@ -304,7 +312,7 @@ func restoreFromBackup(cmd *cobra.Command, _ []string) error { return nil } -func restoreEnvFromSfs(sfs *apps.StatefulSet) []core.EnvVar { +func restoreEnvFromSfs(flags Flags, sfs *apps.StatefulSet) []core.EnvVar { zeebeEnv := sfs.Spec.Template.Spec.Containers[0].Env restoreEnv := make([]core.EnvVar, 0) for _, env := range zeebeEnv { diff --git a/go-chaos/cmd/connect.go b/go-chaos/cmd/connect.go index 851e47148..2a2363223 100644 --- a/go-chaos/cmd/connect.go +++ b/go-chaos/cmd/connect.go @@ -19,34 +19,34 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/backend" ) -func init() { - rootCmd.AddCommand(connect) - connect.AddCommand(connectBrokers) - connect.AddCommand(connectGateway) -} +func AddConnectCmd(rootCmd *cobra.Command) { + var connect = &cobra.Command{ + Use: "connect", + Short: "Connect Zeebe nodes", + Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`, + } -var connect = &cobra.Command{ - Use: "connect", - Short: "Connect Zeebe nodes", - Long: `Connect all Zeebe nodes again, after they have been disconnected uses sub-commands to connect brokers, gateways, etc.`, -} + var connectBrokers = &cobra.Command{ + Use: "brokers", + Short: "Connect Zeebe Brokers", + Long: `Connect all Zeebe Brokers again, after they have been disconnected.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.ConnectBrokers() + ensureNoError(err) + }, + } -var connectBrokers = &cobra.Command{ - Use: "brokers", - Short: "Connect Zeebe Brokers", - Long: `Connect all Zeebe Brokers again, after they have been disconnected.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.ConnectBrokers() - ensureNoError(err) - }, -} + var connectGateway = &cobra.Command{ + Use: "gateway", + Short: "Connect Zeebe Gateway", + Long: `Connect all Zeebe Gateway again, after it has been disconnected.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.ConnectGateway() + ensureNoError(err) + }, + } -var connectGateway = &cobra.Command{ - Use: "gateway", - Short: "Connect Zeebe Gateway", - Long: `Connect all Zeebe Gateway again, after it has been disconnected.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.ConnectGateway() - ensureNoError(err) - }, + rootCmd.AddCommand(connect) + connect.AddCommand(connectBrokers) + connect.AddCommand(connectGateway) } diff --git a/go-chaos/cmd/dataloss_sim.go b/go-chaos/cmd/dataloss_sim.go index 93e4b9523..f9abdfdb6 100644 --- a/go-chaos/cmd/dataloss_sim.go +++ b/go-chaos/cmd/dataloss_sim.go @@ -21,7 +21,100 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { +func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags Flags) { + + var datalossCmd = &cobra.Command{ + Use: "dataloss", + Short: "Simulate dataloss and recover", + Long: `Simulate dataloss of a broker, and recover from it.`, + } + + var prepareCmd = &cobra.Command{ + Use: "prepare", + Short: "Prepare the k8s deployment for dataloss test", + Long: `Prepares the k8s deployment - such as applying patches to statefulsets - to enable applying dataloss commands.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + // Add Init container for dataloss simulation test + err = k8Client.ApplyInitContainerPatch() + + if err != nil { + panic(err) + } + + internal.LogInfo("Prepared cluster in namesapce %s", k8Client.GetCurrentNamespace()) + }, + } + + var datalossDelete = &cobra.Command{ + Use: "delete", + Short: "Delete data of a broker", + Long: `Delete data of a broker by deleting the pvc and the pod`, + Run: func(cmd *cobra.Command, args []string) { + + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(flags.nodeId)) + + if err != nil { + internal.LogInfo("Failed to get pod with nodeId %d %s", flags.nodeId, err) + panic(err) + } + + k8Client.DeletePvcOfBroker(pod.Name) + + internal.SetInitContainerBlockFlag(k8Client, flags.nodeId, "true") + err = k8Client.RestartPod(pod.Name) + if err != nil { + internal.LogInfo("Failed to restart pod %s", pod.Name) + panic(err) + } + + internal.LogInfo("Deleted pod %s in namespace %s", pod.Name, k8Client.GetCurrentNamespace()) + }, + } + + var datalossRecover = &cobra.Command{ + Use: "recover", + Short: "Recover broker after full data loss", + Long: `Restart the broker after full data loss, wait until the data is fully recovered`, + Run: func(cmd *cobra.Command, args []string) { + + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + err = internal.SetInitContainerBlockFlag(k8Client, flags.nodeId, "false") + if err != nil { + panic(err) + } + + pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(flags.nodeId)) + + if err != nil { + internal.LogInfo("Failed to get pod with nodeId %d %s", flags.nodeId, err) + panic(err) + } + + // The pod is restarting after dataloss, so it takes longer to be ready + err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute) + + if err != nil { + internal.LogInfo("%s", err) + panic(err) + } + internal.LogInfo("Broker %d is recovered", flags.nodeId) + }, + } + rootCmd.AddCommand(datalossCmd) datalossCmd.AddCommand(prepareCmd) datalossCmd.AddCommand(datalossDelete) @@ -30,95 +123,3 @@ func init() { datalossDelete.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker") datalossRecover.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker") } - -var datalossCmd = &cobra.Command{ - Use: "dataloss", - Short: "Simulate dataloss and recover", - Long: `Simulate dataloss of a broker, and recover from it.`, -} - -var prepareCmd = &cobra.Command{ - Use: "prepare", - Short: "Prepare the k8s deployment for dataloss test", - Long: `Prepares the k8s deployment - such as applying patches to statefulsets - to enable applying dataloss commands.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - - // Add Init container for dataloss simulation test - err = k8Client.ApplyInitContainerPatch() - - if err != nil { - panic(err) - } - - internal.LogInfo("Prepared cluster in namesapce %s", k8Client.GetCurrentNamespace()) - }, -} - -var datalossDelete = &cobra.Command{ - Use: "delete", - Short: "Delete data of a broker", - Long: `Delete data of a broker by deleting the pvc and the pod`, - Run: func(cmd *cobra.Command, args []string) { - - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - - pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(flags.nodeId)) - - if err != nil { - internal.LogInfo("Failed to get pod with nodeId %d %s", flags.nodeId, err) - panic(err) - } - - k8Client.DeletePvcOfBroker(pod.Name) - - internal.SetInitContainerBlockFlag(k8Client, flags.nodeId, "true") - err = k8Client.RestartPod(pod.Name) - if err != nil { - internal.LogInfo("Failed to restart pod %s", pod.Name) - panic(err) - } - - internal.LogInfo("Deleted pod %s in namespace %s", pod.Name, k8Client.GetCurrentNamespace()) - }, -} - -var datalossRecover = &cobra.Command{ - Use: "recover", - Short: "Recover broker after full data loss", - Long: `Restart the broker after full data loss, wait until the data is fully recovered`, - Run: func(cmd *cobra.Command, args []string) { - - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } - - err = internal.SetInitContainerBlockFlag(k8Client, flags.nodeId, "false") - if err != nil { - panic(err) - } - - pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(flags.nodeId)) - - if err != nil { - internal.LogInfo("Failed to get pod with nodeId %d %s", flags.nodeId, err) - panic(err) - } - - // The pod is restarting after dataloss, so it takes longer to be ready - err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute) - - if err != nil { - internal.LogInfo("%s", err) - panic(err) - } - internal.LogInfo("Broker %d is recovered", flags.nodeId) - }, -} diff --git a/go-chaos/cmd/deploy.go b/go-chaos/cmd/deploy.go index e293fb7ca..1da510aa4 100644 --- a/go-chaos/cmd/deploy.go +++ b/go-chaos/cmd/deploy.go @@ -20,107 +20,108 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(deployCmd) - - deployCmd.AddCommand(deployProcessModelCmd) - deployProcessModelCmd.Flags().StringVar(&flags.processModelPath, "processModelPath", "", - "Specify the path to a BPMN process model, which should be deployed. Defaults to a benchmark process model with one task (included in zbchaos). If the path starts with 'bpmn/' zbchaos will look for a referenced model bundled within the cli, like: 'bpmn/one_task.bpmn'.") - - deployCmd.AddCommand(deployMultiVersionProcessModelCmd) - deployMultiVersionProcessModelCmd.Flags().IntVar(&flags.versionCount, "versionCount", 10, - "Specify how many different versions of a default BPMN and DMN model should be deployed. Useful for testing deployment distribution.") - - deployCmd.AddCommand(deployWorkerCmd) - deployCmd.AddCommand(deployChaosModels) -} - -var deployCmd = &cobra.Command{ - Use: "deploy", - Short: "Deploy certain resource", - Long: `Deploy certain resource, like process model(s) or kubernetes manifest.`, -} - -var deployProcessModelCmd = &cobra.Command{ - Use: "process", - Short: "Deploy a process model to Zeebe", - Long: `Deploy a process model to Zeebe. +func AddDeployCmd(rootCmd *cobra.Command, flags Flags) { + + var deployCmd = &cobra.Command{ + Use: "deploy", + Short: "Deploy certain resource", + Long: `Deploy certain resource, like process model(s) or kubernetes manifest.`, + } + + var deployProcessModelCmd = &cobra.Command{ + Use: "process", + Short: "Deploy a process model to Zeebe", + Long: `Deploy a process model to Zeebe. Can be used to deploy a specific process model or multiple version of a default BPMN and DMN model. Defaults to the later, which is useful for experimenting with deployment distribution.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() - processDefinitionKey, err := internal.DeployModel(zbClient, flags.processModelPath) - ensureNoError(err) + processDefinitionKey, err := internal.DeployModel(zbClient, flags.processModelPath) + ensureNoError(err) - internal.LogInfo("Deployed given process model %s, under key %d!", flags.processModelPath, processDefinitionKey) - }, -} + internal.LogInfo("Deployed given process model %s, under key %d!", flags.processModelPath, processDefinitionKey) + }, + } -var deployMultiVersionProcessModelCmd = &cobra.Command{ - Use: "multi-version", - Short: "Deploy multiple versions to Zeebe", - Long: `Deploy multiple versions of process and dmn models to Zeebe. + var deployMultiVersionProcessModelCmd = &cobra.Command{ + Use: "multi-version", + Short: "Deploy multiple versions to Zeebe", + Long: `Deploy multiple versions of process and dmn models to Zeebe. Useful for experimenting with deployment distribution.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() - - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() - - err = internal.DeployDifferentVersions(zbClient, int32(flags.versionCount)) - ensureNoError(err) - internal.LogInfo("Deployed different process models of different types and versions to zeebe!") - }, -} - -var deployWorkerCmd = &cobra.Command{ - Use: "worker", - Short: "Deploy a worker deployment to the Zeebe cluster", - Long: `Deploy a worker deployment to the Zeebe cluster. + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + err = internal.DeployDifferentVersions(zbClient, int32(flags.versionCount)) + ensureNoError(err) + internal.LogInfo("Deployed different process models of different types and versions to zeebe!") + }, + } + + var deployWorkerCmd = &cobra.Command{ + Use: "worker", + Short: "Deploy a worker deployment to the Zeebe cluster", + Long: `Deploy a worker deployment to the Zeebe cluster. The workers can be used as part of some chaos experiments to complete process instances etc.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + ensureNoError(err) - err = k8Client.CreateWorkerDeployment() - ensureNoError(err) + err = k8Client.CreateWorkerDeployment() + ensureNoError(err) - internal.LogInfo("Worker successfully deployed to the current namespace: %s", k8Client.GetCurrentNamespace()) - }, -} + internal.LogInfo("Worker successfully deployed to the current namespace: %s", k8Client.GetCurrentNamespace()) + }, + } -var deployChaosModels = &cobra.Command{ - Use: "chaos", - Short: "Deploy all chaos BPMN models to the Zeebe cluster", - Long: `Deploy all chaos BPMN models to the to the Zeebe cluster. + var deployChaosModels = &cobra.Command{ + Use: "chaos", + Short: "Deploy all chaos BPMN models to the Zeebe cluster", + Long: `Deploy all chaos BPMN models to the to the Zeebe cluster. The process models allow to execute chaos experiments.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + zbClient, closeFn, err := backend.ConnectToZeebeCluster(k8Client) + ensureNoError(err) + defer closeFn() + + err = internal.DeployChaosModels(zbClient) + ensureNoError(err) + + internal.LogInfo("Deployed successfully process models to run chaos experiments") + }, + } + + rootCmd.AddCommand(deployCmd) - zbClient, closeFn, err := backend.ConnectToZeebeCluster(k8Client) - ensureNoError(err) - defer closeFn() + deployCmd.AddCommand(deployProcessModelCmd) + deployProcessModelCmd.Flags().StringVar(&flags.processModelPath, "processModelPath", "", + "Specify the path to a BPMN process model, which should be deployed. Defaults to a benchmark process model with one task (included in zbchaos). If the path starts with 'bpmn/' zbchaos will look for a referenced model bundled within the cli, like: 'bpmn/one_task.bpmn'.") - err = internal.DeployChaosModels(zbClient) - ensureNoError(err) + deployCmd.AddCommand(deployMultiVersionProcessModelCmd) + deployMultiVersionProcessModelCmd.Flags().IntVar(&flags.versionCount, "versionCount", 10, + "Specify how many different versions of a default BPMN and DMN model should be deployed. Useful for testing deployment distribution.") - internal.LogInfo("Deployed successfully process models to run chaos experiments") - }, + deployCmd.AddCommand(deployWorkerCmd) + deployCmd.AddCommand(deployChaosModels) } diff --git a/go-chaos/cmd/disconnect.go b/go-chaos/cmd/disconnect.go index 56d077577..49daf8c43 100644 --- a/go-chaos/cmd/disconnect.go +++ b/go-chaos/cmd/disconnect.go @@ -19,10 +19,60 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/backend" ) -var ( -) +func ensureNoError(err error) { + if err != nil { + panic(err) + } +} + +func AddDisconnectCommand(rootCmd *cobra.Command, flags Flags) { + + var disconnect = &cobra.Command{ + Use: "disconnect", + Short: "Disconnect Zeebe nodes", + Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`, + } + + var disconnectBrokers = &cobra.Command{ + Use: "brokers", + Short: "Disconnect Zeebe Brokers", + Long: `Disconnect Zeebe Brokers with a given partition and role.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.DisconnectBroker(backend.DisconnectBrokerCfg{ + Broker1Cfg: backend.Broker{ + NodeId: flags.broker1NodeId, + PartitionId: flags.broker1PartitionId, + Role: flags.broker1Role, + }, + Broker2Cfg: backend.Broker{ + NodeId: flags.broker2NodeId, + PartitionId: flags.broker2PartitionId, + Role: flags.broker2Role, + }, + OneDirection: flags.oneDirection, + }) + ensureNoError(err) + }, + } + + var disconnectGateway = &cobra.Command{ + Use: "gateway", + Short: "Disconnect Zeebe Gateway", + Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`, + Run: func(cmd *cobra.Command, args []string) { + err := backend.DisconnectGateway(backend.DisconnectGatewayCfg{ + OneDirection: flags.oneDirection, + DisconnectToAll: flags.disconnectToAll, + BrokerCfg: backend.Broker{ + Role: flags.role, + PartitionId: flags.partitionId, + NodeId: flags.nodeId, + }, + }) + ensureNoError(err) + }, + } -func init() { rootCmd.AddCommand(disconnect) // disconnect brokers @@ -49,55 +99,3 @@ func init() { disconnectGateway.Flags().BoolVar(&flags.disconnectToAll, "all", false, "Specify whether the gateway should be disconnected to all brokers") disconnectGateway.MarkFlagsMutuallyExclusive("all", "partitionId", "nodeId") } - -var disconnect = &cobra.Command{ - Use: "disconnect", - Short: "Disconnect Zeebe nodes", - Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`, -} - -func ensureNoError(err error) { - if err != nil { - panic(err) - } -} - -var disconnectBrokers = &cobra.Command{ - Use: "brokers", - Short: "Disconnect Zeebe Brokers", - Long: `Disconnect Zeebe Brokers with a given partition and role.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.DisconnectBroker(backend.DisconnectBrokerCfg{ - Broker1Cfg: backend.Broker{ - NodeId: flags.broker1NodeId, - PartitionId: flags.broker1PartitionId, - Role: flags.broker1Role, - }, - Broker2Cfg: backend.Broker{ - NodeId: flags.broker2NodeId, - PartitionId: flags.broker2PartitionId, - Role: flags.broker2Role, - }, - OneDirection: flags.oneDirection, - }) - ensureNoError(err) - }, -} - -var disconnectGateway = &cobra.Command{ - Use: "gateway", - Short: "Disconnect Zeebe Gateway", - Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`, - Run: func(cmd *cobra.Command, args []string) { - err := backend.DisconnectGateway(backend.DisconnectGatewayCfg{ - OneDirection: flags.oneDirection, - DisconnectToAll: flags.disconnectToAll, - BrokerCfg: backend.Broker{ - Role: flags.role, - PartitionId: flags.partitionId, - NodeId: flags.nodeId, - }, - }) - ensureNoError(err) - }, -} diff --git a/go-chaos/cmd/exporting.go b/go-chaos/cmd/exporting.go index 85f2d0de7..0fbea11f1 100644 --- a/go-chaos/cmd/exporting.go +++ b/go-chaos/cmd/exporting.go @@ -22,29 +22,29 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(exportingCommand) +func AddExportingCmds(rootCmd *cobra.Command) { + var exportingCommand = &cobra.Command{ + Use: "exporting", + Short: "Controls Zeebe Exporting", + Long: "Can be used to start and stop exporting", + } - exportingCommand.AddCommand(pauseExportingCommand) - exportingCommand.AddCommand(resumeExportingCommand) -} + var pauseExportingCommand = &cobra.Command{ + Use: "pause", + Short: "Pause exporting on all partitions", + RunE: pauseExporting, + } -var exportingCommand = &cobra.Command{ - Use: "exporting", - Short: "Controls Zeebe Exporting", - Long: "Can be used to start and stop exporting", -} + var resumeExportingCommand = &cobra.Command{ + Use: "resume", + Short: "Resume exporting on all partitions", + RunE: resumeExporting, + } -var pauseExportingCommand = &cobra.Command{ - Use: "pause", - Short: "Pause exporting on all partitions", - RunE: pauseExporting, -} + rootCmd.AddCommand(exportingCommand) -var resumeExportingCommand = &cobra.Command{ - Use: "resume", - Short: "Resume exporting on all partitions", - RunE: resumeExporting, + exportingCommand.AddCommand(pauseExportingCommand) + exportingCommand.AddCommand(resumeExportingCommand) } func pauseExporting(cmd *cobra.Command, args []string) error { diff --git a/go-chaos/cmd/publish.go b/go-chaos/cmd/publish.go index 835fe110f..d4f64258d 100644 --- a/go-chaos/cmd/publish.go +++ b/go-chaos/cmd/publish.go @@ -22,41 +22,42 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(publishCmd) - publishCmd.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the id of the partition") - publishCmd.Flags().StringVar(&flags.msgName, "msgName", "msg", "Specify the name of the message, which should be published.") -} +func AddPublishCmd(rootCmd *cobra.Command, flags Flags) { -var publishCmd = &cobra.Command{ - Use: "publish", - Short: "Publish a message", - Long: `Publish a message to a certain partition.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - panicOnError(err) + var publishCmd = &cobra.Command{ + Use: "publish", + Short: "Publish a message", + Long: `Publish a message to a certain partition.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + panicOnError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() - zbClient, err := internal.CreateZeebeClient(port) - panicOnError(err) - defer zbClient.Close() + zbClient, err := internal.CreateZeebeClient(port) + panicOnError(err) + defer zbClient.Close() - topology, err := internal.GetTopology(zbClient) - panicOnError(err) + topology, err := internal.GetTopology(zbClient) + panicOnError(err) - correlationKey, err := internal.FindCorrelationKeyForPartition(flags.partitionId, int(topology.PartitionsCount)) - panicOnError(err) + correlationKey, err := internal.FindCorrelationKeyForPartition(flags.partitionId, int(topology.PartitionsCount)) + panicOnError(err) - internal.LogVerbose("Send message '%s', with correaltion key '%s' (ASCII: %d) ", flags.msgName, correlationKey, int(correlationKey[0])) + internal.LogVerbose("Send message '%s', with correaltion key '%s' (ASCII: %d) ", flags.msgName, correlationKey, int(correlationKey[0])) - messageResponse, err := zbClient.NewPublishMessageCommand().MessageName(flags.msgName).CorrelationKey(correlationKey).TimeToLive(time.Minute * 5).Send(context.TODO()) - partitionIdFromKey := internal.ExtractPartitionIdFromKey(messageResponse.Key) + messageResponse, err := zbClient.NewPublishMessageCommand().MessageName(flags.msgName).CorrelationKey(correlationKey).TimeToLive(time.Minute * 5).Send(context.TODO()) + partitionIdFromKey := internal.ExtractPartitionIdFromKey(messageResponse.Key) - internal.LogInfo("Message was sent and returned key %d, which corresponds to partition: %d", messageResponse.Key, partitionIdFromKey) - }, + internal.LogInfo("Message was sent and returned key %d, which corresponds to partition: %d", messageResponse.Key, partitionIdFromKey) + }, + } + + rootCmd.AddCommand(publishCmd) + publishCmd.Flags().IntVar(&flags.partitionId, "partitionId", 1, "Specify the id of the partition") + publishCmd.Flags().StringVar(&flags.msgName, "msgName", "msg", "Specify the name of the message, which should be published.") } func panicOnError(err error) { diff --git a/go-chaos/cmd/restart.go b/go-chaos/cmd/restart.go index c6060c018..f57f59d2a 100644 --- a/go-chaos/cmd/restart.go +++ b/go-chaos/cmd/restart.go @@ -19,7 +19,43 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { +func AddRestartCmd(rootCmd *cobra.Command, flags Flags) { + + var restartCmd = &cobra.Command{ + Use: "restart", + Short: "Restarts a Zeebe node", + Long: `Restarts a Zeebe node, it can be chosen between: broker, gateway or a worker.`, + } + + var restartBrokerCmd = &cobra.Command{ + Use: "broker", + Short: "Restarts a Zeebe broker", + Long: `Restarts a Zeebe broker with a certain role and given partition.`, + Run: func(cmd *cobra.Command, args []string) { + brokerPod := restartBroker(flags.nodeId, flags.partitionId, flags.role, nil) + internal.LogInfo("Restarted %s", brokerPod) + }, + } + + var restartGatewayCmd = &cobra.Command{ + Use: "gateway", + Short: "Restarts a Zeebe gateway", + Long: `Restarts a Zeebe gateway.`, + Run: func(cmd *cobra.Command, args []string) { + gatewayPod := restartGateway(nil) + internal.LogInfo("Restarted %s", gatewayPod) + }, + } + + var restartWorkerCmd = &cobra.Command{ + Use: "worker", + Short: "Restart a Zeebe worker", + Long: `Restart a Zeebe worker.`, + Run: func(cmd *cobra.Command, args []string) { + restartWorker(flags.all, "Restarted", nil) + }, + } + rootCmd.AddCommand(restartCmd) restartCmd.AddCommand(restartBrokerCmd) restartBrokerCmd.Flags().StringVar(&flags.role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER, INACTIVE]") @@ -31,38 +67,3 @@ func init() { restartCmd.AddCommand(restartWorkerCmd) restartWorkerCmd.Flags().BoolVar(&flags.all, "all", false, "Specify whether all workers should be restarted") } - -var restartCmd = &cobra.Command{ - Use: "restart", - Short: "Restarts a Zeebe node", - Long: `Restarts a Zeebe node, it can be chosen between: broker, gateway or a worker.`, -} - -var restartBrokerCmd = &cobra.Command{ - Use: "broker", - Short: "Restarts a Zeebe broker", - Long: `Restarts a Zeebe broker with a certain role and given partition.`, - Run: func(cmd *cobra.Command, args []string) { - brokerPod := restartBroker(flags.nodeId, flags.partitionId, flags.role, nil) - internal.LogInfo("Restarted %s", brokerPod) - }, -} - -var restartGatewayCmd = &cobra.Command{ - Use: "gateway", - Short: "Restarts a Zeebe gateway", - Long: `Restarts a Zeebe gateway.`, - Run: func(cmd *cobra.Command, args []string) { - gatewayPod := restartGateway(nil) - internal.LogInfo("Restarted %s", gatewayPod) - }, -} - -var restartWorkerCmd = &cobra.Command{ - Use: "worker", - Short: "Restart a Zeebe worker", - Long: `Restart a Zeebe worker.`, - Run: func(cmd *cobra.Command, args []string) { - restartWorker(flags.all, "Restarted", nil) - }, -} diff --git a/go-chaos/cmd/root.go b/go-chaos/cmd/root.go index f0f76bd2c..bbae0608b 100644 --- a/go-chaos/cmd/root.go +++ b/go-chaos/cmd/root.go @@ -65,38 +65,8 @@ type Flags struct { timeoutInSec int } -func (f *Flags) reset() { - f.partitionId = 0 - f.role = "" - f.nodeId = 0 - f.processModelPath = "" - f.versionCount = 0 - f.variables = "" - f.msgName = "" - f.awaitResult = false - f.broker1PartitionId = 0 - f.broker1Role = "" - f.broker1NodeId = 0 - f.broker2PartitionId = 0 - f.broker2Role = "" - f.broker2NodeId = 0 - f.backupId = "" - f.oneDirection = false - f.disconnectToAll = false - f.cpuStress = false - f.memoryStress = false - f.ioStress = false - f.timeoutSec = "" - f.all = false - f.version = 0 - f.bpmnProcessId = "" - f.timeoutInSec = 0 -} - -var flags = Flags{} - var Version = "development" -var Commit = "HEAD" +var Commit = "HEAD" var Verbose bool var KubeConfigPath string var Namespace string @@ -105,30 +75,32 @@ var ClientSecret string var Audience string var JsonLogging bool -var rootCmd = &cobra.Command{ - Use: "zbchaos", - Short: "Zeebe chaos is a chaos experiment tool for Zeebe", - Long: `A chaos experimenting toolkit for Zeebe. +func NewCmd() *cobra.Command { + flags := Flags{} + + rootCmd := &cobra.Command{ + Use: "zbchaos", + Short: "Zeebe chaos is a chaos experiment tool for Zeebe", + Long: `A chaos experimenting toolkit for Zeebe. Perfect to inject some chaos into your brokers and gateways.`, - PersistentPreRun: func(cmd *cobra.Command, args []string) { - internal.Verbosity = Verbose - internal.JsonLogging = JsonLogging - if JsonLogging { - internal.JsonLogger = log.With().Logger() - } - internal.Namespace = Namespace - internal.KubeConfigPath = KubeConfigPath - if ClientId != "" && ClientSecret != "" { - internal.ZeebeClientCredential, _ = zbc.NewOAuthCredentialsProvider(&zbc.OAuthProviderConfig{ - ClientID: ClientId, - ClientSecret: ClientSecret, - Audience: Audience, - }) - } - }, -} + PersistentPreRun: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + internal.JsonLogging = JsonLogging + if JsonLogging { + internal.JsonLogger = log.With().Logger() + } + internal.Namespace = Namespace + internal.KubeConfigPath = KubeConfigPath + if ClientId != "" && ClientSecret != "" { + internal.ZeebeClientCredential, _ = zbc.NewOAuthCredentialsProvider(&zbc.OAuthProviderConfig{ + ClientID: ClientId, + ClientSecret: ClientSecret, + Audience: Audience, + }) + } + }, + } -func init() { rootCmd.PersistentFlags().BoolVarP(&Verbose, "verbose", "v", false, "verbose output") rootCmd.PersistentFlags().BoolVarP(&JsonLogging, "jsonLogging", "", false, "json logging output") rootCmd.PersistentFlags().StringVar(&KubeConfigPath, "kubeconfig", "", "path the the kube config that will be used") @@ -136,14 +108,28 @@ func init() { rootCmd.PersistentFlags().StringVarP(&ClientId, "clientId", "c", "", "connect using the given clientId") rootCmd.PersistentFlags().StringVar(&ClientSecret, "clientSecret", "", "connect using the given client secret") rootCmd.PersistentFlags().StringVar(&Audience, "audience", "", "connect using the given client secret") -} -func NewCmd() *cobra.Command { + AddBackupCommand(rootCmd, flags) + AddBrokersCommand(rootCmd) + AddConnectCmd(rootCmd) + AddDatalossSimulationCmd(rootCmd, flags) + AddDeployCmd(rootCmd, flags) + AddDisconnectCommand(rootCmd, flags) + AddExportingCmds(rootCmd) + AddPublishCmd(rootCmd, flags) + AddRestartCmd(rootCmd, flags) + AddStressCmd(rootCmd, flags) + AddTerminateCommand(rootCmd, flags) + AddTopologyCmd(rootCmd) + AddVerifyCommands(rootCmd, flags) + AddVersionCmd(rootCmd) + AddWorkerCmd(rootCmd) + return rootCmd } func Execute() { - if err := rootCmd.Execute(); err != nil { + if err := NewCmd().Execute(); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/go-chaos/cmd/stress.go b/go-chaos/cmd/stress.go index f32cc8267..bf7f5dfb0 100644 --- a/go-chaos/cmd/stress.go +++ b/go-chaos/cmd/stress.go @@ -24,12 +24,58 @@ import ( v1 "k8s.io/api/core/v1" ) -var ( -) +func AddStressCmd(rootCmd *cobra.Command, flags Flags) { + stress := &cobra.Command{ + Use: "stress", + Short: "Put stress on a Zeebe node", + Long: `Put stress on a Zeebe node. Node can be choose from gateway or brokers. Stress can be of different kind: memory, io or CPU. The different stress types can be combined.`, + } -func init() { - rootCmd.AddCommand(stress) + stressBroker := &cobra.Command{ + Use: "broker", + Short: "Put stress on a Zeebe Broker", + Long: `Put stress on a Zeebe Broker. Broker can be identified via ID or partition and role. Stress can be of different kinds: memory, io or CPU.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + pod := getBrokerPod(k8Client, zbClient, flags.nodeId, flags.partitionId, flags.role) + internal.LogInfo("Put stress on %s", pod.Name) + + stressType := internal.StressType{CpuStress: flags.cpuStress, IoStress: flags.ioStress, MemStress: flags.memoryStress} + err = internal.PutStressOnPod(k8Client, flags.timeoutSec, pod.Name, stressType) + ensureNoError(err) + }, + } + + stressGateway := &cobra.Command{ + Use: "gateway", + Short: "Put stress on a Zeebe Gateway", + Long: `Put stress on a Zeebe Gateway. Stress can be of different kinds: memory, io or CPU.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + pod := getGatewayPod(k8Client) + internal.LogInfo("Put stress on %s", pod.Name) + + stressType := internal.StressType{CpuStress: flags.cpuStress, IoStress: flags.ioStress, MemStress: flags.memoryStress} + err = internal.PutStressOnPod(k8Client, flags.timeoutSec, pod.Name, stressType) + ensureNoError(err) + }, + } + rootCmd.AddCommand(stress) stress.PersistentFlags().BoolVar(&flags.cpuStress, "cpu", true, "Specify whether CPU stress should put on the node") stress.PersistentFlags().BoolVar(&flags.memoryStress, "memory", false, "Specify whether memory stress should put on the node") stress.PersistentFlags().BoolVar(&flags.ioStress, "io", false, "Specify whether io stress should put on the node") @@ -46,56 +92,6 @@ func init() { } -var stress = &cobra.Command{ - Use: "stress", - Short: "Put stress on a Zeebe node", - Long: `Put stress on a Zeebe node. Node can be choose from gateway or brokers. Stress can be of different kind: memory, io or CPU. The different stress types can be combined.`, -} - -var stressBroker = &cobra.Command{ - Use: "broker", - Short: "Put stress on a Zeebe Broker", - Long: `Put stress on a Zeebe Broker. Broker can be identified via ID or partition and role. Stress can be of different kinds: memory, io or CPU.`, - Run: func(cmd *cobra.Command, args []string) { - internal.Verbosity = Verbose - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() - - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() - - pod := getBrokerPod(k8Client, zbClient, flags.nodeId, flags.partitionId, flags.role) - internal.LogInfo("Put stress on %s", pod.Name) - - stressType := internal.StressType{CpuStress: flags.cpuStress, IoStress: flags.ioStress, MemStress: flags.memoryStress} - err = internal.PutStressOnPod(k8Client, flags.timeoutSec, pod.Name, stressType) - ensureNoError(err) - }, -} - -var stressGateway = &cobra.Command{ - Use: "gateway", - Short: "Put stress on a Zeebe Gateway", - Long: `Put stress on a Zeebe Gateway. Stress can be of different kinds: memory, io or CPU.`, - Run: func(cmd *cobra.Command, args []string) { - internal.Verbosity = Verbose - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - pod := getGatewayPod(k8Client) - internal.LogInfo("Put stress on %s", pod.Name) - - stressType := internal.StressType{CpuStress: flags.cpuStress, IoStress: flags.ioStress, MemStress: flags.memoryStress} - err = internal.PutStressOnPod(k8Client, flags.timeoutSec, pod.Name, stressType) - ensureNoError(err) - }, -} - func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId int, brokerPartitionId int, brokerRole string) *v1.Pod { var brokerPod *v1.Pod var err error @@ -106,7 +102,7 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId } else { brokerPod, err = internal.GetBrokerPodForPartitionAndRole(k8Client, zbClient, brokerPartitionId, brokerRole) ensureNoError(err) - internal.LogVerbose("Found Broker %s as %s for partition %d.", brokerPod.Name, flags.role, brokerPartitionId) + internal.LogVerbose("Found Broker %s as %s for partition %d.", brokerPod.Name, brokerRole, brokerPartitionId) } return brokerPod diff --git a/go-chaos/cmd/terminate.go b/go-chaos/cmd/terminate.go index c8b7b5b29..a4acc44f5 100644 --- a/go-chaos/cmd/terminate.go +++ b/go-chaos/cmd/terminate.go @@ -22,8 +22,46 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) +func AddTerminateCommand(rootCmd *cobra.Command, flags Flags) { + + var terminateCmd = &cobra.Command{ + Use: "terminate", + Short: "Terminates a Zeebe node", + Long: `Terminates a Zeebe node, it can be chosen between: broker, gateway or a worker.`, + } + + var terminateBrokerCmd = &cobra.Command{ + Use: "broker", + Short: "Terminates a Zeebe broker", + Long: `Terminates a Zeebe broker with a certain role and given partition.`, + Run: func(cmd *cobra.Command, args []string) { + gracePeriodSec := int64(0) + brokerName := restartBroker(flags.nodeId, flags.partitionId, flags.role, &gracePeriodSec) + internal.LogInfo("Terminated %s", brokerName) + }, + } + + var terminateGatewayCmd = &cobra.Command{ + Use: "gateway", + Short: "Terminates a Zeebe gateway", + Long: `Terminates a Zeebe gateway.`, + Run: func(cmd *cobra.Command, args []string) { + gracePeriodSec := int64(0) + gatewayPod := restartGateway(&gracePeriodSec) + internal.LogInfo("Terminated %s", gatewayPod) + }, + } + + var terminateWorkerCmd = &cobra.Command{ + Use: "worker", + Short: "Terminates a Zeebe worker", + Long: `Terminates a Zeebe worker.`, + Run: func(cmd *cobra.Command, args []string) { + gracePeriodSec := int64(0) + restartWorker(flags.all, "Terminated", &gracePeriodSec) + }, + } -func init() { rootCmd.AddCommand(terminateCmd) terminateCmd.AddCommand(terminateBrokerCmd) @@ -36,44 +74,7 @@ func init() { terminateCmd.AddCommand(terminateWorkerCmd) terminateWorkerCmd.Flags().BoolVar(&flags.all, "all", false, "Specify whether all workers should be terminated") -} - -var terminateCmd = &cobra.Command{ - Use: "terminate", - Short: "Terminates a Zeebe node", - Long: `Terminates a Zeebe node, it can be chosen between: broker, gateway or a worker.`, -} - -var terminateBrokerCmd = &cobra.Command{ - Use: "broker", - Short: "Terminates a Zeebe broker", - Long: `Terminates a Zeebe broker with a certain role and given partition.`, - Run: func(cmd *cobra.Command, args []string) { - gracePeriodSec := int64(0) - brokerName := restartBroker(flags.nodeId, flags.partitionId, flags.role, &gracePeriodSec) - internal.LogInfo("Terminated %s", brokerName) - }, -} - -var terminateGatewayCmd = &cobra.Command{ - Use: "gateway", - Short: "Terminates a Zeebe gateway", - Long: `Terminates a Zeebe gateway.`, - Run: func(cmd *cobra.Command, args []string) { - gracePeriodSec := int64(0) - gatewayPod := restartGateway(&gracePeriodSec) - internal.LogInfo("Terminated %s", gatewayPod) - }, -} -var terminateWorkerCmd = &cobra.Command{ - Use: "worker", - Short: "Terminates a Zeebe worker", - Long: `Terminates a Zeebe worker.`, - Run: func(cmd *cobra.Command, args []string) { - gracePeriodSec := int64(0) - restartWorker(flags.all, "Terminated", &gracePeriodSec) - }, } // Restart a broker pod. Pod is identified either by nodeId or by partitionId and role. diff --git a/go-chaos/cmd/topology.go b/go-chaos/cmd/topology.go index ef2ed0ac7..179d2559c 100644 --- a/go-chaos/cmd/topology.go +++ b/go-chaos/cmd/topology.go @@ -26,38 +26,38 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(topologyCmd) -} +func AddTopologyCmd(rootCmd *cobra.Command) { -var topologyCmd = &cobra.Command{ - Use: "topology", - Short: "Print the Zeebe topology deployed in the current namespace", - Long: `Shows the current Zeebe topology, in the current kubernetes namespace.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } + var topologyCmd = &cobra.Command{ + Use: "topology", + Short: "Print the Zeebe topology deployed in the current namespace", + Long: `Shows the current Zeebe topology, in the current kubernetes namespace.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() - client, err := internal.CreateZeebeClient(port) - if err != nil { - panic(err) - } + client, err := internal.CreateZeebeClient(port) + if err != nil { + panic(err) + } - response, err := client.NewTopologyCommand().Send(context.TODO()) - if err != nil { - panic(err) - } + response, err := client.NewTopologyCommand().Send(context.TODO()) + if err != nil { + panic(err) + } - builder := strings.Builder{} - writeTopologyToOutput(&builder, response) - internal.LogInfo(builder.String()) - }, + builder := strings.Builder{} + writeTopologyToOutput(&builder, response) + internal.LogInfo(builder.String()) + }, + } + rootCmd.AddCommand(topologyCmd) } func writeTopologyToOutput(output io.Writer, response *pb.TopologyResponse) { diff --git a/go-chaos/cmd/verify.go b/go-chaos/cmd/verify.go index d5a0d1f22..7b28296dd 100644 --- a/go-chaos/cmd/verify.go +++ b/go-chaos/cmd/verify.go @@ -21,8 +21,65 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) +func AddVerifyCommands(rootCmd *cobra.Command, flags Flags) { + + var verifyCmd = &cobra.Command{ + Use: "verify", + Short: "Verify certain properties", + Long: `Verify certain properties on Zeebe nodes, like readiness or steady-state.`, + } + + var verifyReadinessCmd = &cobra.Command{ + Use: "readiness", + Short: "Verify readiness of a Zeebe nodes", + Long: `Verifies the readiness of Zeebe nodes.`, + Run: func(cmd *cobra.Command, args []string) { + + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + err = k8Client.AwaitReadiness() + ensureNoError(err) + + internal.LogInfo("All Zeebe nodes are running.") + }, + } + + var verifyInstanceCreation = &cobra.Command{ + Use: "instance-creation", + Short: "Verify the instance creation", + Long: `Verifies that an instance from a specific process model can be created on a specific partition. +Process instances are created until the required partition is reached.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + processInstanceCreator, err := internal.CreateProcessInstanceCreator(zbClient, internal.ProcessInstanceCreationOptions{ + BpmnProcessId: flags.bpmnProcessId, + Version: int32(flags.version), + AwaitResult: flags.awaitResult, + Variables: flags.variables, + }) + ensureNoError(err) + if flags.awaitResult { + internal.LogVerbose("We await the result of the process instance creation, thus we skip the partition id check.") + flags.partitionId = 0 + } + err = internal.CreateProcessInstanceOnPartition(processInstanceCreator, int32(flags.partitionId), time.Duration(flags.timeoutInSec)*time.Second) + ensureNoError(err) + + internal.LogInfo("The steady-state was successfully verified!") + }, + } -func init() { rootCmd.AddCommand(verifyCmd) verifyCmd.AddCommand(verifyReadinessCmd) verifyCmd.AddCommand(verifyInstanceCreation) @@ -35,62 +92,4 @@ func init() { verifyInstanceCreation.Flags().StringVar(&flags.bpmnProcessId, "bpmnProcessId", "benchmark", "Specify the BPMN process ID for which the instance should be created.") verifyInstanceCreation.Flags().IntVar(&flags.version, "version", -1, "Specify the version for which the instance should be created, defaults to latest version.") - -} - -var verifyCmd = &cobra.Command{ - Use: "verify", - Short: "Verify certain properties", - Long: `Verify certain properties on Zeebe nodes, like readiness or steady-state.`, -} - -var verifyReadinessCmd = &cobra.Command{ - Use: "readiness", - Short: "Verify readiness of a Zeebe nodes", - Long: `Verifies the readiness of Zeebe nodes.`, - Run: func(cmd *cobra.Command, args []string) { - - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - err = k8Client.AwaitReadiness() - ensureNoError(err) - - internal.LogInfo("All Zeebe nodes are running.") - }, -} - -var verifyInstanceCreation = &cobra.Command{ - Use: "instance-creation", - Short: "Verify the instance creation", - Long: `Verifies that an instance from a specific process model can be created on a specific partition. -Process instances are created until the required partition is reached.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - ensureNoError(err) - - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) - defer closeFn() - - zbClient, err := internal.CreateZeebeClient(port) - ensureNoError(err) - defer zbClient.Close() - - processInstanceCreator, err := internal.CreateProcessInstanceCreator(zbClient, internal.ProcessInstanceCreationOptions{ - BpmnProcessId: flags.bpmnProcessId, - Version: int32(flags.version), - AwaitResult: flags.awaitResult, - Variables: flags.variables, - }) - ensureNoError(err) - if flags.awaitResult { - internal.LogVerbose("We await the result of the process instance creation, thus we skip the partition id check.") - flags.partitionId = 0 - } - err = internal.CreateProcessInstanceOnPartition(processInstanceCreator, int32(flags.partitionId), time.Duration(flags.timeoutInSec)*time.Second) - ensureNoError(err) - - internal.LogInfo("The steady-state was successfully verified!") - }, } diff --git a/go-chaos/cmd/version.go b/go-chaos/cmd/version.go index 36112c538..ced2f6f01 100644 --- a/go-chaos/cmd/version.go +++ b/go-chaos/cmd/version.go @@ -22,21 +22,21 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) - func VersionString() string { commit := Commit[0:int(math.Min(8, float64(len(Commit))))] return fmt.Sprintf("zbchaos %s (commit: %s)", Version, commit) } -var versionCmd = &cobra.Command{ - Use: "version", - Short: "Print the version of zbchaos", - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - internal.LogInfo(VersionString()) - }, -} +func AddVersionCmd(rootCmd *cobra.Command) { + + var versionCmd = &cobra.Command{ + Use: "version", + Short: "Print the version of zbchaos", + Args: cobra.NoArgs, + Run: func(cmd *cobra.Command, args []string) { + internal.LogInfo(VersionString()) + }, + } -func init() { rootCmd.AddCommand(versionCmd) } diff --git a/go-chaos/cmd/worker.go b/go-chaos/cmd/worker.go index 836b2a9ff..82d4f9761 100644 --- a/go-chaos/cmd/worker.go +++ b/go-chaos/cmd/worker.go @@ -36,17 +36,16 @@ const ENV_CLIENT_ID = "CHAOS_AUTOMATION_CLUSTER_CLIENT_ID" const ENV_CLIENT_SECRET = "CHAOS_AUTOMATION_CLUSTER_CLIENT_SECRET" const ENV_ADDRESS = "CHAOS_AUTOMATION_CLUSTER_ADDRESS" -func init() { +func AddWorkerCmd(rootCmd *cobra.Command) { + var workerCommand = &cobra.Command{ + Use: "worker", + Short: "Starts a worker for zbchaos jobs", + Long: "Starts a worker for zbchaos jobs that executes zbchaos commands", + Run: start_worker, + } rootCmd.AddCommand(workerCommand) } -var workerCommand = &cobra.Command{ - Use: "worker", - Short: "Starts a worker for zbchaos jobs", - Long: "Starts a worker for zbchaos jobs that executes zbchaos commands", - Run: start_worker, -} - func start_worker(cmd *cobra.Command, args []string) { // The credentials are set via env var's. // We use here different names for the environment variables on purpose. @@ -94,9 +93,9 @@ func handleZbChaosJob(client zbworker.JobClient, job entities.Job) { func runZbChaosCommand(args []string, ctx context.Context) error { internal.LogInfo("Running command with args: %v ", args) - flags.reset() - rootCmd.SetArgs(args) - _, err := rootCmd.ExecuteContextC(ctx) + cmd := NewCmd() + cmd.SetArgs(args) + _, err := cmd.ExecuteContextC(ctx) if err != nil { return err } diff --git a/go-chaos/cmd/zeebePods.go b/go-chaos/cmd/zeebePods.go index 790bb7976..655c85936 100644 --- a/go-chaos/cmd/zeebePods.go +++ b/go-chaos/cmd/zeebePods.go @@ -19,27 +19,27 @@ import ( "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) -func init() { - rootCmd.AddCommand(getZeebeBrokersCmd) -} +func AddBrokersCommand(rootCmd *cobra.Command) { + var getZeebeBrokersCmd = &cobra.Command{ + Use: "brokers", + Short: "Print the name of the Zeebe broker pods", + Long: `Show all names of deployed Zeebe brokers, in the current kubernetes namespace.`, + Run: func(cmd *cobra.Command, args []string) { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } -var getZeebeBrokersCmd = &cobra.Command{ - Use: "brokers", - Short: "Print the name of the Zeebe broker pods", - Long: `Show all names of deployed Zeebe brokers, in the current kubernetes namespace.`, - Run: func(cmd *cobra.Command, args []string) { - k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } + pods, err := k8Client.GetBrokerPodNames() + if err != nil { + panic(err) + } - pods, err := k8Client.GetBrokerPodNames() - if err != nil { - panic(err) - } + for _, item := range pods { + internal.LogInfo("%s", item) + } + }, + } - for _, item := range pods { - internal.LogInfo("%s", item) - } - }, + rootCmd.AddCommand(getZeebeBrokersCmd) }