diff --git a/go-chaos/cmd/cluster.go b/go-chaos/cmd/cluster.go index 36b3edfcf..59bb212e2 100644 --- a/go-chaos/cmd/cluster.go +++ b/go-chaos/cmd/cluster.go @@ -119,7 +119,8 @@ func scaleDownBrokers(k8Client internal.K8Client, port int, brokers int, replica ensureNoError(err) // Wait for brokers to leave before scaling down - err = waitForChange(port, changeResponse.ChangeId) + timeout := time.Minute * 25 + err = waitForChange(port, changeResponse.ChangeId, timeout) ensureNoError(err) _, err = k8Client.ScaleZeebeCluster(brokers) @@ -204,12 +205,13 @@ func portForwardAndWaitForChange(flags *Flags) error { port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() - return waitForChange(port, flags.changeId) + // Wait for shorter time. Retry and longer timeout can be configured in the chaos experiment description + timeout := time.Minute * 5 + return waitForChange(port, flags.changeId, timeout) } -func waitForChange(port int, changeId int64) error { +func waitForChange(port int, changeId int64, timeout time.Duration) error { interval := time.Second * 5 - timeout := (time.Minute * 25) iterations := int(timeout / interval) for i := 0; i < int(iterations); i++ { topology, err := QueryTopology(port) @@ -270,7 +272,8 @@ func forceFailover(flags *Flags) error { changeResponse, err := sendScaleRequest(port, brokersInRegion, true, -1) ensureNoError(err) - err = waitForChange(port, changeResponse.ChangeId) + timeout := time.Minute * 5 + err = waitForChange(port, changeResponse.ChangeId, timeout) ensureNoError(err) return nil diff --git a/go-chaos/worker/chaos_worker.go b/go-chaos/worker/chaos_worker.go index 71b97f926..62f0ddea2 100644 --- a/go-chaos/worker/chaos_worker.go +++ b/go-chaos/worker/chaos_worker.go @@ -107,7 +107,9 @@ func HandleZbChaosJob(client worker.JobClient, job entities.Job, commandRunner C err = commandRunner(commandArgs, commandCtx) if err != nil { internal.LogInfo("Error on running command. [key: %d, args: %s]. Error: %s", job.Key, commandArgs, err.Error()) - _, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).Send(ctx) + backoffDuration := time.Duration(10) * time.Second + // Do not reduce number of retries. The failed job can be retried several times until the configured timeout in chaos action provider + _, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries).RetryBackoff(backoffDuration).Send(ctx) return } diff --git a/go-chaos/worker/chaos_worker_test.go b/go-chaos/worker/chaos_worker_test.go index b702b7a80..07a586093 100644 --- a/go-chaos/worker/chaos_worker_test.go +++ b/go-chaos/worker/chaos_worker_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "testing" + "time" "github.com/camunda/zeebe/clients/go/v8/pkg/entities" "github.com/camunda/zeebe/clients/go/v8/pkg/pb" @@ -237,7 +238,9 @@ func Test_ShouldFailJobWhenHandleFails(t *testing.T) { // then assert.True(t, fakeJobClient.Failed) assert.Equal(t, 123, fakeJobClient.Key) - assert.Equal(t, 2, fakeJobClient.RetriesVal) + // retry count is not decreased + assert.Equal(t, 3, fakeJobClient.RetriesVal) + assert.Equal(t, time.Duration(10)*time.Second, fakeJobClient.RetryBackoff) var expectedArgs = []string{ "--namespace", "clusterId-zeebe", "disconnect", "gateway", @@ -254,7 +257,6 @@ func createVariablesAsJson() (string, error) { marshal, err := json.Marshal(variables) return string(marshal), err - } func createZbChaosVariables() ZbChaosVariables { diff --git a/go-chaos/worker/fake.go b/go-chaos/worker/fake.go index 1593e9ab4..7cd2a25b8 100644 --- a/go-chaos/worker/fake.go +++ b/go-chaos/worker/fake.go @@ -16,6 +16,7 @@ package worker import ( "context" + "time" "github.com/camunda/zeebe/clients/go/v8/pkg/commands" "github.com/camunda/zeebe/clients/go/v8/pkg/pb" @@ -25,12 +26,13 @@ import ( type FakeJobClient struct { worker.JobClient - Key int - RetriesVal int - ErrorMsg string - Failed bool - Succeeded bool - Variables interface{} + Key int + RetriesVal int + RetryBackoff time.Duration + ErrorMsg string + Failed bool + Succeeded bool + Variables interface{} } type FakeCompleteClient struct { @@ -84,6 +86,11 @@ func (f *FakeFailClient) Retries(retries int32) commands.FailJobCommandStep3 { return f } +func (f *FakeFailClient) RetryBackoff(retryBackoff time.Duration) commands.FailJobCommandStep3 { + f.JobClient.RetryBackoff = retryBackoff + return f +} + func (f *FakeFailClient) ErrorMessage(errorMsg string) commands.FailJobCommandStep3 { f.JobClient.ErrorMsg = errorMsg return f