Add commands for force failover to a region (#509)
Related to camunda/zeebe-e2e-test#421 

This PR adds commands for:
1. Force failover to one region: `zbchaos forceFailover --regions=2 --regionId=1`
    * Removes region 0 and keeps the brokers from region 1
2. Adding the region back: `zbchaos scale --brokers=4 --replicationFactor=4`
    * Re-uses the scale command and adds the `replicationFactor` parameter

The commands were manually tested on a benchmark cluster.
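
As a concrete example (a sketch, based on the `getBrokers` mapping added in this PR, which assigns brokers to regions by `id % regions`): on a 4-broker, 2-region cluster, `zbchaos forceFailover --regions=2 --regionId=1` keeps brokers 1 and 3 and force-removes brokers 0 and 2. Once region 0 is available again, `zbchaos scale --brokers=4 --replicationFactor=4` adds its brokers back and restores replication factor 4.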

Also adds an optional `awaitReadiness` flag to the `dataloss recover`
command. When using the new API for failover, all brokers in the failed
region must be started in parallel. Waiting for one broker to be ready
before the others are started can prevent the failback from completing.
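
For instance (a sketch; the flag names match those added in this PR), a recover call that should not block on readiness could look like `zbchaos dataloss recover --nodeId=1 --awaitReadiness=false`, run once per broker in the failed region so that all of them start in parallel.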
deepthidevaki authored Mar 12, 2024
2 parents 54581d2 + ec28c2b commit e96c047
Showing 3 changed files with 85 additions and 20 deletions.
78 changes: 67 additions & 11 deletions go-chaos/cmd/cluster.go
@@ -53,24 +53,34 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
return scaleCluster(flags)
},
}
var forceFailoverCommand = &cobra.Command{
Use: "forceFailover",
Short: "Force scale down the cluster",
RunE: func(cmd *cobra.Command, args []string) error {
return forceFailover(flags)
},
}

rootCmd.AddCommand(clusterCommand)
clusterCommand.AddCommand(statusCommand)
clusterCommand.AddCommand(waitCommand)
clusterCommand.AddCommand(forceFailoverCommand)
waitCommand.Flags().Int64Var(&flags.changeId, "changeId", 0, "The id of the change to wait for")
waitCommand.MarkFlagRequired("changeId")
clusterCommand.AddCommand(scaleCommand)
scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to")
scaleCommand.Flags().Int32Var(&flags.replicationFactor, "replicationFactor", -1, "The new replication factor")
scaleCommand.MarkFlagRequired("brokers")
forceFailoverCommand.Flags().Int32Var(&flags.regions, "regions", 1, "The number of regions in the cluster")
forceFailoverCommand.Flags().Int32Var(&flags.regionId, "regionId", 0, "The id of the region to failover to")
forceFailoverCommand.MarkFlagRequired("regions")
forceFailoverCommand.MarkFlagRequired("regionId")
}

func scaleCluster(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

err = k8Client.AwaitReadiness()
ensureNoError(err)

port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
currentTopology, err := QueryTopology(port)
@@ -85,9 +95,9 @@ func scaleCluster(flags *Flags) error {
}

if len(currentTopology.Brokers) > flags.brokers {
_, err = scaleDownBrokers(k8Client, port, flags.brokers)
_, err = scaleDownBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
} else if len(currentTopology.Brokers) < flags.brokers {
_, err = scaleUpBrokers(k8Client, port, flags.brokers)
_, err = scaleUpBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
} else {
return fmt.Errorf("cluster is already at size %d", flags.brokers)
}
@@ -96,17 +106,17 @@ func scaleCluster(flags *Flags) error {
return nil
}

func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
changeResponse, err := requestBrokerScaling(port, brokers)
func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
ensureNoError(err)
_, err = k8Client.ScaleZeebeCluster(brokers)
ensureNoError(err)
waitForChange(port, changeResponse.ChangeId)
return changeResponse, nil
}

func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
changeResponse, err := requestBrokerScaling(port, brokers)
func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
ensureNoError(err)

// Wait for brokers to leave before scaling down
@@ -118,16 +128,28 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
return changeResponse, nil
}

func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) {
func requestBrokerScaling(port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
brokerIds := make([]int32, brokers)
for i := 0; i < brokers; i++ {
brokerIds[i] = int32(i)
}
url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers", port)
return sendScaleRequest(port, brokerIds, false, replicationFactor)
}

func sendScaleRequest(port int, brokerIds []int32, force bool, replicationFactor int32) (*ChangeResponse, error) {
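// Posts the desired broker ids to the cluster actuator endpoint. With force=true
// this requests a force scale down (used for failover); a replicationFactor > 0
// is passed along so the new topology uses that replication factor.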
forceParam := "false"
if force {
forceParam = "true"
}
url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam)
if replicationFactor > 0 {
url = url + fmt.Sprintf("&replicationFactor=%d", replicationFactor)
}
request, err := json.Marshal(brokerIds)
if err != nil {
return nil, err
}
internal.LogInfo("Requesting scaling %s with input %s", url, request)
resp, err := http.Post(url, "application/json", bytes.NewReader(request))
if err != nil {
return nil, err
@@ -222,6 +244,40 @@ func waitForChange(port int, changeId int64) error {
return fmt.Errorf("change %d did not complete within 25 minutes", changeId)
}

func forceFailover(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
currentTopology, err := QueryTopology(port)
ensureNoError(err)
if currentTopology.PendingChange != nil {
return fmt.Errorf("cluster is already scaling")
}

brokersInRegion := getBrokers(currentTopology, flags.regions, flags.regionId)

changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1)
ensureNoError(err)

err = waitForChange(port, changeResponse.ChangeId)
ensureNoError(err)

return nil
}

func getBrokers(topology *CurrentTopology, regions int32, regionId int32) []int32 {
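// Select the brokers belonging to the given region, assuming brokers are
// distributed round-robin across regions (broker id % regions == regionId).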
brokersInRegion := make([]int32, 0)
for _, b := range topology.Brokers {
if b.Id%regions == regionId {
brokersInRegion = append(brokersInRegion, b.Id)
}
}

return brokersInRegion
}

type ChangeStatus string

const (
17 changes: 10 additions & 7 deletions go-chaos/cmd/dataloss_sim.go
@@ -104,14 +104,16 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {
panic(err)
}

// The pod is restarting after dataloss, so it takes longer to be ready
err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)

if err != nil {
internal.LogInfo("%s", err)
panic(err)
if flags.awaitReadiness {
// The pod is restarting after dataloss, so it takes longer to be ready
err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)

if err != nil {
internal.LogInfo("%s", err)
panic(err)
}
internal.LogInfo("Broker %d is recovered", flags.nodeId)
}
internal.LogInfo("Broker %d is recovered", flags.nodeId)
},
}

@@ -122,4 +124,5 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {

datalossDelete.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
datalossRecover.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
datalossRecover.Flags().BoolVar(&flags.awaitReadiness, "awaitReadiness", true, "If true wait until the recovered pod is ready")
}
10 changes: 8 additions & 2 deletions go-chaos/cmd/root.go
@@ -78,8 +78,14 @@ type Flags struct {
jobType string

// cluster
changeId int64
brokers int
changeId int64
brokers int
regionId int32
regions int32
replicationFactor int32

// dataloss
awaitReadiness bool
}

var Version = "development"