From 57db7a02dd8c0740b5ec3dbd2bad7f54dc07b8ec Mon Sep 17 00:00:00 2001 From: Vladislav Sukhin Date: Thu, 28 Nov 2024 18:12:03 +0300 Subject: [PATCH] fix: waiting for service pod Signed-off-by: Vladislav Sukhin --- cmd/api-server/main.go | 7 ++++++- pkg/agent/testworkflows.go | 29 +++++++++++++++++++++++------ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/cmd/api-server/main.go b/cmd/api-server/main.go index 8a349b80f5..a21e36a8ed 100644 --- a/cmd/api-server/main.go +++ b/cmd/api-server/main.go @@ -328,8 +328,13 @@ func main() { getTestWorkflowServiceNotificationsStream := func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) { execution, err := testWorkflowResultsRepository.Get(ctx, executionID) if err != nil { - return nil, err + return nil, errors.Join(err, agent.ErrGetTestWorkflowExecution) + } + + if execution.Result != nil && execution.Result.IsFinished() { + return nil, agent.ErrFinishedTestWorkflowExecution } + notifications := executionWorker.Notifications(ctx, fmt.Sprintf("%s-%s-%d", execution.Id, serviceName, serviceIndex), executionworkertypes.NotificationsOptions{ Hints: executionworkertypes.Hints{ Namespace: execution.Namespace, diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index 26d16f7947..697b801722 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -20,6 +20,11 @@ import ( const testWorkflowNotificationsRetryCount = 10 +var ( + ErrGetTestWorkflowExecution = errors.New("can't get test workflow execution") + ErrFinishedTestWorkflowExecution = errors.New("test workflow execution is finished") +) + func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotification) cloud.TestWorkflowNotificationType { if n.Result != nil { return cloud.TestWorkflowNotificationType_WORKFLOW_STREAM_RESULT @@ -236,17 +241,29 @@ func (ag *Agent) executeWorkflowNotificationsRequest(ctx context.Context, req *c } func (ag *Agent) executeWorkflowServiceNotificationsRequest(ctx context.Context, req *cloud.TestWorkflowServiceNotificationsRequest) error { - notificationsCh, err := ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex)) - for i := 0; i < testWorkflowNotificationsRetryCount; i++ { + var ( + notificationsCh <-chan testkube.TestWorkflowExecutionNotification + err error + ) + + for { + notificationsCh, err = ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex)) + if errors.Is(err, ErrGetTestWorkflowExecution) || errors.Is(ErrFinishedTestWorkflowExecution) { + break + } + if err != nil { // We have a race condition here - // Cloud sometimes slow to insert execution or test + // Cloud sometimes slow to start service // while WorkflowNotifications request from websockets comes in faster - // so we retry up to testWorkflowNotificationsRetryCount times. - time.Sleep(time.Second) - notificationsCh, err = ag.testWorkflowServiceNotificationsFunc(ctx, req.ExecutionId, req.ServiceName, int(req.ServiceIndex)) + // so we retry up to wait till service pod is uo or execution is finished. + time.Sleep(100 * time.Millisecond) + continue } + + break } + if err != nil { message := fmt.Sprintf("cannot get service pod logs: %s", err.Error()) ag.testWorkflowServiceNotificationsResponseBuffer <- &cloud.TestWorkflowServiceNotificationsResponse{