From 95b043d73d314cfa2328c774566398ce9dd47fe1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ole=20Sch=C3=B6nburg?= Date: Mon, 18 Dec 2023 14:37:31 +0100 Subject: [PATCH] feat(zbchaos): port-forward to random local port --- go-chaos/backend/clients.go | 3 +-- go-chaos/cmd/backup.go | 6 ++---- go-chaos/cmd/cluster.go | 6 ++---- go-chaos/cmd/deploy.go | 6 ++---- go-chaos/cmd/exporting.go | 6 ++---- go-chaos/cmd/publish.go | 3 +-- go-chaos/cmd/stress.go | 3 +-- go-chaos/cmd/terminate.go | 3 +-- go-chaos/cmd/topology.go | 3 +-- go-chaos/cmd/verify.go | 9 +++------ go-chaos/internal/pods.go | 14 +++++++++++--- go-chaos/worker/chaos_worker.go | 3 +-- 12 files changed, 28 insertions(+), 37 deletions(-) diff --git a/go-chaos/backend/clients.go b/go-chaos/backend/clients.go index 32a1e85c5..7c4fc926a 100644 --- a/go-chaos/backend/clients.go +++ b/go-chaos/backend/clients.go @@ -20,8 +20,7 @@ import ( ) func ConnectToZeebeCluster(k8Client internal.K8Client) (zbc.Client, func(), error) { - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) zbClient, err := internal.CreateZeebeClient(port) if err != nil { diff --git a/go-chaos/cmd/backup.go b/go-chaos/cmd/backup.go index f15244ae2..947e86e37 100644 --- a/go-chaos/cmd/backup.go +++ b/go-chaos/cmd/backup.go @@ -193,8 +193,7 @@ func takeBackup(flags *Flags) error { return err } - port := 9600 - closePortForward := k8Client.MustGatewayPortForward(port, port) + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() url := fmt.Sprintf("http://localhost:%d/actuator/backups/%s", port, flags.backupId) resp, err := http.Post(url, "", nil) @@ -217,8 +216,7 @@ func waitForBackup(flags *Flags) error { panic(err) } - port := 9600 - closePortForward := k8Client.MustGatewayPortForward(port, port) + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() for { diff --git a/go-chaos/cmd/cluster.go b/go-chaos/cmd/cluster.go index 299ef4a47..e41c4d0ac 100644 --- a/go-chaos/cmd/cluster.go +++ b/go-chaos/cmd/cluster.go @@ -63,8 +63,7 @@ func printCurrentTopology(flags *Flags) error { return err } - port := 9600 - closePortForward := k8Client.MustGatewayPortForward(port, port) + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() topology, err := QueryTopology(port) @@ -90,8 +89,7 @@ func waitForChange(flags *Flags) error { return err } - port := 9600 - closePortForward := k8Client.MustGatewayPortForward(port, port) + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() interval := time.Second * 5 diff --git a/go-chaos/cmd/deploy.go b/go-chaos/cmd/deploy.go index c0831d993..23bf60f3a 100644 --- a/go-chaos/cmd/deploy.go +++ b/go-chaos/cmd/deploy.go @@ -38,8 +38,7 @@ Defaults to the later, which is useful for experimenting with deployment distrib k8Client, err := createK8ClientWithFlags(flags) ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) @@ -62,8 +61,7 @@ Useful for experimenting with deployment distribution.`, k8Client, err := createK8ClientWithFlags(flags) ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) diff --git a/go-chaos/cmd/exporting.go b/go-chaos/cmd/exporting.go index 5bbd7e827..49c4863ef 100644 --- a/go-chaos/cmd/exporting.go +++ b/go-chaos/cmd/exporting.go @@ -56,8 +56,7 @@ func AddExportingCmds(rootCmd *cobra.Command, flags *Flags) { } func pauseExporting(k8Client internal.K8Client) error { - port := 9600 - closePortForward := k8Client.MustGatewayPortForward(port, port) + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() url := fmt.Sprintf("http://localhost:%d/actuator/exporting/pause", port) resp, err := http.Post(url, "", nil) @@ -69,8 +68,7 @@ func pauseExporting(k8Client internal.K8Client) error { } func resumeExporting(k8Client internal.K8Client) error { - port := 9600 - closePortForward := k8Client.MustGatewayPortForward(port, port) + port, closePortForward := k8Client.MustGatewayPortForward(0, 9600) defer closePortForward() url := fmt.Sprintf("http://localhost:%d/actuator/exporting/resume", port) resp, err := http.Post(url, "", nil) diff --git a/go-chaos/cmd/publish.go b/go-chaos/cmd/publish.go index c072c21ae..bf1e32167 100644 --- a/go-chaos/cmd/publish.go +++ b/go-chaos/cmd/publish.go @@ -32,8 +32,7 @@ func AddPublishCmd(rootCmd *cobra.Command, flags *Flags) { k8Client, err := createK8ClientWithFlags(flags) panicOnError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) diff --git a/go-chaos/cmd/stress.go b/go-chaos/cmd/stress.go index d22b74573..4997d85c6 100644 --- a/go-chaos/cmd/stress.go +++ b/go-chaos/cmd/stress.go @@ -40,8 +40,7 @@ func AddStressCmd(rootCmd *cobra.Command, flags *Flags) { k8Client, err := createK8ClientWithFlags(flags) ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) diff --git a/go-chaos/cmd/terminate.go b/go-chaos/cmd/terminate.go index aeaa660dc..321c5c8c0 100644 --- a/go-chaos/cmd/terminate.go +++ b/go-chaos/cmd/terminate.go @@ -99,8 +99,7 @@ func AddTerminateCommand(rootCmd *cobra.Command, flags *Flags) { // GracePeriod (in second) can be nil, which would mean using K8 default. // Returns the broker which has been restarted func restartBroker(k8Client internal.K8Client, nodeId int, partitionId int, role string, gracePeriod *int64) string { - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) diff --git a/go-chaos/cmd/topology.go b/go-chaos/cmd/topology.go index c9950e658..c971cdb4e 100644 --- a/go-chaos/cmd/topology.go +++ b/go-chaos/cmd/topology.go @@ -38,8 +38,7 @@ func AddTopologyCmd(rootCmd *cobra.Command, flags *Flags) { panic(err) } - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() client, err := internal.CreateZeebeClient(port) diff --git a/go-chaos/cmd/verify.go b/go-chaos/cmd/verify.go index 33ecb2864..8c57642a8 100644 --- a/go-chaos/cmd/verify.go +++ b/go-chaos/cmd/verify.go @@ -54,8 +54,7 @@ Process instances are created until the required partition is reached.`, k8Client, err := createK8ClientWithFlags(flags) ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) @@ -88,8 +87,7 @@ Process instances are created until the required partition is reached.`, k8Client, err := createK8ClientWithFlags(flags) ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) @@ -118,8 +116,7 @@ Process instances are created until the required partition is reached.`, k8Client, err := createK8ClientWithFlags(flags) ensureNoError(err) - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port) diff --git a/go-chaos/internal/pods.go b/go-chaos/internal/pods.go index 1edc49b80..01e7329ec 100644 --- a/go-chaos/internal/pods.go +++ b/go-chaos/internal/pods.go @@ -215,9 +215,12 @@ func (c K8Client) RestartPod(podName string) error { // MustGatewayPortForward creates a port forwarding to a zeebe gateway with the given port. // Panics when port forwarding fails. +// localPort can be 0 to let the OS choose a random, free port. +// Returns the exposed local port and a function to close the port forwarding. +// // https://github.com/gruntwork-io/terratest/blob/master/modules/k8s/tunnel.go#L187-L196 // https://github.com/kubernetes/client-go/issues/51#issuecomment-436200428 -func (c K8Client) MustGatewayPortForward(localPort int, remotePort int) func() { +func (c K8Client) MustGatewayPortForward(localPort int, remotePort int) (int, func()) { names, err := c.GetGatewayPodNames() if err != nil { panic(err) @@ -246,8 +249,13 @@ func (c K8Client) MustGatewayPortForward(localPort int, remotePort int) func() { LogVerbose("\nError starting port forwarding tunnel: %s", err) panic(err) case <-portForwarder.Ready: - LogVerbose("Successfully created port forwarding tunnel") - return func() { + ports, err := portForwarder.GetPorts() + if err != nil { + panic(err) + } + exposedLocalPort := ports[0].Local + LogVerbose("Successfully created port forwarding tunnel from %d (local) to %d (remote)", exposedLocalPort, remotePort) + return int(exposedLocalPort), func() { portForwarder.Close() } } diff --git a/go-chaos/worker/chaos_worker.go b/go-chaos/worker/chaos_worker.go index 0ba8391e1..71b97f926 100644 --- a/go-chaos/worker/chaos_worker.go +++ b/go-chaos/worker/chaos_worker.go @@ -182,8 +182,7 @@ func getTargetClusterVersion(namespace string) string { return "" } - port := 26500 - closeFn := k8Client.MustGatewayPortForward(port, port) + port, closeFn := k8Client.MustGatewayPortForward(0, 26500) defer closeFn() zbClient, err := internal.CreateZeebeClient(port)