diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index dcbe35b261..5028778074 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -27,9 +27,9 @@ import ( ) const ( - LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone - apiErrorMessage = "processing error:" - serviceLogsCheckDelay = 100 * time.Millisecond + LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone + apiErrorMessage = "processing error:" + logsCheckDelay = 100 * time.Millisecond ) var ( @@ -49,7 +49,9 @@ func NewRunTestWorkflowCmd() *cobra.Command { tags map[string]string selectors []string serviceName string + parallelStepName string serviceIndex int + parallelStepIndex int ) cmd := &cobra.Command{ @@ -149,7 +151,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { ui.NL() if !execution.FailedToInitialize() { if watchEnabled && len(args) > 0 { - exitCode = uiWatch(execution, serviceName, serviceIndex, client) + exitCode = uiWatch(execution, serviceName, serviceIndex, parallelStepName, parallelStepIndex, client) ui.NL() if downloadArtifactsEnabled { tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty) @@ -186,17 +188,19 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression") cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name") cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0") + cmd.Flags().StringVar(¶llelStepName, "parallel-step-name", "", "test workflow parallel step name or reference") + cmd.Flags().IntVar(¶llelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0") return cmd } -func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int, client apiclientv1.Client) int { +func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int, + parallelStepName string, parallelStepIndex int, client apiclientv1.Client) int { var result *testkube.TestWorkflowResult var err error - if serviceName == "" { - result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client) - } else { + switch { + case serviceName != "": found := false if execution.Workflow != nil { found = execution.Workflow.HasService(serviceName) @@ -207,6 +211,16 @@ func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, servi } result, err = watchTestWorkflowServiceLogs(execution.Id, serviceName, serviceIndex, execution.Signature, client) + case parallelStepName != "": + ref := execution.GetParallelStepReference(parallelStepName) + if ref == "" { + ui.Failf("unknown parallel step '%s' for test workflow execution %s", parallelStepName, execution.Id) + } + + result, err = watchTestWorkflowParallelStepLogs(execution.Id, parallelStepName, parallelStepIndex, execution.Signature, client) + default: + result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client) + } ui.ExitOnError("reading test workflow execution logs", err) @@ -362,7 +376,48 @@ func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int, if execution.Result.IsFinished() { nErr = errors.New("test workflow execution is finished") } else { - time.Sleep(serviceLogsCheckDelay) + time.Sleep(logsCheckDelay) + continue + } + } + } + + if nErr != nil { + spinner.Fail() + return nil, nErr + } + + break + } + + spinner.Success() + return printTestWorkflowLogs(signature, notifications), nil +} + +func watchTestWorkflowParallelStepLogs(id, parallelStepName string, parallelStepIndex int, + signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) { + ui.Info("Getting logs from test workflow parallel step job", fmt.Sprintf("%s-%s-%d", id, parallelStepName, parallelStepIndex)) + + var ( + notifications chan testkube.TestWorkflowExecutionNotification + nErr error + ) + + spinner := ui.NewSpinner("Waiting for parallel step logs") + for { + notifications, nErr = client.GetTestWorkflowExecutionParallelStepNotifications(id, parallelStepName, parallelStepIndex) + if nErr != nil { + execution, cErr := client.GetTestWorkflowExecution(id) + if cErr != nil { + spinner.Fail() + return nil, cErr + } + + if execution.Result != nil { + if execution.Result.IsFinished() { + nErr = errors.New("test workflow execution is finished") + } else { + time.Sleep(logsCheckDelay) continue } } diff --git a/cmd/kubectl-testkube/commands/testworkflows/watch.go b/cmd/kubectl-testkube/commands/testworkflows/watch.go index 35c6e918ec..790d0f3fea 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/watch.go +++ b/cmd/kubectl-testkube/commands/testworkflows/watch.go @@ -14,8 +14,10 @@ import ( func NewWatchTestWorkflowExecutionCmd() *cobra.Command { var ( - serviceName string - serviceIndex int + serviceName string + parallelStepName string + serviceIndex int + parallelStepIndex int ) cmd := &cobra.Command{ @@ -36,7 +38,7 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command { ui.ExitOnError("render test workflow execution", err) ui.NL() - exitCode := uiWatch(execution, serviceName, serviceIndex, client) + exitCode := uiWatch(execution, serviceName, serviceIndex, parallelStepName, parallelStepIndex, client) ui.NL() execution, err = client.GetTestWorkflowExecution(execution.Id) @@ -50,6 +52,8 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command { cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name") cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0") + cmd.Flags().StringVar(¶llelStepName, "parallel-step-name", "", "test workflow parallel step name or reference") + cmd.Flags().IntVar(¶llelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0") return cmd } diff --git a/pkg/agent/testworkflows.go b/pkg/agent/testworkflows.go index c70a28224f..3a1ce2f54b 100644 --- a/pkg/agent/testworkflows.go +++ b/pkg/agent/testworkflows.go @@ -23,8 +23,7 @@ import ( const testWorkflowNotificationsRetryCount = 10 var ( - logRetryDelay = 100 * time.Millisecond - serviceWaitTimeout = 24 * time.Hour + logRetryDelay = 100 * time.Millisecond ) func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotification) cloud.TestWorkflowNotificationType { diff --git a/pkg/api/v1/client/interface.go b/pkg/api/v1/client/interface.go index 73a3f6c48a..e5635a31d9 100644 --- a/pkg/api/v1/client/interface.go +++ b/pkg/api/v1/client/interface.go @@ -155,6 +155,7 @@ type TestWorkflowAPI interface { GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error) GetTestWorkflowExecutionLogs(id string) ([]byte, error) GetTestWorkflowExecutionServiceNotifications(id, serviceName string, serviceIndex int) (chan testkube.TestWorkflowExecutionNotification, error) + GetTestWorkflowExecutionParallelStepNotifications(id, parallelStepName string, parallelStepIndex int) (chan testkube.TestWorkflowExecutionNotification, error) } // TestWorkflowExecutionAPI describes test workflow api methods diff --git a/pkg/api/v1/client/testworkflow.go b/pkg/api/v1/client/testworkflow.go index 8bffb81d28..77c4299ed7 100644 --- a/pkg/api/v1/client/testworkflow.go +++ b/pkg/api/v1/client/testworkflow.go @@ -154,6 +154,14 @@ func (c TestWorkflowClient) GetTestWorkflowExecutionServiceNotifications(id, ser return notifications, err } +// GetTestWorkflowExecutionParallelStepNotifications returns events stream from job pods, based on job pods logs +func (c TestWorkflowClient) GetTestWorkflowExecutionParallelStepNotifications(id, parallelStepName string, parallelStepIndex int) (notifications chan testkube.TestWorkflowExecutionNotification, err error) { + notifications = make(chan testkube.TestWorkflowExecutionNotification) + uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications/%s/%d", id, parallelStepName, parallelStepIndex) + err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications) + return notifications, err +} + // GetTestWorkflowExecution returns single test workflow execution by id func (c TestWorkflowClient) GetTestWorkflowExecution(id string) (testkube.TestWorkflowExecution, error) { uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions/%s", id) diff --git a/pkg/api/v1/testkube/model_test_workflow_execution_extended.go b/pkg/api/v1/testkube/model_test_workflow_execution_extended.go index dc1c4e0736..4b758c5abd 100644 --- a/pkg/api/v1/testkube/model_test_workflow_execution_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_execution_extended.go @@ -115,3 +115,18 @@ func (e *TestWorkflowExecution) InitializationError(header string, err error) { func (e *TestWorkflowExecution) FailedToInitialize() bool { return e.Result.Status != nil && *e.Result.Status == ABORTED_TestWorkflowStatus && e.Result.QueuedAt.IsZero() } + +func (e *TestWorkflowExecution) GetParallelStepReference(nameOrReference string) string { + if e == nil { + return "" + } + + for _, signature := range e.Signature { + ref := signature.GetParallelStepReference(nameOrReference) + if ref != "" { + return ref + } + } + + return "" +} diff --git a/pkg/api/v1/testkube/model_test_workflow_signature_extended.go b/pkg/api/v1/testkube/model_test_workflow_signature_extended.go index cd5979e1f6..b9d2e98617 100644 --- a/pkg/api/v1/testkube/model_test_workflow_signature_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_signature_extended.go @@ -14,3 +14,24 @@ func (s *TestWorkflowSignature) Sequence() []TestWorkflowSignature { } return result } + +func (s *TestWorkflowSignature) GetParallelStepReference(nameOrReference string) string { + if s.Category == "Run in parallel" { + if nameOrReference == "" { + return s.Ref + } + + if s.Name == nameOrReference || s.Ref == nameOrReference { + return s.Ref + } + } + + for _, child := range s.Children { + ref := child.GetParallelStepReference(nameOrReference) + if ref != "" { + return ref + } + } + + return "" +}