Add zbchaos commands for enabling tests to simulate multi-region dataloss and recover (#230)

Related to camunda/camunda#9960 

This PR adds commands to support testing multi-region dataloss and
recovery. We have added commands for:
- `zbchaos prepare` Prepares the cluster by adding an init container to the
statefulset and adding a ConfigMap. With the help of the init container and
the ConfigMap, we can control when a broker that suffered from dataloss
is restarted.
The ConfigMap contains flags `block_{nodeId} = true|false` corresponding
to each nodeId. These are available to the init container in the mounted
volume via one file per flag, `/etc/config/block_{nodeId}`.
The init container is blocked in a loop while the flag is set to true. When
the ConfigMap is updated, the change is reflected in the container
eventually. There might be a delay, but eventually
`/etc/config/block_{nodeId}` will have the updated value and the init
container can break out of the loop (see the sketch right after this list).
- `zbchaos dataloss delete` Deletes the broker and its data. Sets the
flag in the ConfigMap to true to block the startup in the init container.
- `zbchaos dataloss recover` Restarts a broker and waits until it has
recovered the data. Resets the flag in the ConfigMap so that the
init container can exit and the broker container can start. Also waits
until the pods are ready, which is necessary to ensure that the broker
has recovered all data.
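
For illustration, here is a minimal sketch in Go of the gating logic the init container implements (the actual init container is a busybox shell loop, visible in the statefulset patch below); the file path layout matches the description above, while the node id and polling interval are just examples:

```go
package main

import (
    "fmt"
    "os"
    "strings"
    "time"
)

// waitForUnblock mimics the init container's behaviour: it blocks until the
// mounted ConfigMap flag file /etc/config/block_{nodeId} contains "false".
func waitForUnblock(nodeId int) {
    path := fmt.Sprintf("/etc/config/block_%d", nodeId)
    for {
        data, err := os.ReadFile(path)
        // The mounted ConfigMap volume is updated eventually, so keep polling.
        if err == nil && strings.TrimSpace(string(data)) == "false" {
            return
        }
        fmt.Println("Startup is blocked.")
        time.Sleep(10 * time.Second)
    }
}

func main() {
    waitForUnblock(0) // example node id, for illustration only
}
```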

`prepare` is added as a generic command, not as part of dataloss, because
it can also be used to apply other patches (e.g. a patch for enabling
network permissions). We have to run this command only once per cluster,
and can then repeat the tests without re-running prepare.

To test dataloss and recovery, we want to set up a cluster with 4
brokers and replication factor 4. Nodes 0 and 1 belong to region 1 and
nodes 2 and 3 belong to region 2. Assuming there is such a cluster in
the given namespace, we can simulate the data loss and recovery by
running the following commands:
1. `zbchaos dataloss prepare --namespace NAMESPACE` // Needs to be run only once
in the namespace
2. `zbchaos dataloss delete --namespace NAMESPACE --nodeId 0`
3. `zbchaos dataloss delete --namespace NAMESPACE --nodeId 1`
4. `zbchaos dataloss recover --namespace NAMESPACE --nodeId 0` // Wait until one
node is fully recovered before recovering the second one
5. `zbchaos dataloss recover --namespace NAMESPACE --nodeId 1`

The above commands simulate full data loss of region 1 and moving the
failed pods from region 1 to region 2. You can then repeat steps 2-5 to
simulate moving those pods back to region 1, or run steps 2-5 with nodes
2 and 3 to simulate dataloss of region 2.

PS: This works for clusters in our benchmark deployed via Helm. It is not
fully tested for SaaS yet.

PPS: This PR contains only the supporting commands. The test is not
automated yet.
deepthidevaki authored Nov 16, 2022
2 parents 41492fb + aa1d2c0 commit 2a1cfb2
Showing 5 changed files with 415 additions and 2 deletions.
113 changes: 113 additions & 0 deletions go-chaos/cmd/dataloss_sim.go
@@ -0,0 +1,113 @@
// Copyright 2022 Camunda Services GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cmd

import (
    "fmt"

    "github.com/spf13/cobra"
    "github.com/zeebe-io/zeebe-chaos/go-chaos/internal"
)

func init() {
    rootCmd.AddCommand(datalossCmd)
    datalossCmd.AddCommand(prepareCmd)
    datalossCmd.AddCommand(datalossDelete)
    datalossCmd.AddCommand(datalossRecover)

    datalossDelete.Flags().IntVar(&nodeId, "nodeId", 1, "Specify the id of the broker")
    datalossRecover.Flags().IntVar(&nodeId, "nodeId", 1, "Specify the id of the broker")
}

var datalossCmd = &cobra.Command{
    Use:   "dataloss",
    Short: "Simulate dataloss and recover",
    Long:  `Simulate dataloss of a broker, and recover from it.`,
}

var prepareCmd = &cobra.Command{
    Use:   "prepare",
    Short: "Prepare the k8s deployment for dataloss test",
    Long:  `Prepares the k8s deployment - such as applying patches to statefulsets - to enable applying dataloss commands.`,
    Run: func(cmd *cobra.Command, args []string) {
        k8Client, err := internal.CreateK8Client()
        if err != nil {
            panic(err)
        }

        // Add init container for dataloss simulation test
        err = k8Client.ApplyInitContainerPatch()

        if err != nil {
            panic(err)
        }

        fmt.Printf("Prepared cluster in namespace %s\n", k8Client.GetCurrentNamespace())
    },
}

var datalossDelete = &cobra.Command{
    Use:   "delete",
    Short: "Delete data of a broker",
    Long:  `Delete data of a broker by deleting the pvc and the pod`,
    Run: func(cmd *cobra.Command, args []string) {

        k8Client, err := internal.CreateK8Client()
        if err != nil {
            panic(err)
        }

        pod, err := internal.GetBrokerPodForNodeId(k8Client, int32(nodeId))

        if err != nil {
            fmt.Printf("Failed to get pod with nodeId %d %s\n", nodeId, err)
            panic(err)
        }

        k8Client.DeletePvcOfBroker(pod.Name)

        internal.SetInitContainerBlockFlag(k8Client, nodeId, "true")
        err = k8Client.RestartPod(pod.Name)
        if err != nil {
            fmt.Printf("Failed to restart pod %s\n", pod.Name)
            panic(err)
        }

        fmt.Printf("Deleted pod %s in namespace %s\n", pod.Name, k8Client.GetCurrentNamespace())
    },
}

var datalossRecover = &cobra.Command{
    Use:   "recover",
    Short: "Recover broker after full data loss",
    Long:  `Restart the broker after full data loss, wait until the data is fully recovered`,
    Run: func(cmd *cobra.Command, args []string) {

        k8Client, err := internal.CreateK8Client()
        if err != nil {
            panic(err)
        }

        err = internal.SetInitContainerBlockFlag(k8Client, nodeId, "false")
        if err != nil {
            panic(err)
        }

        err = k8Client.AwaitReadiness()
        if err != nil {
            fmt.Printf("%s\n", err)
        }
        fmt.Printf("Restarted broker %d\n", nodeId)
    },
}
176 changes: 176 additions & 0 deletions go-chaos/internal/dataloss_sim_helper.go
@@ -0,0 +1,176 @@
// Copyright 2022 Camunda Services GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
    "context"
    "fmt"
    corev1 "k8s.io/api/core/v1"
    k8sErrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
    "strconv"
)

const configMapName = "zeebe-control-pod-restart-flags"

/*
Used for the dataloss simulation test, to restrict when a deleted zeebe broker is restarted.
This adds an init container to the zeebe pods. The init container is blocked in an infinite loop until the value of `block_{node_id}` in the config map is set to false.
To restrict when a deleted pod is restarted, first update the configmap and set the respective `block_{node_id}` to true.
Then delete the pod. Once it is time to restart the pod, update the config map to set the `block_{node_id}` to false.
The updated config map will eventually (usually within a minute) be picked up by the init container, which then breaks out of the loop.
The init container exits and the zeebe container will be started.
*/
func (c K8Client) ApplyInitContainerPatch() error {
    // apply config map
    err := createConfigMapForInitContainer(c)
    if err != nil {
        fmt.Printf("Failed to create config map %s", err)
        return err
    }

    statefulSet, err := c.GetZeebeStatefulSet()
    if err != nil {
        fmt.Printf("Failed to get statefulset %s", err)
        return err
    }

    c.PauseReconciliation()

    // Adds init container patch
    patch := []byte(`{
        "spec": {
            "template": {
                "spec": {
                    "volumes": [
                        {
                            "name": "zeebe-control-pod-restart-flags-mount",
                            "configMap": {
                                "name": "zeebe-control-pod-restart-flags"
                            }
                        }
                    ],
                    "initContainers": [
                        {
                            "name": "busybox",
                            "image": "busybox:1.28",
                            "command": [
                                "/bin/sh",
                                "-c"
                            ],
                            "args": [
                                "while true; do block=$(cat /etc/config/block_${K8S_NAME##*-}); if [ $block == \"false\" ]; then break; fi; echo \"Startup is blocked.\"; sleep 10; done"
                            ],
                            "env": [
                                {
                                    "name": "K8S_NAME",
                                    "valueFrom": {
                                        "fieldRef": {
                                            "fieldPath": "metadata.name"
                                        }
                                    }
                                }
                            ],
                            "volumeMounts": [
                                {
                                    "name": "zeebe-control-pod-restart-flags-mount",
                                    "mountPath": "/etc/config"
                                }
                            ]
                        }
                    ]
                }
            }
        }
    }`)
    _, err = c.Clientset.AppsV1().StatefulSets(c.GetCurrentNamespace()).Patch(context.TODO(), statefulSet.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
    if err != nil {
        fmt.Printf("Failed to apply init container patch %s", err)
        return err
    }
    if Verbosity {
        fmt.Printf("Applied init container patch to %s \n", statefulSet.Name)
    }
    return err
}

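// createConfigMapForInitContainer creates the ConfigMap that holds the block_{nodeId}
// flags (all initialized to "false") if it does not already exist in the current namespace.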
func createConfigMapForInitContainer(c K8Client) error {
    cm, err := c.Clientset.CoreV1().ConfigMaps(c.GetCurrentNamespace()).Get(context.TODO(), configMapName, metav1.GetOptions{})
    if err == nil {
        fmt.Printf("Config map %s already exists. Will not create again. \n", cm.Name)
        return nil
    }

    if k8sErrors.IsNotFound(err) {
        // create config map
        cm := corev1.ConfigMap{
            TypeMeta: metav1.TypeMeta{
                Kind:       "ConfigMap",
                APIVersion: "v1",
            },
            ObjectMeta: metav1.ObjectMeta{
                Name:      configMapName,
                Namespace: c.GetCurrentNamespace(),
            },

            Data: map[string]string{
                // When set to true the corresponding zeebe pods will be prevented from starting up.
                // It will be blocked in the init container until this flag is set back to false.
                "block_0": "false",
                "block_1": "false",
                "block_2": "false",
                "block_3": "false",
                "block_4": "false",
                "block_5": "false",
                "block_6": "false",
                "block_7": "false",
                "block_8": "false",
            },
        }

        _, err := c.Clientset.CoreV1().ConfigMaps(c.GetCurrentNamespace()).Create(context.TODO(), &cm, metav1.CreateOptions{})
        if err != nil {
            fmt.Printf("Failed to create configmap %s", err)
            return err
        }
        if Verbosity {
            fmt.Printf("Created config map %s in namespace %s \n", cm.Name, c.GetCurrentNamespace())
        }
        return nil
    }

    fmt.Printf("Failed to query configmap %s\n", err)
    return err
}

// If the flag is set to true, the init container will be caught in a loop, which prevents the startup of the zeebe broker.
// When the flag is set to false, the init container exits and the zeebe broker will be restarted.
func SetInitContainerBlockFlag(k8Client K8Client, nodeId int, flag string) error {
    cm, err := k8Client.Clientset.CoreV1().ConfigMaps(k8Client.GetCurrentNamespace()).Get(context.TODO(), configMapName, metav1.GetOptions{})
    if err != nil {
        return err
    }

    cm.Data["block_"+strconv.Itoa(nodeId)] = flag

    cm, err = k8Client.Clientset.CoreV1().ConfigMaps(k8Client.GetCurrentNamespace()).Update(context.TODO(), cm, metav1.UpdateOptions{})

    if err != nil {
        return err
    }
    return nil
}
4 changes: 2 additions & 2 deletions go-chaos/internal/pods.go
@@ -148,9 +148,9 @@ func (c K8Client) checkIfBrokersAreRunning() (bool, error) {

     allRunning := true
     for _, pod := range pods.Items {
-        if pod.Status.Phase != v1.PodRunning {
+        if !pod.Status.ContainerStatuses[0].Ready { // assuming there is only one container
             if Verbosity {
-                fmt.Printf("Pod %s is in phase %s, which is not running. Wait for some seconds.\n", pod.Name, pod.Status.Phase)
+                fmt.Printf("Pod %s is in phase %s, but not ready. Wait for some seconds.\n", pod.Name, pod.Status.Phase)
             }
             allRunning = false
             break
62 changes: 62 additions & 0 deletions go-chaos/internal/volume.go
@@ -0,0 +1,62 @@
// Copyright 2022 Camunda Services GmbH
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
    "context"
    "errors"
    "fmt"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

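// DeletePvcOfBroker deletes the persistent volume and the persistent volume claim
// backing the given broker pod's "data" volume, simulating full data loss of that broker.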
func (c K8Client) DeletePvcOfBroker(podName string) error {
    volume, err := c.GetVolume(podName)
    if err != nil {
        return err
    }

    pvc, err := c.Clientset.CoreV1().PersistentVolumeClaims(c.GetCurrentNamespace()).Get(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
    if err != nil {
        return err
    }

    fmt.Printf("Deleting PV %s\n", pvc.Spec.VolumeName)
    err = c.Clientset.CoreV1().PersistentVolumes().Delete(context.TODO(), pvc.Spec.VolumeName, metav1.DeleteOptions{})
    if err != nil {
        return err
    }
    fmt.Printf("Deleting PVC %s in namespace %s \n", pvc.Name, c.GetCurrentNamespace())
    err = c.Clientset.CoreV1().PersistentVolumeClaims(c.GetCurrentNamespace()).Delete(context.TODO(), volume.PersistentVolumeClaim.ClaimName, metav1.DeleteOptions{})
    if err != nil {
        return err
    }

    return nil
}

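// GetVolume returns the volume named "data" of the given pod, which references the
// broker's persistent volume claim.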
func (c K8Client) GetVolume(podName string) (*v1.Volume, error) {
    pod, err := c.Clientset.CoreV1().Pods(c.GetCurrentNamespace()).Get(context.TODO(), podName, metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    for _, volume := range pod.Spec.Volumes {
        if volume.Name == "data" {
            return &volume, nil
        }
    }

    return nil, errors.New("PVC not found")
}