diff --git a/go-chaos/cmd/worker.go b/go-chaos/cmd/worker.go index 33715ebb7..c80edc956 100644 --- a/go-chaos/cmd/worker.go +++ b/go-chaos/cmd/worker.go @@ -16,13 +16,11 @@ package cmd import ( "context" - "time" - "github.com/camunda/zeebe/clients/go/v8/pkg/entities" - "github.com/camunda/zeebe/clients/go/v8/pkg/worker" "github.com/camunda/zeebe/clients/go/v8/pkg/zbc" "github.com/spf13/cobra" "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" + "github.com/zeebe-io/zeebe-chaos/go-chaos/worker" ) const jobType = "zbchaos" @@ -38,26 +36,6 @@ var workerCommand = &cobra.Command{ Run: start_worker, } -type ChaosProvider struct { - Path string - Arguments []string - Timeout int64 -} - -type AuthenticationProvider struct { - Audience string - AuthorizationUrl string - ClientId string - ClientSecret string - ContactPoint string -} - -type ZbChaosVariables struct { - ClusterId *string - Provider ChaosProvider - AuthenticationDetails AuthenticationProvider -} - func start_worker(cmd *cobra.Command, args []string) { // the credentials are set via env var's credsProvider, err := zbc.NewOAuthCredentialsProvider(&zbc.OAuthProviderConfig{}) @@ -74,41 +52,11 @@ func start_worker(cmd *cobra.Command, args []string) { } // Allow only one job at a time, otherwise job handling might interfere (e.g. override global vars) - jobWorker := client.NewJobWorker().JobType(jobType).Handler(handleZbChaosJob).MaxJobsActive(1).Open() + worker.CommandRunner = runZbChaosCommand + jobWorker := client.NewJobWorker().JobType(jobType).Handler(worker.HandleZbChaosJob).MaxJobsActive(1).Open() jobWorker.AwaitClose() } -func handleZbChaosJob(client worker.JobClient, job entities.Job) { - ctx := context.Background() - - jobVariables := ZbChaosVariables{ - Provider: ChaosProvider{ - Timeout: 15 * 60, // 15 minute default Timeout - }, - } - err := job.GetVariablesAs(&jobVariables) - if err != nil { - // Can't parse variables, no sense in retrying - _, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(0).Send(ctx) - return - } - - timeout := time.Duration(jobVariables.Provider.Timeout) * time.Second - commandCtx, cancelCommand := context.WithTimeout(ctx, timeout) - defer cancelCommand() - - clusterAccessArgs := append([]string{""}, "--namespace", *jobVariables.ClusterId+"-zeebe", "--clientId", jobVariables.AuthenticationDetails.ClientId, "--clientSecret", jobVariables.AuthenticationDetails.ClientSecret, "--audience", jobVariables.AuthenticationDetails.Audience) - commandArgs := append(clusterAccessArgs, jobVariables.Provider.Arguments...) - - err = runZbChaosCommand(commandArgs, commandCtx) - if err != nil { - _, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).Send(ctx) - return - } - - _, _ = client.NewCompleteJobCommand().JobKey(job.Key).Send(ctx) -} - func runZbChaosCommand(args []string, ctx context.Context) error { internal.LogInfo("Running command with args: %v ", args) rootCmd.SetArgs(args) diff --git a/go-chaos/worker/chaos_worker.go b/go-chaos/worker/chaos_worker.go new file mode 100644 index 000000000..266ae630a --- /dev/null +++ b/go-chaos/worker/chaos_worker.go @@ -0,0 +1,62 @@ +package worker + +import ( + "context" + "time" + + "github.com/camunda/zeebe/clients/go/v8/pkg/entities" + "github.com/camunda/zeebe/clients/go/v8/pkg/worker" +) + +var CommandRunner func([]string, context.Context) error + +type ChaosProvider struct { + Path string + Arguments []string + Timeout int64 +} + +type AuthenticationProvider struct { + Audience string + AuthorizationUrl string + ClientId string + ClientSecret string + ContactPoint string +} + +type ZbChaosVariables struct { + ClusterId *string + Provider ChaosProvider + AuthenticationDetails AuthenticationProvider +} + +func HandleZbChaosJob(client worker.JobClient, job entities.Job) { + ctx := context.Background() + + jobVariables := ZbChaosVariables{ + Provider: ChaosProvider{ + Timeout: 15 * 60, // 15 minute default Timeout + }, + } + err := job.GetVariablesAs(&jobVariables) + if err != nil { + // Can't parse variables, no sense in retrying + _, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(0).Send(ctx) + return + } + + timeout := time.Duration(jobVariables.Provider.Timeout) * time.Second + commandCtx, cancelCommand := context.WithTimeout(ctx, timeout) + defer cancelCommand() + + clusterAccessArgs := append([]string{}, "--namespace", *jobVariables.ClusterId+"-zeebe", "--clientId", jobVariables.AuthenticationDetails.ClientId, "--clientSecret", jobVariables.AuthenticationDetails.ClientSecret, "--audience", jobVariables.AuthenticationDetails.Audience) + commandArgs := append(clusterAccessArgs, jobVariables.Provider.Arguments...) + + err = CommandRunner(commandArgs, commandCtx) + if err != nil { + _, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).Send(ctx) + return + } + + _, _ = client.NewCompleteJobCommand().JobKey(job.Key).Send(ctx) +}