Do not fail retry of scale up operation (#511)
The scale up operation can sometimes take longer, depending on the size
of the data in each partition. By default the job is locked for 5
minutes and retried afterwards. If the scale up operation does not
complete within these 5 minutes, the retry fails because there is
already an operation in progress. This results in an incident after 3
retries. A manual retry to resolve the incident also fails, because the
command fails when the clusterSize is already the requested one.

This PR fixes this by:
1. Not failing the scale up command if the clusterSize is already the
requested one.
2. Not waiting for the operation to complete in the scale up command.
Instead, two commands are run separately: scale and wait. This way only
`wait` has to be retried, and since it is a query it can be retried
safely.
3. Allowing `wait` to run without specifying a changeId, so that it can
be used in chaos experiments and e2e tests. When no changeId is
specified, it reads the changeId from the `pendingChange` or
`lastChange` (a sketch of this fallback follows below).
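
For illustration only, here is a minimal Go sketch of the fallback
described in point 3. The `Change` and `Topology` types are simplified
stand-ins, not the tool's real types; the actual command reads this
information from the queried cluster topology.

package main

import "fmt"

// Simplified stand-ins for illustration; the real command reads these
// from the queried cluster topology.
type Change struct{ Id int64 }

type Topology struct {
	PendingChange *Change
	LastChange    *Change
}

// resolveChangeId mirrors the fallback from point 3: prefer an explicitly
// given changeId, then the pending change, then the last change; if none
// exists there is nothing to wait for.
func resolveChangeId(flagChangeId int64, topology Topology) (int64, bool) {
	if flagChangeId > 0 {
		return flagChangeId, true
	}
	if topology.PendingChange != nil {
		return topology.PendingChange.Id, true
	}
	if topology.LastChange != nil {
		return topology.LastChange.Id, true
	}
	return 0, false
}

func main() {
	topology := Topology{LastChange: &Change{Id: 42}}
	if id, ok := resolveChangeId(0, topology); ok {
		fmt.Printf("waiting for change %d\n", id)
	} else {
		fmt.Println("no change exists, nothing to wait for")
	}
}

Because the fallback only reads state, repeating `wait` after a timeout
converges to the same result, which is what makes the retry safe.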
deepthidevaki authored Mar 14, 2024
2 parents cb1621c + 0321ed2 commit c62497a
Showing 2 changed files with 22 additions and 3 deletions.
15 changes: 12 additions & 3 deletions go-chaos/cmd/cluster.go
@@ -66,7 +66,6 @@ func AddClusterCommands(rootCmd *cobra.Command, flags *Flags) {
 	clusterCommand.AddCommand(waitCommand)
 	clusterCommand.AddCommand(forceFailoverCommand)
 	waitCommand.Flags().Int64Var(&flags.changeId, "changeId", 0, "The id of the change to wait for")
-	waitCommand.MarkFlagRequired("changeId")
 	clusterCommand.AddCommand(scaleCommand)
 	scaleCommand.Flags().IntVar(&flags.brokers, "brokers", 0, "The amount of brokers to scale to")
 	scaleCommand.Flags().Int32Var(&flags.replicationFactor, "replicationFactor", -1, "The new replication factor")
@@ -99,7 +98,8 @@ func scaleCluster(flags *Flags) error {
 	} else if len(currentTopology.Brokers) < flags.brokers {
 		_, err = scaleUpBrokers(k8Client, port, flags.brokers, flags.replicationFactor)
 	} else {
-		return fmt.Errorf("cluster is already at size %d", flags.brokers)
+		internal.LogInfo("cluster is already at size %d", flags.brokers)
+		return nil
 	}
 	ensureNoError(err)
 
@@ -111,7 +111,6 @@ func scaleUpBrokers(k8Client internal.K8Client, port int, brokers int, replicati
 	ensureNoError(err)
 	_, err = k8Client.ScaleZeebeCluster(brokers)
 	ensureNoError(err)
-	waitForChange(port, changeResponse.ChangeId)
 	return changeResponse, nil
 }
 
@@ -218,6 +217,16 @@ func waitForChange(port int, changeId int64) error {
 			internal.LogInfo("Failed to query topology: %s", err)
 			continue
 		}
+		if changeId <= 0 {
+			if topology.PendingChange != nil {
+				changeId = topology.PendingChange.Id
+			} else if topology.LastChange != nil {
+				changeId = topology.LastChange.Id
+			} else {
+				internal.LogInfo("No change exists")
+				return nil
+			}
+		}
 		changeStatus := describeChangeStatus(topology, int64(changeId))
 		switch changeStatus {
 		case ChangeStatusCompleted:
10 additions & 0 deletions (second changed file)
@@ -51,6 +51,16 @@
         "arguments": ["cluster", "scale", "--brokers", "5"]
       }
     },
+    {
+      "type": "action",
+      "name": "Wait for scale up to complete",
+      "provider": {
+        "type": "process",
+        "path": "zbchaos",
+        "arguments": ["cluster", "wait"],
+        "timeout": 900
+      }
+    },
     {
       "name": "All pods should be ready",
       "type": "probe",
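
Outside of a chaos experiment, the same split (scale first, then wait)
can be driven from an e2e test or script. The following is only a rough
sketch, assuming the zbchaos binary is on the PATH; the `run` helper is
a hypothetical wrapper, not part of the tool.

package main

import (
	"fmt"
	"os/exec"
)

// run invokes the zbchaos CLI with the given arguments and returns the
// combined output. It assumes the binary is available on the PATH.
func run(args ...string) (string, error) {
	out, err := exec.Command("zbchaos", args...).CombinedOutput()
	return string(out), err
}

func main() {
	// Step 1: trigger the scale up. With this change the command returns
	// without waiting and is a no-op if the cluster already has the
	// requested size, so repeating it is harmless.
	if out, err := run("cluster", "scale", "--brokers", "5"); err != nil {
		fmt.Printf("scale failed: %v\n%s", err, out)
		return
	}
	// Step 2: wait for the pending change to complete. Without --changeId
	// the command falls back to the pending or last change, so the wait
	// itself can be retried safely.
	if out, err := run("cluster", "wait"); err != nil {
		fmt.Printf("wait failed: %v\n%s", err, out)
		return
	}
	fmt.Println("scale up completed")
}

Because the scale step is now a no-op when the cluster is already at the
requested size and the wait step only queries the change status, both
steps can be retried without tripping over an operation that is already
in progress.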
