Add commands for force failover to a region #509

Merged · 4 commits · Mar 12, 2024
78 changes: 67 additions & 11 deletions go-chaos/cmd/cluster.go
@@ -53,24 +53,34 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
return scaleCluster(flags)
},
}
+ var forceFailoverCommand = &cobra.Command{
+ Use: "forceFailover",
+ Short: "Force failover to a region by scaling the cluster down to that region's brokers",
+ RunE: func(cmd *cobra.Command, args []string) error {
+ return forceFailover(flags)
+ },
+ }

rootCmd.AddCommand(clusterCommand)
clusterCommand.AddCommand(statusCommand)
clusterCommand.AddCommand(waitCommand)
+ clusterCommand.AddCommand(forceFailoverCommand)
waitCommand.Flags().Int64Var(&flags.changeId, "changeId", 0, "The id of the change to wait for")
waitCommand.MarkFlagRequired("changeId")
clusterCommand.AddCommand(scaleCommand)
scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The number of brokers to scale to")
+ scaleCommand.Flags().Int32Var(&flags.replicationFactor, "replicationFactor", -1, "The new replication factor")
scaleCommand.MarkFlagRequired("brokers")
+ forceFailoverCommand.Flags().Int32Var(&flags.regions, "regions", 1, "The number of regions in the cluster")
+ forceFailoverCommand.Flags().Int32Var(&flags.regionId, "regionId", 0, "The id of the region to fail over to")
+ forceFailoverCommand.MarkFlagRequired("regions")
+ forceFailoverCommand.MarkFlagRequired("regionId")
}

func scaleCluster(flags *Flags) error {
k8Client, err := createK8ClientWithFlags(flags)
ensureNoError(err)

err = k8Client.AwaitReadiness()
ensureNoError(err)

port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
defer closePortForward()
currentTopology, err := QueryTopology(port)
@@ -85,9 +95,9 @@ func scaleCluster(flags *Flags) error {
}

if len(currentTopology.Brokers) > flags.brokers {
- _, err = scaleDownBrokers(k8Client, port, flags.brokers)
+ _, err = scaleDownBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
} else if len(currentTopology.Brokers) < flags.brokers {
- _, err = scaleUpBrokers(k8Client, port, flags.brokers)
+ _, err = scaleUpBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
} else {
return fmt.Errorf("cluster is already at size %d", flags.brokers)
}
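Side note: with the replicationFactor flag wired up above, the scale subcommand can now change the replication factor together with the broker count. An illustrative invocation, assuming the CLI is built as zbchaos and the subcommand is registered as cluster scale:

```sh
# Scale to 6 brokers and, optionally, a new replication factor of 4.
zbchaos cluster scale --brokers 6 --replicationFactor 4
```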
@@ -96,17 +106,17 @@ func scaleCluster(flags *Flags) error {
return nil
}

- func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
- changeResponse, err := requestBrokerScaling(port, brokers)
+ func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
+ changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
ensureNoError(err)
_, err = k8Client.ScaleZeebeCluster(brokers)
ensureNoError(err)
waitForChange(port, changeResponse.ChangeId)
return changeResponse, nil
}

- func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
- changeResponse, err := requestBrokerScaling(port, brokers)
+ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
+ changeResponse, err := requestBrokerScaling(port, brokers, replicationFactor)
ensureNoError(err)

// Wait for brokers to leave before scaling down
@@ -118,16 +128,28 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int) (*ChangeResponse, error) {
return changeResponse, nil
}

- func requestBrokerScaling(port int, brokers int) (*ChangeResponse, error) {
+ func requestBrokerScaling(port int, brokers int, replicationFactor int32) (*ChangeResponse, error) {
brokerIds := make([]int32, brokers)
for i := 0; i < brokers; i++ {
brokerIds[i] = int32(i)
}
url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers", port)
return sendScaleRequest(port, brokerIds, false, replicationFactor)
}

func sendScaleRequest(port int, brokerIds []int32, force bool, replicationFactor int32) (*ChangeResponse, error) {
forceParam := "false"
if force {
forceParam = "true"
}
url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=%s", port, forceParam)
if replicationFactor > 0 {
url = url + fmt.Sprintf("&replicationFactor=%d", replicationFactor)
}
request, err := json.Marshal(brokerIds)
if err != nil {
return nil, err
}
internal.LogInfo("Requesting scaling %s with input %s", url, request)
resp, err := http.Post(url, "application/json", bytes.NewReader(request))
if err != nil {
return nil, err
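For illustration, here is a minimal, self-contained sketch of the request that sendScaleRequest issues: the desired broker IDs are POSTed as a JSON array, with force and (optionally) replicationFactor passed as query parameters. The port and broker IDs below are assumed example values; the real command discovers the port through the gateway port-forward above.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Assumed example values: a port-forwarded gateway management port on
	// 9600, and the surviving brokers of region 0 in a 6-broker,
	// 2-region cluster.
	port := 9600
	brokerIds := []int32{0, 2, 4}

	body, err := json.Marshal(brokerIds)
	if err != nil {
		panic(err)
	}

	// force=true requests a force scale-down to exactly these brokers;
	// sendScaleRequest only appends replicationFactor when it is > 0.
	url := fmt.Sprintf("http://localhost:%d/actuator/cluster/brokers?force=true", port)
	resp, err := http.Post(url, "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```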
@@ -222,6 +244,40 @@ func waitForChange(port int, changeId int64) error {
return fmt.Errorf("change %d did not complete within 25 minutes", changeId)
}

+ func forceFailover(flags *Flags) error {
+ k8Client, err := createK8ClientWithFlags(flags)
+ ensureNoError(err)
+
+ port, closePortForward := k8Client.MustGatewayPortForward(0, 9600)
+ defer closePortForward()
+ currentTopology, err := QueryTopology(port)
+ ensureNoError(err)
+ if currentTopology.PendingChange != nil {
+ return fmt.Errorf("cluster is already scaling")
+ }
+
+ brokersInRegion := getBrokers(currentTopology, flags.regions, flags.regionId)
+
+ changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1)
+ ensureNoError(err)
+
+ err = waitForChange(port, changeResponse.ChangeId)
+ ensureNoError(err)
+
+ return nil
+ }

+ func getBrokers(topology *CurrentTopology, regions int32, regionId int32) []int32 {
+ brokersInRegion := make([]int32, 0)
+ for _, b := range topology.Brokers {
+ if b.Id%regions == regionId {
+ brokersInRegion = append(brokersInRegion, b.Id)
+ }
+ }
+
+ return brokersInRegion
+ }
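getBrokers assumes a round-robin distribution of brokers across regions, so region membership reduces to the broker id modulo the region count. A small runnable sketch of that mapping, with assumed cluster sizes:

```go
package main

import "fmt"

// regionOf mirrors the check in getBrokers: broker b lives in
// region b.Id % regions.
func regionOf(brokerId, regions int32) int32 {
	return brokerId % regions
}

func main() {
	// Assumed example: 6 brokers spread over 2 regions.
	// Brokers 0, 2, 4 land in region 0; brokers 1, 3, 5 in region 1.
	for id := int32(0); id < 6; id++ {
		fmt.Printf("broker %d -> region %d\n", id, regionOf(id, 2))
	}
}
```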

type ChangeStatus string

const (
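Taken together, failing over a dual-region cluster to region 1 might look like this (illustrative invocation, assuming the CLI binary is zbchaos):

```sh
# Keep only region 1's brokers: with --regions 2, brokers whose
# id % 2 == 1 survive; everything else is force-removed.
zbchaos cluster forceFailover --regions 2 --regionId 1
```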
17 changes: 10 additions & 7 deletions go-chaos/cmd/dataloss_sim.go
@@ -104,14 +104,16 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {
panic(err)
}

- // The pod is restarting after dataloss, so it takes longer to be ready
- err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)
-
- if err != nil {
- internal.LogInfo("%s", err)
- panic(err)
+ if flags.awaitReadiness {
+ // The pod is restarting after dataloss, so it takes longer to be ready
+ err = k8Client.AwaitPodReadiness(pod.Name, 10*time.Minute)
+
+ if err != nil {
+ internal.LogInfo("%s", err)
+ panic(err)
+ }
+ internal.LogInfo("Broker %d is recovered", flags.nodeId)
}
- internal.LogInfo("Broker %d is recovered", flags.nodeId)
},
}

@@ -122,4 +124,5 @@ func AddDatalossSimulationCmd(rootCmd *cobra.Command, flags *Flags) {

datalossDelete.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
datalossRecover.Flags().IntVar(&flags.nodeId, "nodeId", 1, "Specify the id of the broker")
+ datalossRecover.Flags().BoolVar(&flags.awaitReadiness, "awaitReadiness", true, "If true, wait until the recovered pod is ready")
}
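The new awaitReadiness flag lets a recover run return immediately instead of blocking for up to 10 minutes per pod, which helps when recovering several brokers in a row. An illustrative invocation, assuming the recover subcommand is registered as dataloss recover:

```sh
# Recover broker 1 but do not wait for the pod to become ready.
zbchaos dataloss recover --nodeId 1 --awaitReadiness=false
```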
10 changes: 8 additions & 2 deletions go-chaos/cmd/root.go
@@ -78,8 +78,14 @@ type Flags struct {
jobType string

// cluster
- changeId int64
- brokers int
+ changeId int64
+ brokers int
+ regionId int32
+ regions int32
+ replicationFactor int32
+
+ // dataloss
+ awaitReadiness bool
}

var Version = "development"