From dd004240163b8e17760fc316c162b3aec1dd0cd5 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Thu, 7 Mar 2024 13:19:30 +0100 Subject: [PATCH 1/4] feat: add command for force scale down --- go-chaos/cmd/cluster.go | 57 ++++++++++++++++++++++++++++++++++++++++- go-chaos/cmd/root.go | 2 ++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/go-chaos/cmd/cluster.go b/go-chaos/cmd/cluster.go index c2786d8fe..e5d7ec6f2 100644 --- a/go-chaos/cmd/cluster.go +++ b/go-chaos/cmd/cluster.go @@ -53,15 +53,27 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) { return scaleCluster(flags) }, } + var forceFailoverCommand = &cobra.Command{ + Use: "forceFailover", + Short: "Force scale down the cluster", + RunE: func(cmd *cobra.Command, args []string) error { + return forceFailover(flags) + }, + } rootCmd.AddCommand(clusterCommand) clusterCommand.AddCommand(statusCommand) clusterCommand.AddCommand(waitCommand) + clusterCommand.AddCommand(forceFailoverCommand) waitCommand.Flags().Int64Var(&flags.changeId, "changeId", 0, "The id of the change to wait for") waitCommand.MarkFlagRequired("changeId") clusterCommand.AddCommand(scaleCommand) scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to") scaleCommand.MarkFlagRequired("brokers") + forceFailoverCommand.Flags().Int32Var(&flags.regions, "regions", 1, "The number of regions in the cluster") + forceFailoverCommand.Flags().Int32Var(&flags.regionId, "regionId", 0, "The id of the region to failover to") + forceFailoverCommand.MarkFlagRequired("regions") + forceFailoverCommand.MarkFlagRequired("regionId") } func scaleCluster(flags *Flags) error { @@ -123,11 +135,20 @@ func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) { for i := 0; i < brokers; i++ { brokerIds[i] = int32(i) } - url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers", port) + return sendScaleRequest(port, brokerIds, false) +} + +func sendScaleRequest(port int, brokerIds []int32, force bool) (*ChangeResponse, error) { + forceParam := "false" + if force { + forceParam = "true" + } + url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam) request, err := json.Marshal(brokerIds) if err != nil { return nil, err } + internal.LogInfo("Requesting scaling %s with input %s", url, request) resp, err := http.Post(url, "application/json", bytes.NewReader(request)) if err != nil { return nil, err @@ -222,6 +243,40 @@ func waitForChange(port int, changeId int64) error { return fmt.Errorf("change %d did not complete within 25 minutes", changeId) } +func forceFailover(flags *Flags) error { + k8Client, err := createK8ClientWithFlags(flags) + ensureNoError(err) + + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) + defer closePortForward() + currentTopology, err := QueryTopology(port) + ensureNoError(err) + if currentTopology.PendingChange != nil { + return fmt.Errorf("cluster is already scaling") + } + + brokersInRegion := getBrokers(currentTopology, flags.regions, flags.regionId) + + changeResponse, err := sendScaleRequest(port, brokersInRegion, true) + ensureNoError(err) + + err = waitForChange(port, changeResponse.ChangeId) + ensureNoError(err) + + return nil +} + +func getBrokers(topology *CurrentTopology, regions int32, regionId int32) []int32 { + brokersInRegion := make([]int32, 0) + for _, b := range topology.Brokers { + if b.Id%regions == regionId { + brokersInRegion = append(brokersInRegion, b.Id) + } + } + + return brokersInRegion +} + type 
ChangeStatus string const ( diff --git a/go-chaos/cmd/root.go b/go-chaos/cmd/root.go index 51841fc38..0ab691306 100644 --- a/go-chaos/cmd/root.go +++ b/go-chaos/cmd/root.go @@ -80,6 +80,8 @@ type Flags struct { // cluster changeId int64 brokers int + regionId int32 + regions int32 } var Version = "development" From 5745c04370a7d9b4174120093c7f5cea0c2e6a67 Mon Sep 17 00:00:00 2001 From: Deepthi Devaki Akkoorath Date: Thu, 7 Mar 2024 13:27:58 +0100 Subject: [PATCH 2/4] feat: allow changing replicationFactor when scaling --- go-chaos/cmd/cluster.go | 24 ++++++++++++++---------- go-chaos/cmd/root.go | 9 +++++---- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/go-chaos/cmd/cluster.go b/go-chaos/cmd/cluster.go index e5d7ec6f2..e9806ae8b 100644 --- a/go-chaos/cmd/cluster.go +++ b/go-chaos/cmd/cluster.go @@ -69,6 +69,7 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) { waitCommand.MarkFlagRequired("changeId") clusterCommand.AddCommand(scaleCommand) scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to") + scaleCommand.Flags().Int32Var(&flags.replicationFactor, "replicationFactor", -1, "The new replication factor") scaleCommand.MarkFlagRequired("brokers") forceFailoverCommand.Flags().Int32Var(&flags.regions, "regions", 1, "The number of regions in the cluster") forceFailoverCommand.Flags().Int32Var(&flags.regionId, "regionId", 0, "The id of the region to failover to") @@ -97,9 +98,9 @@ func scaleCluster(flags *Flags) error { } if len(currentTopology.Brokers) > flags.brokers { - _, err = scaleDownBrokers(k8Client, port, flags.brokers) + _, err = scaleDownBrokers(k8Client, port, flags.brokers, flags.replicationFactor) } else if len(currentTopology.Brokers) < flags.brokers { - _, err = scaleUpBrokers(k8Client, port, flags.brokers) + _, err = scaleUpBrokers(k8Client, port, flags.brokers, flags.replicationFactor) } else { return fmt.Errorf("cluster is already at size %d", flags.brokers) } @@ -108,8 +109,8 @@ func scaleCluster(flags *Flags) error { return nil } -func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) { - changeResponse, err := requestBrokerScaling(port, brokers) +func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) { + changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor) ensureNoError(err) _, err = k8Client.ScaleZeebeCluster(brokers) ensureNoError(err) @@ -117,8 +118,8 @@ func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeR return changeResponse, nil } -func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) { - changeResponse, err := requestBrokerScaling(port, brokers) +func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) { + changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor) ensureNoError(err) // Wait for brokers to leave before scaling down @@ -130,20 +131,23 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*Chang return changeResponse, nil } -func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) { +func requestBrokerScaling(port int, brokers int, replicationFactor int32) (*ChangeResponse, error) { brokerIds := make([]int32, brokers) for i := 0; i < brokers; i++ { brokerIds[i] = int32(i) } - return sendScaleRequest(port, brokerIds, false) + return sendScaleRequest(port, 
brokerIds, false, replicationFactor)
 }
 
-func sendScaleRequest(port int, brokerIds []int32, force bool) (*ChangeResponse, error) {
+func sendScaleRequest(port int, brokerIds []int32, force bool, replicationFactor int32) (*ChangeResponse, error) {
 	forceParam := "false"
 	if force {
 		forceParam = "true"
 	}
 	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam)
+	if replicationFactor > 0 {
+		url = url + fmt.Sprintf("&replicationFactor=%d", replicationFactor)
+	}
 	request, err := json.Marshal(brokerIds)
 	if err != nil {
 		return nil, err
 	}
@@ -257,7 +261,7 @@ func forceFailover(flags *Flags) error {
 
 	brokersInRegion := getBrokers(currentTopology, flags.regions, flags.regionId)
 
-	changeResponse, err := sendScaleRequest(port, brokersInRegion, true)
+	changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1)
 	ensureNoError(err)
 
 	err = waitForChange(port, changeResponse.ChangeId)
diff --git a/go-chaos/cmd/root.go b/go-chaos/cmd/root.go
index 0ab691306..b02d3c012 100644
--- a/go-chaos/cmd/root.go
+++ b/go-chaos/cmd/root.go
@@ -78,10 +78,11 @@ type Flags struct {
 	jobType string
 
 	// cluster
-	changeId int64
-	brokers  int
-	regionId int32
-	regions  int32
+	changeId          int64
+	brokers           int
+	regionId          int32
+	regions           int32
+	replicationFactor int32
 }
 
 var Version = "development"

From f38ccd555d8e2955e18d20bf4378cb350476d548 Mon Sep 17 00:00:00 2001
From: Deepthi Devaki Akkoorath
Date: Thu, 7 Mar 2024 13:56:18 +0100
Subject: [PATCH 3/4] feat: optionally disable waiting for ready after dataloss recover

---
 go-chaos/cmd/dataloss_sim.go | 17 ++++++++++-------
 go-chaos/cmd/root.go         |  3 +++
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/go-chaos/cmd/dataloss_sim.go b/go-chaos/cmd/dataloss_sim.go
index b1c6a6812..decb3db6e 100644
--- a/go-chaos/cmd/dataloss_sim.go
+++ b/go-chaos/cmd/dataloss_sim.go
@@ -104,14 +104,16 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {
 				panic(err)
 			}
 
-			// The pod is restarting after dataloss, so it takes longer to be ready
-			err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)
-
-			if err != nil {
-				internal.LogInfo("%s", err)
-				panic(err)
+			if flags.awaitReadiness {
+				// The pod is restarting after dataloss, so it takes longer to be ready
+				err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)
+
+				if err != nil {
+					internal.LogInfo("%s", err)
+					panic(err)
+				}
+				internal.LogInfo("Broker %d is recovered", flags.nodeId)
 			}
-			internal.LogInfo("Broker %d is recovered", flags.nodeId)
 		},
 	}
 
@@ -122,4 +124,5 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {
 
 	datalossDelete.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
 	datalossRecover.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
+	datalossRecover.Flags().BoolVar(&flags.awaitReadiness, "awaitReadiness", true, "If true wait until the recovered pod is ready")
 }
diff --git a/go-chaos/cmd/root.go b/go-chaos/cmd/root.go
index b02d3c012..d265abc2e 100644
--- a/go-chaos/cmd/root.go
+++ b/go-chaos/cmd/root.go
@@ -83,6 +83,9 @@ type Flags struct {
 	regionId          int32
 	regions           int32
 	replicationFactor int32
+
+	// dataloss
+	awaitReadiness bool
 }
 
 var Version = "development"

From ec28c2b093719a54f7d0c3fcbbb129472ddae732 Mon Sep 17 00:00:00 2001
From: Deepthi Devaki Akkoorath
Date: Thu, 7 Mar 2024 14:36:37 +0100
Subject: [PATCH 4/4] refactor: do not wait for pod readiness before scaling

Awaiting pod readiness can be added as a separate step in the chaos tests.
To reuse this command for failback in the 2-region test, we are removing this
from the command. During failback, the brokers in the other region will not be
ready until the scale up is completed.
---
 go-chaos/cmd/cluster.go | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/go-chaos/cmd/cluster.go b/go-chaos/cmd/cluster.go
index e9806ae8b..bc074e19c 100644
--- a/go-chaos/cmd/cluster.go
+++ b/go-chaos/cmd/cluster.go
@@ -81,9 +81,6 @@ func scaleCluster(flags *Flags) error {
 	k8Client, err := createK8ClientWithFlags(flags)
 	ensureNoError(err)
 
-	err = k8Client.AwaitReadiness()
-	ensureNoError(err)
-
 	port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
 	defer closePortForward()
 	currentTopology, err := QueryTopology(port)
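
For illustration, a minimal, runnable sketch of the broker-to-region mapping that the new forceFailover command relies on. The broker ids and region values below are made up; only the modulo rule mirrors getBrokers from the first patch, and the endpoint named in the comment is the one sendScaleRequest builds.

package main

import "fmt"

// Hypothetical 6-broker cluster spread round-robin over 2 regions, so broker i
// belongs to region i % 2, matching the assumption behind getBrokers.
func main() {
	brokerIds := []int32{0, 1, 2, 3, 4, 5}
	regions, regionId := int32(2), int32(1)

	surviving := make([]int32, 0, len(brokerIds))
	for _, id := range brokerIds {
		if id%regions == regionId {
			surviving = append(surviving, id)
		}
	}

	// Prints [1 3 5]: the brokers kept when the command force-scales the
	// cluster down to the surviving region via
	// POST /actuator/cluster/brokers?force=true.
	fmt.Println(surviving)
}

Note that with the flag defaults (regions=1, regionId=0) every broker satisfies id%1 == 0, so the whole cluster would be kept; the forced scale down only removes brokers when a real subset is selected.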