Skip to content

Commit

Permalink
refactor: move worker code
Browse files Browse the repository at this point in the history
Move worker code into own package, for better encapsulation and testing purposes.
  • Loading branch information
ChrisKujawa committed Nov 24, 2022
1 parent 600b90e commit 1686394
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 55 deletions.
58 changes: 3 additions & 55 deletions go-chaos/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@ package cmd

import (
"context"
"time"

"github.com/camunda/zeebe/clients/go/v8/pkg/entities"
"github.com/camunda/zeebe/clients/go/v8/pkg/worker"
"github.com/camunda/zeebe/clients/go/v8/pkg/zbc"
"github.com/spf13/cobra"
"github.com/zeebe-io/zeebe-chaos/go-chaos/internal"
"github.com/zeebe-io/zeebe-chaos/go-chaos/worker"
)

const jobType = "zbchaos"
Expand All @@ -38,26 +36,6 @@ var workerCommand = &cobra.Command{
Run: start_worker,
}

type ChaosProvider struct {
Path string
Arguments []string
Timeout int64
}

type AuthenticationProvider struct {
Audience string
AuthorizationUrl string
ClientId string
ClientSecret string
ContactPoint string
}

type ZbChaosVariables struct {
ClusterId *string
Provider ChaosProvider
AuthenticationDetails AuthenticationProvider
}

func start_worker(cmd *cobra.Command, args []string) {
// the credentials are set via env var's
credsProvider, err := zbc.NewOAuthCredentialsProvider(&zbc.OAuthProviderConfig{})
Expand All @@ -74,41 +52,11 @@ func start_worker(cmd *cobra.Command, args []string) {
}

// Allow only one job at a time, otherwise job handling might interfere (e.g. override global vars)
jobWorker := client.NewJobWorker().JobType(jobType).Handler(handleZbChaosJob).MaxJobsActive(1).Open()
worker.CommandRunner = runZbChaosCommand
jobWorker := client.NewJobWorker().JobType(jobType).Handler(worker.HandleZbChaosJob).MaxJobsActive(1).Open()
jobWorker.AwaitClose()
}

func handleZbChaosJob(client worker.JobClient, job entities.Job) {
ctx := context.Background()

jobVariables := ZbChaosVariables{
Provider: ChaosProvider{
Timeout: 15 * 60, // 15 minute default Timeout
},
}
err := job.GetVariablesAs(&jobVariables)
if err != nil {
// Can't parse variables, no sense in retrying
_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(0).Send(ctx)
return
}

timeout := time.Duration(jobVariables.Provider.Timeout) * time.Second
commandCtx, cancelCommand := context.WithTimeout(ctx, timeout)
defer cancelCommand()

clusterAccessArgs := append([]string{""}, "--namespace", *jobVariables.ClusterId+"-zeebe", "--clientId", jobVariables.AuthenticationDetails.ClientId, "--clientSecret", jobVariables.AuthenticationDetails.ClientSecret, "--audience", jobVariables.AuthenticationDetails.Audience)
commandArgs := append(clusterAccessArgs, jobVariables.Provider.Arguments...)

err = runZbChaosCommand(commandArgs, commandCtx)
if err != nil {
_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).Send(ctx)
return
}

_, _ = client.NewCompleteJobCommand().JobKey(job.Key).Send(ctx)
}

func runZbChaosCommand(args []string, ctx context.Context) error {
internal.LogInfo("Running command with args: %v ", args)
rootCmd.SetArgs(args)
Expand Down
62 changes: 62 additions & 0 deletions go-chaos/worker/chaos_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package worker

import (
"context"
"time"

"github.com/camunda/zeebe/clients/go/v8/pkg/entities"
"github.com/camunda/zeebe/clients/go/v8/pkg/worker"
)

var CommandRunner func([]string, context.Context) error

type ChaosProvider struct {
Path string
Arguments []string
Timeout int64
}

type AuthenticationProvider struct {
Audience string
AuthorizationUrl string
ClientId string
ClientSecret string
ContactPoint string
}

type ZbChaosVariables struct {
ClusterId *string
Provider ChaosProvider
AuthenticationDetails AuthenticationProvider
}

func HandleZbChaosJob(client worker.JobClient, job entities.Job) {
ctx := context.Background()

jobVariables := ZbChaosVariables{
Provider: ChaosProvider{
Timeout: 15 * 60, // 15 minute default Timeout
},
}
err := job.GetVariablesAs(&jobVariables)
if err != nil {
// Can't parse variables, no sense in retrying
_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(0).Send(ctx)
return
}

timeout := time.Duration(jobVariables.Provider.Timeout) * time.Second
commandCtx, cancelCommand := context.WithTimeout(ctx, timeout)
defer cancelCommand()

clusterAccessArgs := append([]string{}, "--namespace", *jobVariables.ClusterId+"-zeebe", "--clientId", jobVariables.AuthenticationDetails.ClientId, "--clientSecret", jobVariables.AuthenticationDetails.ClientSecret, "--audience", jobVariables.AuthenticationDetails.Audience)
commandArgs := append(clusterAccessArgs, jobVariables.Provider.Arguments...)

err = CommandRunner(commandArgs, commandCtx)
if err != nil {
_, _ = client.NewFailJobCommand().JobKey(job.Key).Retries(job.Retries - 1).Send(ctx)
return
}

_, _ = client.NewCompleteJobCommand().JobKey(job.Key).Send(ctx)
}

0 comments on commit 1686394

Please sign in to comment.