diff --git a/go-chaos/cmd/cluster.go b/go-chaos/cmd/cluster.go
index c2786d8fe..bc074e19c 100644
--- a/go-chaos/cmd/cluster.go
+++ b/go-chaos/cmd/cluster.go
@@ -53,24 +53,34 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
 			return scaleCluster(flags)
 		},
 	}
+	var forceFailoverCommand = &cobra.Command{
+		Use:   "forceFailover",
+		Short: "Force scale down the cluster",
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return forceFailover(flags)
+		},
+	}
 
 	rootCmd.AddCommand(clusterCommand)
 	clusterCommand.AddCommand(statusCommand)
 	clusterCommand.AddCommand(waitCommand)
+	clusterCommand.AddCommand(forceFailoverCommand)
 	waitCommand.Flags().Int64Var(&flags.changeId, "changeId", 0, "The id of the change to wait for")
 	waitCommand.MarkFlagRequired("changeId")
 	clusterCommand.AddCommand(scaleCommand)
 	scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to")
+	scaleCommand.Flags().Int32Var(&flags.replicationFactor, "replicationFactor", -1, "The new replication factor")
 	scaleCommand.MarkFlagRequired("brokers")
+	forceFailoverCommand.Flags().Int32Var(&flags.regions, "regions", 1, "The number of regions in the cluster")
+	forceFailoverCommand.Flags().Int32Var(&flags.regionId, "regionId", 0, "The id of the region to failover to")
+	forceFailoverCommand.MarkFlagRequired("regions")
+	forceFailoverCommand.MarkFlagRequired("regionId")
 }
 
 func scaleCluster(flags *Flags) error {
 	k8Client, err := createK8ClientWithFlags(flags)
 	ensureNoError(err)
 
-	err = k8Client.AwaitReadiness()
-	ensureNoError(err)
-
 	port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
 	defer closePortForward()
 	currentTopology, err := QueryTopology(port)
@@ -85,9 +95,9 @@ func scaleCluster(flags *Flags) error {
 	}
 
 	if len(currentTopology.Brokers) > flags.brokers {
-		_, err = scaleDownBrokers(k8Client, port, flags.brokers)
+		_, err = scaleDownBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
 	} else if len(currentTopology.Brokers) < flags.brokers {
-		_, err = scaleUpBrokers(k8Client, port, flags.brokers)
+		_, err = scaleUpBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
 	} else {
 		return fmt.Errorf("cluster is already at size %d", flags.brokers)
 	}
@@ -96,8 +106,8 @@ func scaleCluster(flags *Flags) error {
 	return nil
 }
 
-func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
-	changeResponse, err := requestBrokerScaling(port, brokers)
+func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
+	changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
 	ensureNoError(err)
 	_, err = k8Client.ScaleZeebeCluster(brokers)
 	ensureNoError(err)
@@ -105,8 +115,8 @@ func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeR
 	return changeResponse, nil
 }
 
-func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
-	changeResponse, err := requestBrokerScaling(port, brokers)
+func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
+	changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
 	ensureNoError(err)
 
 	// Wait for brokers to leave before scaling down
@@ -118,16 +128,28 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*Chang
 	return changeResponse, nil
 }
 
-func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) {
+func requestBrokerScaling(port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
 	brokerIds := make([]int32, brokers)
 	for i := 0; i < brokers; i++ {
 		brokerIds[i] = int32(i)
 	}
-	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers", port)
+	return sendScaleRequest(port, brokerIds, false, replicationFactor)
+}
+
+func sendScaleRequest(port int, brokerIds []int32, force bool, replicationFactor int32) (*ChangeResponse, error) {
+	forceParam := "false"
+	if force {
+		forceParam = "true"
+	}
+	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam)
+	if replicationFactor > 0 {
+		url = url + fmt.Sprintf("&replicationFactor=%d", replicationFactor)
+	}
 	request, err := json.Marshal(brokerIds)
 	if err != nil {
 		return nil, err
 	}
+	internal.LogInfo("Requesting scaling %s with input %s", url, request)
 	resp, err := http.Post(url, "application/json", bytes.NewReader(request))
 	if err != nil {
 		return nil, err
@@ -222,6 +244,40 @@ func waitForChange(port int, changeId int64) error {
 	return fmt.Errorf("change %d did not complete within 25 minutes", changeId)
 }
 
+func forceFailover(flags *Flags) error {
+	k8Client, err := createK8ClientWithFlags(flags)
+	ensureNoError(err)
+
+	port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
+	defer closePortForward()
+	currentTopology, err := QueryTopology(port)
+	ensureNoError(err)
+	if currentTopology.PendingChange != nil {
+		return fmt.Errorf("cluster is already scaling")
+	}
+
+	brokersInRegion := getBrokers(currentTopology, flags.regions, flags.regionId)
+
+	changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1)
+	ensureNoError(err)
+
+	err = waitForChange(port, changeResponse.ChangeId)
+	ensureNoError(err)
+
+	return nil
+}
+
+func getBrokers(topology *CurrentTopology, regions int32, regionId int32) []int32 {
+	brokersInRegion := make([]int32, 0)
+	for _, b := range topology.Brokers {
+		if b.Id%regions == regionId {
+			brokersInRegion = append(brokersInRegion, b.Id)
+		}
+	}
+
+	return brokersInRegion
+}
+
 type ChangeStatus string
 
 const (
diff --git a/go-chaos/cmd/dataloss_sim.go b/go-chaos/cmd/dataloss_sim.go
index b1c6a6812..decb3db6e 100644
--- a/go-chaos/cmd/dataloss_sim.go
+++ b/go-chaos/cmd/dataloss_sim.go
@@ -104,14 +104,16 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {
 				panic(err)
 			}
 
-			// The pod is restarting after dataloss, so it takes longer to be ready
-			err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)
-
-			if err != nil {
-				internal.LogInfo("%s", err)
-				panic(err)
+			if flags.awaitReadiness {
+				// The pod is restarting after dataloss, so it takes longer to be ready
+				err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)
+
+				if err != nil {
+					internal.LogInfo("%s", err)
+					panic(err)
+				}
+				internal.LogInfo("Broker %d is recovered", flags.nodeId)
 			}
-			internal.LogInfo("Broker %d is recovered", flags.nodeId)
 		},
 	}
 
@@ -122,4 +124,5 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {
 
 	datalossDelete.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
 	datalossRecover.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
+	datalossRecover.Flags().BoolVar(&flags.awaitReadiness, "awaitReadiness", true, "If true wait until the recovered pod is ready")
 }
diff --git a/go-chaos/cmd/root.go b/go-chaos/cmd/root.go
index 51841fc38..d265abc2e 100644
--- a/go-chaos/cmd/root.go
+++ b/go-chaos/cmd/root.go
@@ -78,8 +78,14 @@ type Flags struct {
 	jobType string
 
 	// cluster
-	changeId int64
-	brokers  int
+	changeId          int64
+	brokers           int
+	regionId          int32
+	regions           int32
+	replicationFactor int32
+
+	// dataloss
+	awaitReadiness bool
 }
 
 var Version = "development"