Skip to content

Commit

Permalink
fix: use retry library
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin committed Nov 29, 2024
1 parent b46bb41 commit 59bdce3
Showing 1 changed file with 20 additions and 14 deletions.
34 changes: 20 additions & 14 deletions pkg/agent/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"time"

"github.com/avast/retry-go/v4"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand All @@ -21,6 +22,11 @@ import (

const testWorkflowNotificationsRetryCount = 10

var (
retryDelay = 100 * time.Millisecond
waitTimeout = 24 * time.Hour
)

func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotification) cloud.TestWorkflowNotificationType {
if n.Result != nil {
return cloud.TestWorkflowNotificationType_WORKFLOW_STREAM_RESULT
Expand Down Expand Up @@ -184,7 +190,7 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
// Cloud sometimes slow to insert execution or test
// while WorkflowNotifications request from websockets comes in faster
// so we retry up to testWorkflowNotificationsRetryCount times.
time.Sleep(100 * time.Millisecond)
time.Sleep(retryDelay)
notificationsCh, err = ag.testWorkflowNotificationsFunc(ctx, req.ExecutionId)
}
}
Expand Down Expand Up @@ -237,24 +243,24 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c
}

func (ag *Agent) executeWorkflowServiceNotificationsRequest(ctx context.Context, req *cloud.TestWorkflowServiceNotificationsRequest) error {
var (
notificationsCh <-chan testkube.TestWorkflowExecutionNotification
err error
)
timeoutCtx, cancel := context.WithTimeout(ctx, waitTimeout)
defer cancel()

for {
notificationsCh, err = ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex))
if errors.Is(err, registry.ErrResourceNotFound) {
notificationsCh, err := retry.DoWithData(
func() (<-chan testkube.TestWorkflowExecutionNotification, error) {
// We have a race condition here
// Cloud sometimes slow to start service
// while WorkflowNotifications request from websockets comes in faster
// so we retry up to wait till service pod is uo or execution is finished.
time.Sleep(100 * time.Millisecond)
continue
}

break
}
return ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex))
},
retry.DelayType(retry.FixedDelay),
retry.Delay(retryDelay),
retry.Context(timeoutCtx),
retry.RetryIf(func(err error) bool {
return errors.Is(err, registry.ErrResourceNotFound)
}),
)

if err != nil {
message := fmt.Sprintf("cannot get service pod logs: %s", err.Error())
Expand Down

0 comments on commit 59bdce3

Please sign in to comment.