Skip to content

Commit

Permalink
utils: export WaitObserver methods into wait package
Browse files Browse the repository at this point in the history
Signed-off-by: Gilberto Bertin <[email protected]>
  • Loading branch information
jibi committed Sep 27, 2023
1 parent fef555c commit 158b7da
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 79 deletions.
7 changes: 4 additions & 3 deletions clustermesh/clustermesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cilium/cilium-cli/internal/utils"
"github.com/cilium/cilium-cli/k8s"
"github.com/cilium/cilium-cli/status"
"github.com/cilium/cilium-cli/utils/wait"
)

const (
Expand Down Expand Up @@ -1167,7 +1168,7 @@ type Status struct {
}

func (k *K8sClusterMesh) statusAccessInformation(ctx context.Context, log bool, getExternalWorkloadSecret bool) (*accessInformation, error) {
w := utils.NewWaitObserver(ctx, utils.WaitParameters{Log: func(err error, wait string) {
w := wait.NewObserver(ctx, wait.Parameters{Log: func(err error, wait string) {
if log {
k.Log("⌛ Waiting (%s) for access information: %s", wait, err)
}
Expand All @@ -1188,7 +1189,7 @@ func (k *K8sClusterMesh) statusAccessInformation(ctx context.Context, log bool,
}

func (k *K8sClusterMesh) statusDeployment(ctx context.Context) (err error) {
w := utils.NewWaitObserver(ctx, utils.WaitParameters{Log: func(err error, wait string) {
w := wait.NewObserver(ctx, wait.Parameters{Log: func(err error, wait string) {
k.Log("⌛ Waiting (%s) for deployment %s to become ready: %s", wait, defaults.ClusterMeshDeploymentName, err)
}})
defer func() {
Expand Down Expand Up @@ -1330,7 +1331,7 @@ func (c *ConnectivityStatus) parseAgentStatus(name string, expected []string, s
}

func (k *K8sClusterMesh) statusConnectivity(ctx context.Context) (*ConnectivityStatus, error) {
w := utils.NewWaitObserver(ctx, utils.WaitParameters{Log: func(err error, wait string) {
w := wait.NewObserver(ctx, wait.Parameters{Log: func(err error, wait string) {
k.Log("⌛ Waiting (%s) for clusters to be connected: %s", wait, err)
}})
defer w.Cancel()
Expand Down
4 changes: 2 additions & 2 deletions connectivity/tests/egressgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/cilium/cilium-cli/connectivity/check"
"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/utils"
"github.com/cilium/cilium-cli/utils/wait"

v1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -42,7 +42,7 @@ func waitForBpfPolicyEntries(ctx context.Context, t *check.Test,
targetEntriesCallback func(ciliumPod check.Pod) []bpfEgressGatewayPolicyEntry) {
ct := t.Context()

w := utils.NewWaitObserver(ctx, utils.WaitParameters{Timeout: 10 * time.Second})
w := wait.NewObserver(ctx, wait.Parameters{Timeout: 10 * time.Second})
defer w.Cancel()

ensureBpfPolicyEntries := func() error {
Expand Down
74 changes: 0 additions & 74 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
package utils

import (
"context"
"fmt"
"os"
"regexp"
"strings"
"time"

"github.com/blang/semver/v4"

Expand Down Expand Up @@ -93,78 +91,6 @@ func BuildImagePath(userImage, userVersion, defaultImage, defaultVersion string,
return image
}

type LogFunc func(err error, waitTime string)

type WaitParameters struct {
RetryInterval time.Duration
WarningInterval time.Duration
Timeout time.Duration
Log LogFunc
}

func (w WaitParameters) retryInterval() time.Duration {
if w.RetryInterval != time.Duration(0) {
return w.RetryInterval
}

return defaults.WaitRetryInterval
}

func (w WaitParameters) warningInterval() time.Duration {
if w.WarningInterval != time.Duration(0) {
return w.WarningInterval
}

return defaults.WaitWarningInterval
}

type WaitObserver struct {
ctx context.Context
params WaitParameters
lastWarning time.Time
waitStarted time.Time
cancel context.CancelFunc
}

func NewWaitObserver(ctx context.Context, p WaitParameters) *WaitObserver {
w := &WaitObserver{
ctx: ctx,
params: p,
waitStarted: time.Now(),
}

if p.Timeout != time.Duration(0) {
w.ctx, w.cancel = context.WithTimeout(ctx, p.Timeout)
}

return w
}

func (w *WaitObserver) Cancel() {
if w.cancel != nil {
w.cancel()
}
}

func (w *WaitObserver) Retry(err error) error {
if w.params.Log != nil && time.Since(w.lastWarning) > w.params.warningInterval() {
waitString := time.Since(w.waitStarted).Truncate(time.Second).String()
w.params.Log(err, waitString)
w.lastWarning = time.Now()
}

select {
case <-w.ctx.Done():
if err != nil {
return fmt.Errorf("timeout while waiting for condition, last error: %s", err)
}
return fmt.Errorf("timeout while waiting for condition")
case <-time.After(w.params.retryInterval()):
}

return nil
}

func Contains(l []string, v string) bool {
for _, s := range l {
if s == v {
Expand Down
84 changes: 84 additions & 0 deletions utils/wait/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package wait

import (
"context"
"fmt"
"time"

"github.com/cilium/cilium-cli/defaults"
)

type LogFunc func(err error, waitTime string)

type Parameters struct {
RetryInterval time.Duration
WarningInterval time.Duration
Timeout time.Duration
Log LogFunc
}

func (w Parameters) retryInterval() time.Duration {
if w.RetryInterval != time.Duration(0) {
return w.RetryInterval
}

return defaults.WaitRetryInterval
}

func (w Parameters) warningInterval() time.Duration {
if w.WarningInterval != time.Duration(0) {
return w.WarningInterval
}

return defaults.WaitWarningInterval
}

type Observer struct {
ctx context.Context
params Parameters
lastWarning time.Time
waitStarted time.Time
cancel context.CancelFunc
}

func NewObserver(ctx context.Context, p Parameters) *Observer {
w := &Observer{
ctx: ctx,
params: p,
waitStarted: time.Now(),
}

if p.Timeout != time.Duration(0) {
w.ctx, w.cancel = context.WithTimeout(ctx, p.Timeout)
}

return w
}

func (w *Observer) Cancel() {
if w.cancel != nil {
w.cancel()
}
}

func (w *Observer) Retry(err error) error {
if w.params.Log != nil && time.Since(w.lastWarning) > w.params.warningInterval() {
waitString := time.Since(w.waitStarted).Truncate(time.Second).String()
w.params.Log(err, waitString)
w.lastWarning = time.Now()
}

select {
case <-w.ctx.Done():
if err != nil {
return fmt.Errorf("timeout while waiting for condition, last error: %s", err)
}
return fmt.Errorf("timeout while waiting for condition")
case <-time.After(w.params.retryInterval()):
}

return nil
}

0 comments on commit 158b7da

Please sign in to comment.