From 368a88a9666713c4050c7d1f4f8fabdebe9fa038 Mon Sep 17 00:00:00 2001 From: Christopher Zell <zelldon91@googlemail.com> Date: Wed, 16 Nov 2022 15:09:30 +0100 Subject: [PATCH 1/4] refactor: extract get pods --- go-chaos/internal/pods.go | 11 +++++- go-chaos/internal/pods_test.go | 62 ++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/go-chaos/internal/pods.go b/go-chaos/internal/pods.go index e4e73bb7d..4e9dc0536 100644 --- a/go-chaos/internal/pods.go +++ b/go-chaos/internal/pods.go @@ -66,7 +66,7 @@ func (c K8Client) extractPodNames(list *v1.PodList) ([]string, error) { return names, nil } -func (c K8Client) GetGatewayPodNames() ([]string, error) { +func (c K8Client) GetGatewayPods() (*v1.PodList, error) { listOptions := metav1.ListOptions{ LabelSelector: c.getGatewayLabels(), // we check for running gateways, since terminated gateways can be lying around @@ -78,6 +78,15 @@ func (c K8Client) GetGatewayPodNames() ([]string, error) { return nil, err } + return list, err +} + +func (c K8Client) GetGatewayPodNames() ([]string, error) { + list, err := c.GetGatewayPods() + if err != nil { + return nil, err + } + if list == nil || len(list.Items) == 0 { return c.GetBrokerPodNames() } diff --git a/go-chaos/internal/pods_test.go b/go-chaos/internal/pods_test.go index 9695f566a..8d8d1d5dd 100644 --- a/go-chaos/internal/pods_test.go +++ b/go-chaos/internal/pods_test.go @@ -123,6 +123,30 @@ func Test_GetNoBrokerPodNames(t *testing.T) { require.Empty(t, names) } +func Test_GetSelfManagedGatewayPod(t *testing.T) { + // given + k8Client := CreateFakeClient() + + // gateway + selector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "gateway") + + // broker + selector, err = metav1.ParseToLabelSelector(getSelfManagedBrokerLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "broker") + + // when + pods, err := k8Client.GetGatewayPods() + + // then + require.NoError(t, err) + require.NotNil(t, pods) + require.NotEmpty(t, pods) + assert.Equal(t, "gateway", pods.Items[0].Name, "Expected to retrieve gateway") +} + func Test_GetSelfManagedGatewayPodNames(t *testing.T) { // given k8Client := CreateFakeClient() @@ -172,6 +196,44 @@ func Test_GetSaasGatewayPodNames(t *testing.T) { assert.Equal(t, "gateway", names[0], "Expected to retrieve gateway") } +func Test_GetSaaSGatewayPod(t *testing.T) { + // given + k8Client := CreateFakeClient() + k8Client.createSaaSCRD(t) + + // gateway + selector, err := metav1.ParseToLabelSelector(getSaasGatewayLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "gateway") + + // broker + selector, err = metav1.ParseToLabelSelector(getSelfManagedBrokerLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "broker") + + // when + pods, err := k8Client.GetGatewayPods() + + // then + require.NoError(t, err) + require.NotNil(t, pods) + require.NotEmpty(t, pods) + assert.Equal(t, "gateway", pods.Items[0].Name, "Expected to retrieve gateway") +} + +func Test_GetNoGatewayPods(t *testing.T) { + // given + k8Client := CreateFakeClient() + + // when + pods, err := k8Client.GetGatewayPods() + + // then + require.NoError(t, err) + require.NotNil(t, pods) + require.Empty(t, pods) +} + func Test_GetNoGatewayPodNames(t *testing.T) { // given k8Client := CreateFakeClient() From da3b9dff440c21336c1241cf3b67dd38c01c6c66 Mon Sep 17 00:00:00 2001 From: Christopher Zell <zelldon91@googlemail.com> Date: Wed, 16 Nov 2022 15:10:29 +0100 Subject: [PATCH 2/4] fix: use correct container name --- go-chaos/internal/network.go | 2 +- go-chaos/internal/network_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/go-chaos/internal/network.go b/go-chaos/internal/network.go index 1bc46218d..30a6e8dd7 100644 --- a/go-chaos/internal/network.go +++ b/go-chaos/internal/network.go @@ -67,7 +67,7 @@ func (c K8Client) ApplyNetworkPatchOnGateway() error { "spec":{ "containers":[ { - "name": "zeebe", + "name": "zeebe-gateway", "securityContext":{ "capabilities":{ "add":["NET_ADMIN"] diff --git a/go-chaos/internal/network_test.go b/go-chaos/internal/network_test.go index 0571b2009..e2c8aa021 100644 --- a/go-chaos/internal/network_test.go +++ b/go-chaos/internal/network_test.go @@ -47,7 +47,7 @@ func Test_ShouldApplyNetworkPatchOnDeployment(t *testing.T) { k8Client := CreateFakeClient() selector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels()) require.NoError(t, err) - k8Client.CreateDeploymentWithLabelsAndName(t, selector, "gateway") + k8Client.CreateDeploymentWithLabelsAndName(t, selector, "zeebe-gateway") // when err = k8Client.ApplyNetworkPatchOnGateway() From d11c16b10191050beeeb19d592d92fdea27a17f1 Mon Sep 17 00:00:00 2001 From: Christopher Zell <zelldon91@googlemail.com> Date: Wed, 16 Nov 2022 15:11:34 +0100 Subject: [PATCH 3/4] feat: disconnect gateway subcommand Allow to disconnect first gateway with broker of specific nodeId or partitionId and role. Allow to disconnect first gateway with all brokers. Allow to disconnect only one direction (asymetric) --- go-chaos/cmd/disconnect.go | 138 ++++++++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 34 deletions(-) diff --git a/go-chaos/cmd/disconnect.go b/go-chaos/cmd/disconnect.go index 36387472a..0346516bd 100644 --- a/go-chaos/cmd/disconnect.go +++ b/go-chaos/cmd/disconnect.go @@ -15,6 +15,7 @@ package cmd import ( + "errors" "fmt" "github.com/camunda/zeebe/clients/go/v8/pkg/zbc" @@ -24,30 +25,36 @@ import ( ) var ( - oneDirection bool + oneDirection bool + disconnectToAll bool ) func init() { rootCmd.AddCommand(disconnect) + // disconnect brokers disconnect.AddCommand(disconnectBrokers) - // broker 1 disconnectBrokers.Flags().StringVar(&broker1Role, "broker1Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the first Broker") disconnectBrokers.Flags().IntVar(&broker1PartitionId, "broker1PartitionId", 1, "Specify the partition id of the first Broker") - disconnectBrokers.Flags().IntVar(&broker1NodeId, "broker1NodeId", -1, "Specify the nodeId of the first Broker") disconnectBrokers.MarkFlagsMutuallyExclusive("broker1PartitionId", "broker1NodeId") - // broker 2 disconnectBrokers.Flags().StringVar(&broker2Role, "broker2Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the second Broker") disconnectBrokers.Flags().IntVar(&broker2PartitionId, "broker2PartitionId", 2, "Specify the partition id of the second Broker") - disconnectBrokers.Flags().IntVar(&broker2NodeId, "broker2NodeId", -1, "Specify the nodeId of the second Broker") - // general disconnectBrokers.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") disconnectBrokers.MarkFlagsMutuallyExclusive("broker2PartitionId", "broker2NodeId", "one-direction") + + // disconnect gateway + disconnect.AddCommand(disconnectGateway) + disconnectGateway.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker") + disconnectGateway.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker") + disconnectGateway.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the partition id of the Broker") + disconnectGateway.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") + disconnectGateway.Flags().BoolVar(&disconnectToAll, "all", false, "Specify whether the gateway should be disconnected to all brokers") + disconnectGateway.MarkFlagsMutuallyExclusive("all", "partitionId", "nodeId") } var disconnect = &cobra.Command{ @@ -56,6 +63,12 @@ var disconnect = &cobra.Command{ Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`, } +func ensureNoError(err error) { + if err != nil { + panic(err) + } +} + var disconnectBrokers = &cobra.Command{ Use: "brokers", Short: "Disconnect Zeebe Brokers", @@ -63,14 +76,13 @@ var disconnectBrokers = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { internal.Verbosity = Verbose k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } + ensureNoError(err) + + err = k8Client.PauseReconciliation() + ensureNoError(err) err = k8Client.ApplyNetworkPatch() - if err != nil { - panic(err) - } + ensureNoError(err) if Verbose { fmt.Println("Patched statefulset") @@ -81,9 +93,7 @@ var disconnectBrokers = &cobra.Command{ defer closeFn() zbClient, err := internal.CreateZeebeClient(port) - if err != nil { - panic(err.Error()) - } + ensureNoError(err) defer zbClient.Close() broker1Pod := getBrokerPod(k8Client, zbClient, broker1NodeId, broker1PartitionId, broker1Role) @@ -94,19 +104,7 @@ var disconnectBrokers = &cobra.Command{ return } - err = internal.MakeIpUnreachableForPod(k8Client, broker2Pod.Status.PodIP, broker1Pod.Name) - if err != nil { - panic(err.Error()) - } - fmt.Printf("Disconnect %s from %s\n", broker1Pod.Name, broker2Pod.Name) - - if !oneDirection { - err = internal.MakeIpUnreachableForPod(k8Client, broker1Pod.Status.PodIP, broker2Pod.Name) - if err != nil { - panic(err.Error()) - } - fmt.Printf("Disconnect %s from %s\n", broker2Pod.Name, broker1Pod.Name) - } + disconnectPods(k8Client, broker1Pod, broker2Pod) }, } @@ -115,17 +113,13 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId var err error if brokerNodeId >= 0 { brokerPod, err = internal.GetBrokerPodForNodeId(k8Client, int32(brokerNodeId)) - if err != nil { - panic(err.Error()) - } + ensureNoError(err) if Verbose { fmt.Printf("Found Broker %s with node id %d.\n", brokerPod.Name, brokerNodeId) } } else { brokerPod, err = internal.GetBrokerPodForPartitionAndRole(k8Client, zbClient, brokerPartitionId, brokerRole) - if err != nil { - panic(err.Error()) - } + ensureNoError(err) if Verbose { fmt.Printf("Found Broker %s as %s for partition %d.\n", brokerPod.Name, role, brokerPartitionId) } @@ -133,3 +127,79 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId return brokerPod } + +func getGatewayPod(k8Client internal.K8Client) *v1.Pod { + pods, err := k8Client.GetGatewayPods() + ensureNoError(err) + + if pods != nil && len(pods.Items) > 0 { + return &pods.Items[0] + } + + panic(errors.New(fmt.Sprintf("Expected to find standalone gateway, but found nothing."))) +} + +var disconnectGateway = &cobra.Command{ + Use: "gateway", + Short: "Disconnect Zeebe Gateway", + Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + err = k8Client.PauseReconciliation() + ensureNoError(err) + + err = k8Client.ApplyNetworkPatch() + ensureNoError(err) + + if Verbose { + fmt.Println("Patched statefulset") + } + + err = k8Client.ApplyNetworkPatchOnGateway() + ensureNoError(err) + + if Verbose { + fmt.Println("Patched deployment") + } + + err = k8Client.AwaitReadiness() + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + gatewayPod := getGatewayPod(k8Client) + + if disconnectToAll { + pods, err := k8Client.GetBrokerPods() + ensureNoError(err) + + for _, brokerPod := range pods.Items { + disconnectPods(k8Client, gatewayPod, &brokerPod) + } + } else { + broker2Pod := getBrokerPod(k8Client, zbClient, nodeId, partitionId, role) + disconnectPods(k8Client, gatewayPod, broker2Pod) + } + }, +} + +func disconnectPods(k8Client internal.K8Client, firstPod *v1.Pod, secondPod *v1.Pod) { + err := internal.MakeIpUnreachableForPod(k8Client, secondPod.Status.PodIP, firstPod.Name) + ensureNoError(err) + fmt.Printf("Disconnect %s from %s\n", firstPod.Name, secondPod.Name) + + if !oneDirection { + err = internal.MakeIpUnreachableForPod(k8Client, firstPod.Status.PodIP, secondPod.Name) + ensureNoError(err) + fmt.Printf("Disconnect %s from %s\n", secondPod.Name, firstPod.Name) + } +} From 408f4188cc12189b1256478d707acb53d00ff3bd Mon Sep 17 00:00:00 2001 From: Christopher Zell <zelldon91@googlemail.com> Date: Wed, 16 Nov 2022 15:28:20 +0100 Subject: [PATCH 4/4] feat: connect gateway again with brokers --- go-chaos/cmd/connect.go | 36 ++++++++++++++++++++++++++++++++++++ go-chaos/internal/network.go | 18 ++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/go-chaos/cmd/connect.go b/go-chaos/cmd/connect.go index e6bade80a..e83930c59 100644 --- a/go-chaos/cmd/connect.go +++ b/go-chaos/cmd/connect.go @@ -24,6 +24,7 @@ import ( func init() { rootCmd.AddCommand(connect) connect.AddCommand(connectBrokers) + connect.AddCommand(connectGateway) } var connect = &cobra.Command{ @@ -69,3 +70,38 @@ var connectBrokers = &cobra.Command{ } }, } + +var connectGateway = &cobra.Command{ + Use: "gateway", + Short: "Connect Zeebe Gateway", + Long: `Connect all Zeebe Gateway again, after it has been disconnected.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + // No patch is need, since we expect that disconnect was executed before. + // If not all fine and the pods are already connected. + + // We run connect on all nodes + brokerPods, err := k8Client.GetBrokerPods() + ensureNoError(err) + + if len(brokerPods.Items) <= 0 { + panic(fmt.Sprintf("Expected to find broker(s) in current namespace %s, but found nothing\n", k8Client.GetCurrentNamespace())) + } + + gatewayPod := getGatewayPod(k8Client) + + for _, brokerPod := range brokerPods.Items { + err = internal.MakeIpReachable(k8Client, gatewayPod.Name, brokerPod.Status.PodIP) + if err != nil { + if Verbose { + fmt.Printf("Error on connection gateway: %s. Error: %s\n", gatewayPod.Name, err.Error()) + } + } else { + fmt.Printf("Connected %s again with %s, removed unreachable routes.\n", gatewayPod.Name, brokerPod.Name) + } + } + }, +} diff --git a/go-chaos/internal/network.go b/go-chaos/internal/network.go index 30a6e8dd7..d9c636308 100644 --- a/go-chaos/internal/network.go +++ b/go-chaos/internal/network.go @@ -134,3 +134,21 @@ func MakeIpReachableForPod(k8Client K8Client, podName string) error { return nil } + +func MakeIpReachable(k8Client K8Client, podName string, ip string) error { + // We try to reduce the system output in order to not break the execution. There is a limit for the sout for exec, + // for more details see remotecommand.StreamOptions + + err := k8Client.ExecuteCmdOnPod([]string{"sh", "-c", "command -v ip"}, podName) + if err != nil && strings.Contains(err.Error(), "exit code 127") { + return errors.New("Execution exited with exit code 127 (Command not found). It is likely that the broker was not disconnected or restarted in between.") + } + + var buf bytes.Buffer + err = k8Client.ExecuteCmdOnPodWriteIntoOutput([]string{"sh", "-c", fmt.Sprintf("ip route del unreachable %s", ip)}, podName, &buf) + if err != nil { + return err + } + + return nil +}