From d759f20ce4be3c1dbbf90d47d1a68b30260d7fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Fri, 14 Oct 2022 15:26:22 +0200 Subject: [PATCH 1/4] fix: allow forwarding arbitrary ports --- go-chaos/cmd/disconnect.go | 2 +- go-chaos/cmd/publish.go | 2 +- go-chaos/cmd/restart.go | 2 +- go-chaos/cmd/terminate.go | 2 +- go-chaos/cmd/topology.go | 2 +- go-chaos/cmd/verify.go | 2 +- go-chaos/internal/pods.go | 8 ++++---- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/go-chaos/cmd/disconnect.go b/go-chaos/cmd/disconnect.go index 723f21b9b..0cf917e19 100644 --- a/go-chaos/cmd/disconnect.go +++ b/go-chaos/cmd/disconnect.go @@ -77,7 +77,7 @@ var disconnectBrokers = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } diff --git a/go-chaos/cmd/publish.go b/go-chaos/cmd/publish.go index 62173d120..afda4bcb2 100644 --- a/go-chaos/cmd/publish.go +++ b/go-chaos/cmd/publish.go @@ -38,7 +38,7 @@ var publishCmd = &cobra.Command{ panicOnError(err) port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) panicOnError(err) defer closeFn() diff --git a/go-chaos/cmd/restart.go b/go-chaos/cmd/restart.go index c9316d499..4c557b7c4 100644 --- a/go-chaos/cmd/restart.go +++ b/go-chaos/cmd/restart.go @@ -47,7 +47,7 @@ var restartCmd = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err) } diff --git a/go-chaos/cmd/terminate.go b/go-chaos/cmd/terminate.go index c0818cecf..5ad90479a 100644 --- a/go-chaos/cmd/terminate.go +++ b/go-chaos/cmd/terminate.go @@ -44,7 +44,7 @@ var terminateCmd = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } diff --git a/go-chaos/cmd/topology.go b/go-chaos/cmd/topology.go index 2516747cd..730b670be 100644 --- a/go-chaos/cmd/topology.go +++ b/go-chaos/cmd/topology.go @@ -41,7 +41,7 @@ var topologyCmd = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err) } diff --git a/go-chaos/cmd/verify.go b/go-chaos/cmd/verify.go index 903851434..286908750 100644 --- a/go-chaos/cmd/verify.go +++ b/go-chaos/cmd/verify.go @@ -73,7 +73,7 @@ A process model will be deployed and process instances are created until the req } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } diff --git a/go-chaos/internal/pods.go b/go-chaos/internal/pods.go index 09282ac15..7e1cb6c73 100644 --- a/go-chaos/internal/pods.go +++ b/go-chaos/internal/pods.go @@ -202,7 +202,7 @@ func (c K8Client) RestartPod(podName string) error { // GatewayPortForward creates a port forwarding to a zeebe gateway with the given port // https://github.com/gruntwork-io/terratest/blob/master/modules/k8s/tunnel.go#L187-L196 // https://github.com/kubernetes/client-go/issues/51#issuecomment-436200428 -func (c K8Client) GatewayPortForward(port int) (func(), error) { +func (c K8Client) GatewayPortForward(localPort int, remotePort int) (func(), error) { names, err := c.GetGatewayPodNames() if err != nil { return nil, err @@ -213,7 +213,7 @@ func (c K8Client) GatewayPortForward(port int) (func(), error) { } portForwardCreateURL := c.createPortForwardUrl(names) - portForwarder, err := c.createPortForwarder(port, portForwardCreateURL) + portForwarder, err := c.createPortForwarder(localPort, remotePort, portForwardCreateURL) if err != nil { return nil, err } @@ -245,7 +245,7 @@ func (c K8Client) GatewayPortForward(port int) (func(), error) { } // Create the k8 port forwarder, with the given port and k8 client -func (c K8Client) createPortForwarder(port int, portForwardCreateURL *url.URL) (*portforward.PortForwarder, error) { +func (c K8Client) createPortForwarder(localPort int, remotePort int, portForwardCreateURL *url.URL) (*portforward.PortForwarder, error) { // Construct the spdy client required by the client-go portforward library config, err := c.ClientConfig.ClientConfig() if err != nil { @@ -264,7 +264,7 @@ func (c K8Client) createPortForwarder(port int, portForwardCreateURL *url.URL) ( // Construct a new PortForwarder struct that manages the instructed port forward tunnel stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) out, errOut := new(bytes.Buffer), new(bytes.Buffer) - ports := []string{fmt.Sprintf("%d:%d", port, 26500)} + ports := []string{fmt.Sprintf("%d:%d", localPort, remotePort)} portforwarder, err := portforward.New(dialer, ports, stopChan, readyChan, out, errOut) if err != nil { if Verbosity { From 5c4e86a3305bc435c19ce59e838f82c705942448 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Fri, 14 Oct 2022 15:29:39 +0200 Subject: [PATCH 2/4] feat: add command to take backup --- go-chaos/cmd/backup.go | 59 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 go-chaos/cmd/backup.go diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go new file mode 100644 index 000000000..f69acae7e --- /dev/null +++ b/go-chaos/cmd/backup.go @@ -0,0 +1,59 @@ +// Copyright 2022 Camunda Services GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "fmt" + "net/http" + "time" + + "github.com/spf13/cobra" + "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" +) + +func init() { + rootCmd.AddCommand(backupCommand) + backupCommand.AddCommand(takeBackupCommand) +} + +var backupCommand = &cobra.Command{ + Use: "backup", + Short: "Controls Zeebe backups", + Long: "Can be used to take backups and query their status", +} + +var takeBackupCommand = &cobra.Command{ + Use: "take", + Short: "Trigger a backup", + RunE: take_backup, +} + +func take_backup(cmd *cobra.Command, args []string) error { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + port := 9600 + closeFn, err := k8Client.GatewayPortForward(port, port) + if err != nil { + panic(err.Error()) + } + defer closeFn() + timestamp := time.Now().UnixMilli() + url := fmt.Sprintf("http://localhost:%d/actuator/backups/%d", port, timestamp) + _, err = http.Post(url, "", nil) + return err +} From 19f9c5ec1fcdc2a9a358dcfd4ae8473699376753 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Fri, 14 Oct 2022 15:39:43 +0200 Subject: [PATCH 3/4] feat: allow specifying backup id --- go-chaos/cmd/backup.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go index f69acae7e..26ebb619d 100644 --- a/go-chaos/cmd/backup.go +++ b/go-chaos/cmd/backup.go @@ -17,15 +17,21 @@ package cmd import ( "fmt" "net/http" + "strconv" "time" "github.com/spf13/cobra" "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" ) +var ( + backupId string +) + func init() { rootCmd.AddCommand(backupCommand) backupCommand.AddCommand(takeBackupCommand) + takeBackupCommand.Flags().StringVar(&backupId, "backup-id", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") } var backupCommand = &cobra.Command{ @@ -52,8 +58,7 @@ func take_backup(cmd *cobra.Command, args []string) error { panic(err.Error()) } defer closeFn() - timestamp := time.Now().UnixMilli() - url := fmt.Sprintf("http://localhost:%d/actuator/backups/%d", port, timestamp) + url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId) _, err = http.Post(url, "", nil) return err } From 89d96a2a22e62913616deb47d10b7bef90c5e2b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Mon, 17 Oct 2022 09:55:11 +0200 Subject: [PATCH 4/4] feat: add command to wait for backup --- go-chaos/cmd/backup.go | 88 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 6 deletions(-) diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go index 26ebb619d..0ffcb9f18 100644 --- a/go-chaos/cmd/backup.go +++ b/go-chaos/cmd/backup.go @@ -15,7 +15,10 @@ package cmd import ( + "encoding/json" + "errors" "fmt" + "io" "net/http" "strconv" "time" @@ -30,8 +33,11 @@ var ( func init() { rootCmd.AddCommand(backupCommand) + backupCommand.AddCommand(takeBackupCommand) - takeBackupCommand.Flags().StringVar(&backupId, "backup-id", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") + takeBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") + backupCommand.AddCommand(waitForBackupCommand) + waitForBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") } var backupCommand = &cobra.Command{ @@ -43,22 +49,92 @@ var backupCommand = &cobra.Command{ var takeBackupCommand = &cobra.Command{ Use: "take", Short: "Trigger a backup", - RunE: take_backup, + RunE: takeBackup, +} + +var waitForBackupCommand = &cobra.Command{ + Use: "wait", + Short: "Wait for a backup to complete or fail", + RunE: waitForBackup, } -func take_backup(cmd *cobra.Command, args []string) error { +func takeBackup(cmd *cobra.Command, args []string) error { k8Client, err := internal.CreateK8Client() if err != nil { panic(err) } port := 9600 - closeFn, err := k8Client.GatewayPortForward(port, port) + closePortForward, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } - defer closeFn() + defer closePortForward() url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId) - _, err = http.Post(url, "", nil) + resp, err := http.Post(url, "", nil) + if err != nil { + return err + } + defer resp.Body.Close() return err } + +func waitForBackup(cmd *cobra.Command, args []string) error { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + port := 9600 + closePortForward, err := k8Client.GatewayPortForward(port, port) + if err != nil { + panic(err.Error()) + } + defer closePortForward() + + for { + backup, err := getBackupStatus(port, backupId) + if err != nil { + time.Sleep(5 * time.Second) + continue + } + + switch backup.Status { + case "COMPLETED": + return nil + case "FAILED": + return errors.New("backup failed") + case "DOES_NOT_EXIST": + return errors.New("backup does not exist") + } + time.Sleep(5 * time.Second) + } + +} + +func getBackupStatus(port int, backupId string) (*BackupStatus, error) { + url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId) + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var backup BackupStatus + err = json.Unmarshal(body, &backup) + if err != nil { + return nil, err + } + + fmt.Printf("Found backup %s with status: %s\n", backupId, backup.Status) + + return &backup, nil +} + +type BackupStatus struct { + Status string +}