Skip to content

Commit

Permalink
Allow to deploy Zeebe workers with zbchaos (#247)
Browse files Browse the repository at this point in the history
Add new sub-command to deploy Zeebe workers into a Zeebe cluster, which
can be used for several chaos experiments. This allows completing
process instances etc.

closes #235

--------

**Example:**


Non-verbose: Deploy worker:

```sh
$ ./zbchaos deploy worker
Worker successfully deployed to the current namespace: zell-chaos
```

Verbose: Deploy worker

```sh
$ ./zbchaos deploy worker -v
Connecting to zell-chaos
Running experiment in self-managed environment.
Deploy worker deployment to the current namespace: zell-chaos
Worker successfully deployed to the current namespace: zell-chaos
```

```sh
$ k get deployments.apps 
NAME                       READY   UP-TO-DATE   AVAILABLE   AGE
worker                     3/3     3            3           107s
```

Error when deployment already exists:


```
$ ./zbchaos deploy worker -v
Connecting to zell-chaos
Running experiment in self-managed environment.
Deploy worker deployment to the current namespace: zell-chaos
panic: deployments.apps "worker" already exists

```
  • Loading branch information
ChrisKujawa authored Nov 22, 2022
2 parents bd506ed + 0e11043 commit 7130a14
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 12 deletions.
34 changes: 22 additions & 12 deletions go-chaos/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ func init() {
"Specify how many different versions of a default BPMN and DMN model should be deployed. Useful for testing deployment distribution.")

deployProcessModelCmd.MarkFlagsMutuallyExclusive("processModelPath", "multipleVersions")

deployCmd.AddCommand(deployWorkerCmd)
}

var deployCmd = &cobra.Command{
Expand All @@ -48,34 +50,42 @@ Can be used to deploy a specific process model or multiple version of a default
Defaults to the later, which is useful for experimenting with deployment distribution.`,
Run: func(cmd *cobra.Command, args []string) {
k8Client, err := internal.CreateK8Client()
if err != nil {
panic(err)
}
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
if err != nil {
panic(err.Error())
}
ensureNoError(err)
defer zbClient.Close()

if len(processModelPath) == 0 {
// deploy multi version
err := internal.DeployDifferentVersions(zbClient, int32(multipleVersions))
if err != nil {
panic(err.Error())
}
ensureNoError(err)
fmt.Printf("Deployed different process models of different types and versions to zeebe!\n")
} else {
processDefinitionKey, err := internal.DeployModel(zbClient, processModelPath)
if err != nil {
panic(err.Error())
}
ensureNoError(err)

fmt.Printf("Deployed given process model %s, under key %d!\n", processModelPath, processDefinitionKey)
}
},
}

var deployWorkerCmd = &cobra.Command{
Use: "worker",
Short: "Deploy a worker deployment to the Zeebe cluster",
Long: `Deploy a worker deployment to the Zeebe cluster.
The workers can be used as part of some chaos experiments to complete process instances etc.`,
Run: func(cmd *cobra.Command, args []string) {
k8Client, err := internal.CreateK8Client()
ensureNoError(err)

err = k8Client.CreateWorkerDeployment()
ensureNoError(err)

fmt.Printf("Worker successfully deployed to the current namespace: %s\n", k8Client.GetCurrentNamespace())
},
}
29 changes: 29 additions & 0 deletions go-chaos/internal/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
package internal

import (
"bytes"
"context"
"embed"
"errors"
"fmt"

v12 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/yaml"
)

// k8Deployments holds our static k8 manifests, which are copied with the go:embed directive
//
//go:embed manifests/*
var k8Deployments embed.FS

func (c K8Client) getGatewayDeployment() (*v12.Deployment, error) {

listOptions := metav1.ListOptions{
Expand All @@ -40,3 +48,24 @@ func (c K8Client) getGatewayDeployment() (*v12.Deployment, error) {
}
return &deploymentList.Items[0], err
}

func (c K8Client) CreateWorkerDeployment() error {
workerBytes, err := k8Deployments.ReadFile("manifests/worker.yaml")
if err != nil {
return err
}

if Verbosity {
fmt.Printf("Deploy worker deployment to the current namespace: %s\n", c.GetCurrentNamespace())
}

decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(workerBytes), 0)
deployment := &v12.Deployment{}
err = decoder.Decode(deployment)
if err != nil {
return err
}

_, err = c.Clientset.AppsV1().Deployments(c.GetCurrentNamespace()).Create(context.TODO(), deployment, metav1.CreateOptions{})
return err
}
17 changes: 17 additions & 0 deletions go-chaos/internal/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package internal

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -96,3 +97,19 @@ func Test_ShouldReturnSaaSGatewayDeployment(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, "gateway", deployment.Name)
}

func Test_ShouldDeployWorkerDeployment(t *testing.T) {
// given
k8Client := CreateFakeClient()

// when
err := k8Client.CreateWorkerDeployment()

// then
require.NoError(t, err)
deploymentList, err := k8Client.Clientset.AppsV1().Deployments(k8Client.GetCurrentNamespace()).List(context.TODO(), metav1.ListOptions{})
require.NoError(t, err)

assert.Equal(t, 1, len(deploymentList.Items))
assert.Equal(t, "worker", deploymentList.Items[0].Name)
}
38 changes: 38 additions & 0 deletions go-chaos/internal/manifests/worker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: worker
labels:
app: worker
spec:
selector:
matchLabels:
app: worker
replicas: 3
template:
metadata:
labels:
app: worker
spec:
containers:
- name: worker
image: gcr.io/zeebe-io/worker:zeebe
imagePullPolicy: Always
env:
- name: JAVA_OPTIONS
value: >-
-Dapp.brokerUrl=zeebe-service:26500
-Dzeebe.client.requestTimeout=62000
-Dapp.worker.capacity=10
-Dapp.worker.pollingDelay=1ms
-Dapp.worker.completionDelay=50ms
-XX:+HeapDumpOnOutOfMemoryError
- name: LOG_LEVEL
value: "debug"
resources:
limits:
cpu: 4
memory: 2Gi
requests:
cpu: 1
memory: 512Mi

0 comments on commit 7130a14

Please sign in to comment.