diff --git a/go-chaos/cmd/connect.go b/go-chaos/cmd/connect.go index e6bade80a..e83930c59 100644 --- a/go-chaos/cmd/connect.go +++ b/go-chaos/cmd/connect.go @@ -24,6 +24,7 @@ import ( func init() { rootCmd.AddCommand(connect) connect.AddCommand(connectBrokers) + connect.AddCommand(connectGateway) } var connect = &cobra.Command{ @@ -69,3 +70,38 @@ var connectBrokers = &cobra.Command{ } }, } + +var connectGateway = &cobra.Command{ + Use: "gateway", + Short: "Connect Zeebe Gateway", + Long: `Connect all Zeebe Gateway again, after it has been disconnected.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + // No patch is need, since we expect that disconnect was executed before. + // If not all fine and the pods are already connected. + + // We run connect on all nodes + brokerPods, err := k8Client.GetBrokerPods() + ensureNoError(err) + + if len(brokerPods.Items) <= 0 { + panic(fmt.Sprintf("Expected to find broker(s) in current namespace %s, but found nothing\n", k8Client.GetCurrentNamespace())) + } + + gatewayPod := getGatewayPod(k8Client) + + for _, brokerPod := range brokerPods.Items { + err = internal.MakeIpReachable(k8Client, gatewayPod.Name, brokerPod.Status.PodIP) + if err != nil { + if Verbose { + fmt.Printf("Error on connection gateway: %s. Error: %s\n", gatewayPod.Name, err.Error()) + } + } else { + fmt.Printf("Connected %s again with %s, removed unreachable routes.\n", gatewayPod.Name, brokerPod.Name) + } + } + }, +} diff --git a/go-chaos/cmd/disconnect.go b/go-chaos/cmd/disconnect.go index 36387472a..0346516bd 100644 --- a/go-chaos/cmd/disconnect.go +++ b/go-chaos/cmd/disconnect.go @@ -15,6 +15,7 @@ package cmd import ( + "errors" "fmt" "github.com/camunda/zeebe/clients/go/v8/pkg/zbc" @@ -24,30 +25,36 @@ import ( ) var ( - oneDirection bool + oneDirection bool + disconnectToAll bool ) func init() { rootCmd.AddCommand(disconnect) + // disconnect brokers disconnect.AddCommand(disconnectBrokers) - // broker 1 disconnectBrokers.Flags().StringVar(&broker1Role, "broker1Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the first Broker") disconnectBrokers.Flags().IntVar(&broker1PartitionId, "broker1PartitionId", 1, "Specify the partition id of the first Broker") - disconnectBrokers.Flags().IntVar(&broker1NodeId, "broker1NodeId", -1, "Specify the nodeId of the first Broker") disconnectBrokers.MarkFlagsMutuallyExclusive("broker1PartitionId", "broker1NodeId") - // broker 2 disconnectBrokers.Flags().StringVar(&broker2Role, "broker2Role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the second Broker") disconnectBrokers.Flags().IntVar(&broker2PartitionId, "broker2PartitionId", 2, "Specify the partition id of the second Broker") - disconnectBrokers.Flags().IntVar(&broker2NodeId, "broker2NodeId", -1, "Specify the nodeId of the second Broker") - // general disconnectBrokers.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") disconnectBrokers.MarkFlagsMutuallyExclusive("broker2PartitionId", "broker2NodeId", "one-direction") + + // disconnect gateway + disconnect.AddCommand(disconnectGateway) + disconnectGateway.Flags().IntVar(&nodeId, "nodeId", -1, "Specify the nodeId of the Broker") + disconnectGateway.Flags().StringVar(&role, "role", "LEADER", "Specify the partition role [LEADER, FOLLOWER] of the Broker") + disconnectGateway.Flags().IntVar(&partitionId, "partitionId", 1, "Specify the partition id of the Broker") + disconnectGateway.Flags().BoolVar(&oneDirection, "one-direction", false, "Specify whether the network partition should be setup only in one direction (asymmetric)") + disconnectGateway.Flags().BoolVar(&disconnectToAll, "all", false, "Specify whether the gateway should be disconnected to all brokers") + disconnectGateway.MarkFlagsMutuallyExclusive("all", "partitionId", "nodeId") } var disconnect = &cobra.Command{ @@ -56,6 +63,12 @@ var disconnect = &cobra.Command{ Long: `Disconnect Zeebe nodes, uses sub-commands to disconnect leaders, followers, etc.`, } +func ensureNoError(err error) { + if err != nil { + panic(err) + } +} + var disconnectBrokers = &cobra.Command{ Use: "brokers", Short: "Disconnect Zeebe Brokers", @@ -63,14 +76,13 @@ var disconnectBrokers = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { internal.Verbosity = Verbose k8Client, err := internal.CreateK8Client() - if err != nil { - panic(err) - } + ensureNoError(err) + + err = k8Client.PauseReconciliation() + ensureNoError(err) err = k8Client.ApplyNetworkPatch() - if err != nil { - panic(err) - } + ensureNoError(err) if Verbose { fmt.Println("Patched statefulset") @@ -81,9 +93,7 @@ var disconnectBrokers = &cobra.Command{ defer closeFn() zbClient, err := internal.CreateZeebeClient(port) - if err != nil { - panic(err.Error()) - } + ensureNoError(err) defer zbClient.Close() broker1Pod := getBrokerPod(k8Client, zbClient, broker1NodeId, broker1PartitionId, broker1Role) @@ -94,19 +104,7 @@ var disconnectBrokers = &cobra.Command{ return } - err = internal.MakeIpUnreachableForPod(k8Client, broker2Pod.Status.PodIP, broker1Pod.Name) - if err != nil { - panic(err.Error()) - } - fmt.Printf("Disconnect %s from %s\n", broker1Pod.Name, broker2Pod.Name) - - if !oneDirection { - err = internal.MakeIpUnreachableForPod(k8Client, broker1Pod.Status.PodIP, broker2Pod.Name) - if err != nil { - panic(err.Error()) - } - fmt.Printf("Disconnect %s from %s\n", broker2Pod.Name, broker1Pod.Name) - } + disconnectPods(k8Client, broker1Pod, broker2Pod) }, } @@ -115,17 +113,13 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId var err error if brokerNodeId >= 0 { brokerPod, err = internal.GetBrokerPodForNodeId(k8Client, int32(brokerNodeId)) - if err != nil { - panic(err.Error()) - } + ensureNoError(err) if Verbose { fmt.Printf("Found Broker %s with node id %d.\n", brokerPod.Name, brokerNodeId) } } else { brokerPod, err = internal.GetBrokerPodForPartitionAndRole(k8Client, zbClient, brokerPartitionId, brokerRole) - if err != nil { - panic(err.Error()) - } + ensureNoError(err) if Verbose { fmt.Printf("Found Broker %s as %s for partition %d.\n", brokerPod.Name, role, brokerPartitionId) } @@ -133,3 +127,79 @@ func getBrokerPod(k8Client internal.K8Client, zbClient zbc.Client, brokerNodeId return brokerPod } + +func getGatewayPod(k8Client internal.K8Client) *v1.Pod { + pods, err := k8Client.GetGatewayPods() + ensureNoError(err) + + if pods != nil && len(pods.Items) > 0 { + return &pods.Items[0] + } + + panic(errors.New(fmt.Sprintf("Expected to find standalone gateway, but found nothing."))) +} + +var disconnectGateway = &cobra.Command{ + Use: "gateway", + Short: "Disconnect Zeebe Gateway", + Long: `Disconnect Zeebe Gateway from Broker with a given partition and role.`, + Run: func(cmd *cobra.Command, args []string) { + internal.Verbosity = Verbose + k8Client, err := internal.CreateK8Client() + ensureNoError(err) + + err = k8Client.PauseReconciliation() + ensureNoError(err) + + err = k8Client.ApplyNetworkPatch() + ensureNoError(err) + + if Verbose { + fmt.Println("Patched statefulset") + } + + err = k8Client.ApplyNetworkPatchOnGateway() + ensureNoError(err) + + if Verbose { + fmt.Println("Patched deployment") + } + + err = k8Client.AwaitReadiness() + ensureNoError(err) + + port := 26500 + closeFn := k8Client.MustGatewayPortForward(port, port) + defer closeFn() + + zbClient, err := internal.CreateZeebeClient(port) + ensureNoError(err) + defer zbClient.Close() + + gatewayPod := getGatewayPod(k8Client) + + if disconnectToAll { + pods, err := k8Client.GetBrokerPods() + ensureNoError(err) + + for _, brokerPod := range pods.Items { + disconnectPods(k8Client, gatewayPod, &brokerPod) + } + } else { + broker2Pod := getBrokerPod(k8Client, zbClient, nodeId, partitionId, role) + disconnectPods(k8Client, gatewayPod, broker2Pod) + } + }, +} + +func disconnectPods(k8Client internal.K8Client, firstPod *v1.Pod, secondPod *v1.Pod) { + err := internal.MakeIpUnreachableForPod(k8Client, secondPod.Status.PodIP, firstPod.Name) + ensureNoError(err) + fmt.Printf("Disconnect %s from %s\n", firstPod.Name, secondPod.Name) + + if !oneDirection { + err = internal.MakeIpUnreachableForPod(k8Client, firstPod.Status.PodIP, secondPod.Name) + ensureNoError(err) + fmt.Printf("Disconnect %s from %s\n", secondPod.Name, firstPod.Name) + } +} diff --git a/go-chaos/internal/network.go b/go-chaos/internal/network.go index 1bc46218d..d9c636308 100644 --- a/go-chaos/internal/network.go +++ b/go-chaos/internal/network.go @@ -67,7 +67,7 @@ func (c K8Client) ApplyNetworkPatchOnGateway() error { "spec":{ "containers":[ { - "name": "zeebe", + "name": "zeebe-gateway", "securityContext":{ "capabilities":{ "add":["NET_ADMIN"] @@ -134,3 +134,21 @@ func MakeIpReachableForPod(k8Client K8Client, podName string) error { return nil } + +func MakeIpReachable(k8Client K8Client, podName string, ip string) error { + // We try to reduce the system output in order to not break the execution. There is a limit for the sout for exec, + // for more details see remotecommand.StreamOptions + + err := k8Client.ExecuteCmdOnPod([]string{"sh", "-c", "command -v ip"}, podName) + if err != nil && strings.Contains(err.Error(), "exit code 127") { + return errors.New("Execution exited with exit code 127 (Command not found). It is likely that the broker was not disconnected or restarted in between.") + } + + var buf bytes.Buffer + err = k8Client.ExecuteCmdOnPodWriteIntoOutput([]string{"sh", "-c", fmt.Sprintf("ip route del unreachable %s", ip)}, podName, &buf) + if err != nil { + return err + } + + return nil +} diff --git a/go-chaos/internal/network_test.go b/go-chaos/internal/network_test.go index 0571b2009..e2c8aa021 100644 --- a/go-chaos/internal/network_test.go +++ b/go-chaos/internal/network_test.go @@ -47,7 +47,7 @@ func Test_ShouldApplyNetworkPatchOnDeployment(t *testing.T) { k8Client := CreateFakeClient() selector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels()) require.NoError(t, err) - k8Client.CreateDeploymentWithLabelsAndName(t, selector, "gateway") + k8Client.CreateDeploymentWithLabelsAndName(t, selector, "zeebe-gateway") // when err = k8Client.ApplyNetworkPatchOnGateway() diff --git a/go-chaos/internal/pods.go b/go-chaos/internal/pods.go index e4e73bb7d..4e9dc0536 100644 --- a/go-chaos/internal/pods.go +++ b/go-chaos/internal/pods.go @@ -66,7 +66,7 @@ func (c K8Client) extractPodNames(list *v1.PodList) ([]string, error) { return names, nil } -func (c K8Client) GetGatewayPodNames() ([]string, error) { +func (c K8Client) GetGatewayPods() (*v1.PodList, error) { listOptions := metav1.ListOptions{ LabelSelector: c.getGatewayLabels(), // we check for running gateways, since terminated gateways can be lying around @@ -78,6 +78,15 @@ func (c K8Client) GetGatewayPodNames() ([]string, error) { return nil, err } + return list, err +} + +func (c K8Client) GetGatewayPodNames() ([]string, error) { + list, err := c.GetGatewayPods() + if err != nil { + return nil, err + } + if list == nil || len(list.Items) == 0 { return c.GetBrokerPodNames() } diff --git a/go-chaos/internal/pods_test.go b/go-chaos/internal/pods_test.go index 9695f566a..8d8d1d5dd 100644 --- a/go-chaos/internal/pods_test.go +++ b/go-chaos/internal/pods_test.go @@ -123,6 +123,30 @@ func Test_GetNoBrokerPodNames(t *testing.T) { require.Empty(t, names) } +func Test_GetSelfManagedGatewayPod(t *testing.T) { + // given + k8Client := CreateFakeClient() + + // gateway + selector, err := metav1.ParseToLabelSelector(getSelfManagedGatewayLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "gateway") + + // broker + selector, err = metav1.ParseToLabelSelector(getSelfManagedBrokerLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "broker") + + // when + pods, err := k8Client.GetGatewayPods() + + // then + require.NoError(t, err) + require.NotNil(t, pods) + require.NotEmpty(t, pods) + assert.Equal(t, "gateway", pods.Items[0].Name, "Expected to retrieve gateway") +} + func Test_GetSelfManagedGatewayPodNames(t *testing.T) { // given k8Client := CreateFakeClient() @@ -172,6 +196,44 @@ func Test_GetSaasGatewayPodNames(t *testing.T) { assert.Equal(t, "gateway", names[0], "Expected to retrieve gateway") } +func Test_GetSaaSGatewayPod(t *testing.T) { + // given + k8Client := CreateFakeClient() + k8Client.createSaaSCRD(t) + + // gateway + selector, err := metav1.ParseToLabelSelector(getSaasGatewayLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "gateway") + + // broker + selector, err = metav1.ParseToLabelSelector(getSelfManagedBrokerLabels()) + require.NoError(t, err) + k8Client.CreatePodWithLabelsAndName(t, selector, "broker") + + // when + pods, err := k8Client.GetGatewayPods() + + // then + require.NoError(t, err) + require.NotNil(t, pods) + require.NotEmpty(t, pods) + assert.Equal(t, "gateway", pods.Items[0].Name, "Expected to retrieve gateway") +} + +func Test_GetNoGatewayPods(t *testing.T) { + // given + k8Client := CreateFakeClient() + + // when + pods, err := k8Client.GetGatewayPods() + + // then + require.NoError(t, err) + require.NotNil(t, pods) + require.Empty(t, pods) +} + func Test_GetNoGatewayPodNames(t *testing.T) { // given k8Client := CreateFakeClient()