diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go new file mode 100644 index 000000000..0ffcb9f18 --- /dev/null +++ b/go-chaos/cmd/backup.go @@ -0,0 +1,140 @@ +// Copyright 2022 Camunda Services GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cmd + +import ( + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "time" + + "github.com/spf13/cobra" + "github.com/zeebe-io/zeebe-chaos/go-chaos/internal" +) + +var ( + backupId string +) + +func init() { + rootCmd.AddCommand(backupCommand) + + backupCommand.AddCommand(takeBackupCommand) + takeBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") + backupCommand.AddCommand(waitForBackupCommand) + waitForBackupCommand.Flags().StringVar(&backupId, "backupId", strconv.FormatInt(time.Now().UnixMilli(), 10), "optionally specify the backup id to use, uses the current timestamp by default") +} + +var backupCommand = &cobra.Command{ + Use: "backup", + Short: "Controls Zeebe backups", + Long: "Can be used to take backups and query their status", +} + +var takeBackupCommand = &cobra.Command{ + Use: "take", + Short: "Trigger a backup", + RunE: takeBackup, +} + +var waitForBackupCommand = &cobra.Command{ + Use: "wait", + Short: "Wait for a backup to complete or fail", + RunE: waitForBackup, +} + +func takeBackup(cmd *cobra.Command, args []string) error { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + port := 9600 + closePortForward, err := k8Client.GatewayPortForward(port, port) + if err != nil { + panic(err.Error()) + } + defer closePortForward() + url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId) + resp, err := http.Post(url, "", nil) + if err != nil { + return err + } + defer resp.Body.Close() + return err +} + +func waitForBackup(cmd *cobra.Command, args []string) error { + k8Client, err := internal.CreateK8Client() + if err != nil { + panic(err) + } + + port := 9600 + closePortForward, err := k8Client.GatewayPortForward(port, port) + if err != nil { + panic(err.Error()) + } + defer closePortForward() + + for { + backup, err := getBackupStatus(port, backupId) + if err != nil { + time.Sleep(5 * time.Second) + continue + } + + switch backup.Status { + case "COMPLETED": + return nil + case "FAILED": + return errors.New("backup failed") + case "DOES_NOT_EXIST": + return errors.New("backup does not exist") + } + time.Sleep(5 * time.Second) + } + +} + +func getBackupStatus(port int, backupId string) (*BackupStatus, error) { + url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, backupId) + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var backup BackupStatus + err = json.Unmarshal(body, &backup) + if err != nil { + return nil, err + } + + fmt.Printf("Found backup %s with status: %s\n", backupId, backup.Status) + + return &backup, nil +} + +type BackupStatus struct { + Status string +} diff --git a/go-chaos/cmd/disconnect.go b/go-chaos/cmd/disconnect.go index 723f21b9b..0cf917e19 100644 --- a/go-chaos/cmd/disconnect.go +++ b/go-chaos/cmd/disconnect.go @@ -77,7 +77,7 @@ var disconnectBrokers = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } diff --git a/go-chaos/cmd/publish.go b/go-chaos/cmd/publish.go index 62173d120..afda4bcb2 100644 --- a/go-chaos/cmd/publish.go +++ b/go-chaos/cmd/publish.go @@ -38,7 +38,7 @@ var publishCmd = &cobra.Command{ panicOnError(err) port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) panicOnError(err) defer closeFn() diff --git a/go-chaos/cmd/restart.go b/go-chaos/cmd/restart.go index c9316d499..4c557b7c4 100644 --- a/go-chaos/cmd/restart.go +++ b/go-chaos/cmd/restart.go @@ -47,7 +47,7 @@ var restartCmd = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err) } diff --git a/go-chaos/cmd/terminate.go b/go-chaos/cmd/terminate.go index c0818cecf..5ad90479a 100644 --- a/go-chaos/cmd/terminate.go +++ b/go-chaos/cmd/terminate.go @@ -44,7 +44,7 @@ var terminateCmd = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } diff --git a/go-chaos/cmd/topology.go b/go-chaos/cmd/topology.go index 2516747cd..730b670be 100644 --- a/go-chaos/cmd/topology.go +++ b/go-chaos/cmd/topology.go @@ -41,7 +41,7 @@ var topologyCmd = &cobra.Command{ } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err) } diff --git a/go-chaos/cmd/verify.go b/go-chaos/cmd/verify.go index 903851434..286908750 100644 --- a/go-chaos/cmd/verify.go +++ b/go-chaos/cmd/verify.go @@ -73,7 +73,7 @@ A process model will be deployed and process instances are created until the req } port := 26500 - closeFn, err := k8Client.GatewayPortForward(port) + closeFn, err := k8Client.GatewayPortForward(port, port) if err != nil { panic(err.Error()) } diff --git a/go-chaos/internal/pods.go b/go-chaos/internal/pods.go index 09282ac15..7e1cb6c73 100644 --- a/go-chaos/internal/pods.go +++ b/go-chaos/internal/pods.go @@ -202,7 +202,7 @@ func (c K8Client) RestartPod(podName string) error { // GatewayPortForward creates a port forwarding to a zeebe gateway with the given port // https://github.com/gruntwork-io/terratest/blob/master/modules/k8s/tunnel.go#L187-L196 // https://github.com/kubernetes/client-go/issues/51#issuecomment-436200428 -func (c K8Client) GatewayPortForward(port int) (func(), error) { +func (c K8Client) GatewayPortForward(localPort int, remotePort int) (func(), error) { names, err := c.GetGatewayPodNames() if err != nil { return nil, err @@ -213,7 +213,7 @@ func (c K8Client) GatewayPortForward(port int) (func(), error) { } portForwardCreateURL := c.createPortForwardUrl(names) - portForwarder, err := c.createPortForwarder(port, portForwardCreateURL) + portForwarder, err := c.createPortForwarder(localPort, remotePort, portForwardCreateURL) if err != nil { return nil, err } @@ -245,7 +245,7 @@ func (c K8Client) GatewayPortForward(port int) (func(), error) { } // Create the k8 port forwarder, with the given port and k8 client -func (c K8Client) createPortForwarder(port int, portForwardCreateURL *url.URL) (*portforward.PortForwarder, error) { +func (c K8Client) createPortForwarder(localPort int, remotePort int, portForwardCreateURL *url.URL) (*portforward.PortForwarder, error) { // Construct the spdy client required by the client-go portforward library config, err := c.ClientConfig.ClientConfig() if err != nil { @@ -264,7 +264,7 @@ func (c K8Client) createPortForwarder(port int, portForwardCreateURL *url.URL) ( // Construct a new PortForwarder struct that manages the instructed port forward tunnel stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) out, errOut := new(bytes.Buffer), new(bytes.Buffer) - ports := []string{fmt.Sprintf("%d:%d", port, 26500)} + ports := []string{fmt.Sprintf("%d:%d", localPort, remotePort)} portforwarder, err := portforward.New(dialer, ports, stopChan, readyChan, out, errOut) if err != nil { if Verbosity {