Support disconnect gateway within zbchaos (#233)
blocked by #232 

- Allow disconnecting the first gateway from the broker with a specific nodeId, or
with a specific partitionId and role.
- Allow disconnecting the first gateway from all brokers.
- Allow disconnecting in only one direction (asymmetric).
- Allow connecting everything again.

These features match or exceed those of the existing scripts:

* https://github.com/zeebe-io/zeebe-chaos/blob/main/chaos-workers/chaos-experiments/scripts/disconnect-standalone-gateway.sh
* https://github.com/zeebe-io/zeebe-chaos/blob/main/chaos-workers/chaos-experiments/scripts/connect-standalone-gateway.sh

Note: right now we only support disconnecting the first standalone
gateway, which is how we did it before as well. We can improve that
later.

closes #225 

-----


### Example

Example usage:

```
$ ./zbchaos disconnect gateway
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-2
Disconnect zell-chaos-zeebe-2 from zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
```

In the gateway we can see logs like: `Failed to activate jobs for type benchmark-task from partition 2`

One direction:

```
$ ./zbchaos disconnect gateway --one-direction
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-2
```

With verbosity:

```

$ ./zbchaos disconnect gateway --verbose --one-direction
Connecting to zell-chaos
Running experiment in self-managed environment.
Did not find zeebe cluster to pause reconciliation, ignoring. 
Patched statefulset
Patched deployment
Successfully created port forwarding tunnel
Found Broker zell-chaos-zeebe-2 as LEADER for partition 2.
Execute ["apt" "-qq" "update"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
4 packages can be upgraded. Run 'apt list --upgradable' to see them.
Execute ["apt" "-qq" "install" "-y" "iproute2"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
iproute2 is already the newest version (5.5.0-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.
Execute ["ip" "route" "replace" "unreachable" "10.0.30.4"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-2
```
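
The verbose output above shows that a disconnect boils down to an `ip route replace unreachable <ip>` call on the affected pod. As a rough sketch of how the symmetric and `--one-direction` cases differ, the following builds the command lists without executing anything. The `pod` and `routeCmd` types, the function name, and the gateway IP are assumptions made for illustration and are not part of zbchaos:

```go
package main

import "fmt"

// pod is a hypothetical stand-in for the name and IP of a Kubernetes pod.
type pod struct {
	Name string
	IP   string
}

// routeCmd pairs a command line with the pod it would be executed on.
type routeCmd struct {
	OnPod string
	Argv  []string
}

// disconnectCommands returns the commands needed to cut first off from second.
// With oneDirection set, only the first pod gets an unreachable route.
func disconnectCommands(first, second pod, oneDirection bool) []routeCmd {
	cmds := []routeCmd{
		{OnPod: first.Name, Argv: []string{"ip", "route", "replace", "unreachable", second.IP}},
	}
	if !oneDirection {
		cmds = append(cmds, routeCmd{
			OnPod: second.Name,
			Argv:  []string{"ip", "route", "replace", "unreachable", first.IP},
		})
	}
	return cmds
}

func main() {
	// The gateway IP is made up; the broker IP is taken from the log above.
	gateway := pod{Name: "gateway", IP: "10.0.0.1"}
	broker := pod{Name: "broker-2", IP: "10.0.30.4"}
	for _, c := range disconnectCommands(gateway, broker, false) {
		fmt.Printf("on %s: %q\n", c.OnPod, c.Argv)
	}
}
```

Keeping the command construction separate from the execution makes the asymmetric case easy to check in isolation.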

With nodeId
```
$ ./zbchaos disconnect gateway --verbose --one-direction --nodeId 1
Connecting to zell-chaos
Running experiment in self-managed environment.
Did not find zeebe cluster to pause reconciliation, ignoring. 
Patched statefulset
Patched deployment
Successfully created port forwarding tunnel
Found Broker zell-chaos-zeebe-1 with node id 1.
Execute ["apt" "-qq" "update"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
4 packages can be upgraded. Run 'apt list --upgradable' to see them.
Execute ["apt" "-qq" "install" "-y" "iproute2"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
iproute2 is already the newest version (5.5.0-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.
Execute ["ip" "route" "replace" "unreachable" "10.0.17.4"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-1
```

Specifying both node id and partition id is rejected:
```
$ ./zbchaos disconnect gateway --verbose --one-direction --nodeId 1 --partitionId 2
Error: if any flags in the group [partitionId nodeId] are set none of the others can be; [nodeId partitionId] were all set
```
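
The error above comes from a mutually exclusive flag group. To illustrate the idea without the CLI framework, here is a minimal sketch using only the standard library `flag` package, which is a different mechanism from the one zbchaos uses. The helper names `checkExclusive` and `parseGatewayFlags` and the error wording are assumptions for illustration:

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"strings"
)

// checkExclusive returns an error if more than one flag of group was set on fs.
func checkExclusive(fs *flag.FlagSet, group ...string) error {
	inGroup := make(map[string]bool, len(group))
	for _, name := range group {
		inGroup[name] = true
	}
	var set []string
	// Visit only walks flags that were explicitly set on the command line.
	fs.Visit(func(f *flag.Flag) {
		if inGroup[f.Name] {
			set = append(set, f.Name)
		}
	})
	if len(set) > 1 {
		return fmt.Errorf("flags %s are mutually exclusive, but %s were all set",
			strings.Join(group, ", "), strings.Join(set, ", "))
	}
	return nil
}

// parseGatewayFlags parses args and enforces the exclusive group.
func parseGatewayFlags(args []string) error {
	fs := flag.NewFlagSet("disconnect gateway", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep parse errors out of stderr in this sketch
	fs.Int("nodeId", -1, "node id of the broker")
	fs.Int("partitionId", 1, "partition id of the broker")
	fs.Bool("all", false, "disconnect the gateway from all brokers")
	if err := fs.Parse(args); err != nil {
		return err
	}
	return checkExclusive(fs, "all", "partitionId", "nodeId")
}

func main() {
	fmt.Println(parseGatewayFlags([]string{"--nodeId", "1", "--partitionId", "2"}))
	fmt.Println(parseGatewayFlags([]string{"--partitionId", "3"}))
}
```

Checking which flags were explicitly set, rather than comparing against defaults, is what lets a flag like `partitionId` keep a default value while still being part of the exclusive group.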


Specify partition

```
$ ./zbchaos disconnect gateway --verbose --one-direction --partitionId 3
Connecting to zell-chaos
Running experiment in self-managed environment.
Did not find zeebe cluster to pause reconciliation, ignoring. 
Patched statefulset
Patched deployment
Successfully created port forwarding tunnel
Found Broker zell-chaos-zeebe-0 as LEADER for partition 3.
Execute ["apt" "-qq" "update"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
4 packages can be upgraded. Run 'apt list --upgradable' to see them.
Execute ["apt" "-qq" "install" "-y" "iproute2"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
iproute2 is already the newest version (5.5.0-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.
Execute ["ip" "route" "replace" "unreachable" "10.0.12.5"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-0
```

Disconnect all

```
$ ./zbchaos disconnect gateway --verbose --one-direction --all
Connecting to zell-chaos
Running experiment in self-managed environment.
Did not find zeebe cluster to pause reconciliation, ignoring. 
Patched statefulset
Patched deployment
Successfully created port forwarding tunnel
Execute ["apt" "-qq" "update"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
4 packages can be upgraded. Run 'apt list --upgradable' to see them.
Execute ["apt" "-qq" "install" "-y" "iproute2"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
iproute2 is already the newest version (5.5.0-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.
Execute ["ip" "route" "replace" "unreachable" "10.0.12.5"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-0
Execute ["apt" "-qq" "update"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
4 packages can be upgraded. Run 'apt list --upgradable' to see them.
Execute ["apt" "-qq" "install" "-y" "iproute2"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
iproute2 is already the newest version (5.5.0-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.
Execute ["ip" "route" "replace" "unreachable" "10.0.17.4"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-1
Execute ["apt" "-qq" "update"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
4 packages can be upgraded. Run 'apt list --upgradable' to see them.
Execute ["apt" "-qq" "install" "-y" "iproute2"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
iproute2 is already the newest version (5.5.0-1ubuntu1).
0 upgraded, 0 newly installed, 0 to remove and 4 not upgraded.
Execute ["ip" "route" "replace" "unreachable" "10.0.30.4"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Disconnect zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 from zell-chaos-zeebe-2
```


Connect all again


```
$ ./zbchaos connect gateway --verbose
Connecting to zell-chaos
Running experiment in self-managed environment.
Execute ["sh" "-c" "command -v ip"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
/usr/sbin/ip
Execute ["sh" "-c" "ip route del unreachable 10.0.12.5"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Connected zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 again with zell-chaos-zeebe-0, removed unreachable routes.
Execute ["sh" "-c" "command -v ip"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
/usr/sbin/ip
Execute ["sh" "-c" "ip route del unreachable 10.0.17.4"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Connected zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 again with zell-chaos-zeebe-1, removed unreachable routes.
Execute ["sh" "-c" "command -v ip"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
/usr/sbin/ip
Execute ["sh" "-c" "ip route del unreachable 10.0.30.4"] on pod zell-chaos-zeebe-gateway-7c4f86f95f-2kp24
Connected zell-chaos-zeebe-gateway-7c4f86f95f-2kp24 again with zell-chaos-zeebe-2, removed unreachable routes.
```
ChrisKujawa authored Nov 18, 2022
2 parents a736b9d + 408f418 commit 11f79e0
Showing 6 changed files with 232 additions and 37 deletions.
36 changes: 36 additions & 0 deletions go-chaos/cmd/connect.go
@@ -24,6 +24,7 @@ import (
func init() {
rootCmd.AddCommand(connect)
connect.AddCommand(connectBrokers)
connect.AddCommand(connectGateway)
}

var connect = &cobra.Command{
@@ -69,3 +70,38 @@ var connectBrokers = &cobra.Command{
}
},
}

var connectGateway = &cobra.Command{
Use: "gateway",
Short: "Connect Zeebe Gateway",
Long: `Connect all Zeebe Gateway again, after it has been disconnected.`,
Run: func(cmd *cobra.Command, args []string) {
internal.Verbosity = Verbose
k8Client, err := internal.CreateK8Client()
ensureNoError(err)

// No patch is needed, since we expect that disconnect was executed before.
// If not, all is fine and the pods are already connected.

// We run connect on all nodes
brokerPods, err := k8Client.GetBrokerPods()
ensureNoError(err)

if len(brokerPods.Items) <= 0 {
panic(fmt.Sprintf("Expected to find broker(s) in current namespace %s, but found nothing\n", k8Client.GetCurrentNamespace()))
}

gatewayPod := getGatewayPod(k8Client)

for _, brokerPod := range brokerPods.Items {
err = internal.MakeIpReachable(k8Client, gatewayPod.Name, brokerPod.Status.PodIP)
if err != nil {
if Verbose {
fmt.Printf("Error on connection gateway: %s. Error: %s\n", gatewayPod.Name, err.Error())
}
} else {
fmt.Printf("Connected %s again with %s, removed unreachable routes.\n", gatewayPod.Name, brokerPod.Name)
}
}
},
}
138 changes: 104 additions & 34 deletions go-chaos/cmd/disconnect.go
@@ -15,6 +15,7 @@
package cmd

import (
"errors"
"fmt"

"github.com/camunda/zeebe/clients/go/v8/pkg/zbc"
@@ -24,30 +25,36 @@ import (
)

var (
oneDirection bool
oneDirection bool
disconnectToAll bool
)

func init() {
rootCmd.AddCommand(disconnect)

// disconnect brokers
disconnect.AddCommand(disconnectBrokers)

// broker 1
disconnectBrokers.Flags().StringVar(&broker1Role, "broker1Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the first Broker")
disconnectBrokers.Flags().IntVar(&broker1PartitionId, "broker1PartitionId", 1, "Specify the partition id of the first Broker")

disconnectBrokers.Flags().IntVar(&broker1NodeId, "broker1NodeId", -1, "Specify the nodeId of the first Broker")
disconnectBrokers.MarkFlagsMutuallyExclusive("broker1PartitionId", "broker1NodeId")

// broker 2
disconnectBrokers.Flags().StringVar(&broker2Role, "broker2Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the second Broker")
disconnectBrokers.Flags().IntVar(&broker2PartitionId, "broker2PartitionId", 2, "Specify the partition id of the second Broker")

disconnectBrokers.Flags().IntVar(&broker2NodeId, "broker2NodeId", -1, "Specify the nodeId of the second Broker")

// general
disconnectBrokers.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)")
disconnectBrokers.MarkFlagsMutuallyExclusive("broker2PartitionId", "broker2NodeId", "one-direction")

// disconnect gateway
disconnect.AddCommand(disconnectGateway)
disconnectGateway.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker")
disconnectGateway.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker")
disconnectGateway.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the partition id of the Broker")
disconnectGateway.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)")
disconnectGateway.Flags().BoolVar(&disconnectToAll, "all", false, "Specify whether the gateway should be disconnected to all brokers")
disconnectGateway.MarkFlagsMutuallyExclusive("all", "partitionId", "nodeId")
}

var disconnect = &cobra.Command{
@@ -56,21 +63,26 @@ var disconnect = &cobra.Command{
Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`,
}

func ensureNoError(err error) {
if err != nil {
panic(err)
}
}

var disconnectBrokers = &cobra.Command{
Use: "brokers",
Short: "Disconnect Zeebe Brokers",
Long: `Disconnect Zeebe Brokers with a given partition and role.`,
Run: func(cmd *cobra.Command, args []string) {
internal.Verbosity = Verbose
k8Client, err := internal.CreateK8Client()
if err != nil {
panic(err)
}
ensureNoError(err)

err = k8Client.PauseReconciliation()
ensureNoError(err)

err = k8Client.ApplyNetworkPatch()
if err != nil {
panic(err)
}
ensureNoError(err)

if Verbose {
fmt.Println("Patched statefulset")
@@ -81,9 +93,7 @@ var disconnectBrokers = &cobra.Command{
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
if err != nil {
panic(err.Error())
}
ensureNoError(err)
defer zbClient.Close()

broker1Pod := getBrokerPod(k8Client, zbClient, broker1NodeId, broker1PartitionId, broker1Role)
@@ -94,19 +104,7 @@ var disconnectBrokers = &cobra.Command{
return
}

err = internal.MakeIpUnreachableForPod(k8Client, broker2Pod.Status.PodIP, broker1Pod.Name)
if err != nil {
panic(err.Error())
}
fmt.Printf("Disconnect %s from %s\n", broker1Pod.Name, broker2Pod.Name)

if !oneDirection {
err = internal.MakeIpUnreachableForPod(k8Client, broker1Pod.Status.PodIP, broker2Pod.Name)
if err != nil {
panic(err.Error())
}
fmt.Printf("Disconnect %s from %s\n", broker2Pod.Name, broker1Pod.Name)
}
disconnectPods(k8Client, broker1Pod, broker2Pod)
},
}

@@ -115,21 +113,93 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId
var err error
if brokerNodeId >= 0 {
brokerPod, err = internal.GetBrokerPodForNodeId(k8Client, int32(brokerNodeId))
if err != nil {
panic(err.Error())
}
ensureNoError(err)
if Verbose {
fmt.Printf("Found Broker %s with node id %d.\n", brokerPod.Name, brokerNodeId)
}
} else {
brokerPod, err = internal.GetBrokerPodForPartitionAndRole(k8Client, zbClient, brokerPartitionId, brokerRole)
if err != nil {
panic(err.Error())
}
ensureNoError(err)
if Verbose {
fmt.Printf("Found Broker %s as %s for partition %d.\n", brokerPod.Name, brokerRole, brokerPartitionId)
}
}

return brokerPod
}

func getGatewayPod(k8Client internal.K8Client) *v1.Pod {
pods, err := k8Client.GetGatewayPods()
ensureNoError(err)

if pods != nil && len(pods.Items) > 0 {
return &pods.Items[0]
}

panic(errors.New("Expected to find standalone gateway, but found nothing."))
}

var disconnectGateway = &cobra.Command{
Use: "gateway",
Short: "Disconnect Zeebe Gateway",
Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`,
Run: func(cmd *cobra.Command, args []string) {
internal.Verbosity = Verbose
k8Client, err := internal.CreateK8Client()
ensureNoError(err)

err = k8Client.PauseReconciliation()
ensureNoError(err)

err = k8Client.ApplyNetworkPatch()
ensureNoError(err)

if Verbose {
fmt.Println("Patched statefulset")
}

err = k8Client.ApplyNetworkPatchOnGateway()
ensureNoError(err)

if Verbose {
fmt.Println("Patched deployment")
}

err = k8Client.AwaitReadiness()
ensureNoError(err)

port := 26500
closeFn := k8Client.MustGatewayPortForward(port, port)
defer closeFn()

zbClient, err := internal.CreateZeebeClient(port)
ensureNoError(err)
defer zbClient.Close()

gatewayPod := getGatewayPod(k8Client)

if disconnectToAll {
pods, err := k8Client.GetBrokerPods()
ensureNoError(err)

for _, brokerPod := range pods.Items {
disconnectPods(k8Client, gatewayPod, &brokerPod)
}
} else {
broker2Pod := getBrokerPod(k8Client, zbClient, nodeId, partitionId, role)
disconnectPods(k8Client, gatewayPod, broker2Pod)
}
},
}

func disconnectPods(k8Client internal.K8Client, firstPod *v1.Pod, secondPod *v1.Pod) {
err := internal.MakeIpUnreachableForPod(k8Client, secondPod.Status.PodIP, firstPod.Name)
ensureNoError(err)
fmt.Printf("Disconnect %s from %s\n", firstPod.Name, secondPod.Name)

if !oneDirection {
err = internal.MakeIpUnreachableForPod(k8Client, firstPod.Status.PodIP, secondPod.Name)
ensureNoError(err)
fmt.Printf("Disconnect %s from %s\n", secondPod.Name, firstPod.Name)
}
}
20 changes: 19 additions & 1 deletion go-chaos/internal/network.go
@@ -67,7 +67,7 @@ func (c K8Client) ApplyNetworkPatchOnGateway() error {
"spec":{
"containers":[
{
"name": "zeebe",
"name": "zeebe-gateway",
"securityContext":{
"capabilities":{
"add":["NET_ADMIN"]
@@ -134,3 +134,21 @@ func MakeIpReachableForPod(k8Client K8Client, podName string) error {

return nil
}

func MakeIpReachable(k8Client K8Client, podName string, ip string) error {
// We try to reduce the system output in order to not break the execution. There is a limit on the stdout for exec;
// for more details see remotecommand.StreamOptions

err := k8Client.ExecuteCmdOnPod([]string{"sh", "-c", "command -v ip"}, podName)
if err != nil && strings.Contains(err.Error(), "exit code 127") {
return errors.New("Execution exited with exit code 127 (Command not found). It is likely that the broker was not disconnected or restarted in between.")
}

var buf bytes.Buffer
err = k8Client.ExecuteCmdOnPodWriteIntoOutput([]string{"sh", "-c", fmt.Sprintf("ip route del unreachable %s", ip)}, podName, &buf)
if err != nil {
return err
}

return nil
}
2 changes: 1 addition & 1 deletion go-chaos/internal/network_test.go
@@ -47,7 +47,7 @@ func Test_ShouldApplyNetworkPatchOnDeployment(t *testing.T) {
k8Client := CreateFakeClient()
selector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels())
require.NoError(t, err)
k8Client.CreateDeploymentWithLabelsAndName(t, selector, "gateway")
k8Client.CreateDeploymentWithLabelsAndName(t, selector, "zeebe-gateway")

// when
err = k8Client.ApplyNetworkPatchOnGateway()
11 changes: 10 additions & 1 deletion go-chaos/internal/pods.go
@@ -66,7 +66,7 @@ func (c K8Client) extractPodNames(list *v1.PodList) ([]string, error) {
return names, nil
}

func (c K8Client) GetGatewayPodNames() ([]string, error) {
func (c K8Client) GetGatewayPods() (*v1.PodList, error) {
listOptions := metav1.ListOptions{
LabelSelector: c.getGatewayLabels(),
// we check for running gateways, since terminated gateways can be lying around
@@ -78,6 +78,15 @@ func (c K8Client) GetGatewayPodNames() ([]string, error) {
return nil, err
}

return list, err
}

func (c K8Client) GetGatewayPodNames() ([]string, error) {
list, err := c.GetGatewayPods()
if err != nil {
return nil, err
}

if list == nil || len(list.Items) == 0 {
return c.GetBrokerPodNames()
}