diff --git a/cmd/kubectl-testkube/commands/testworkflows/run.go b/cmd/kubectl-testkube/commands/testworkflows/run.go index 5c96c690d3..573553c4a1 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/run.go +++ b/cmd/kubectl-testkube/commands/testworkflows/run.go @@ -47,6 +47,8 @@ func NewRunTestWorkflowCmd() *cobra.Command { masks []string tags map[string]string selectors []string + serviceName string + serviceIndex int ) cmd := &cobra.Command{ @@ -146,7 +148,7 @@ func NewRunTestWorkflowCmd() *cobra.Command { ui.NL() if !execution.FailedToInitialize() { if watchEnabled && len(args) > 0 { - exitCode = uiWatch(execution, client) + exitCode = uiWatch(execution, serviceName, serviceIndex, client) ui.NL() if downloadArtifactsEnabled { tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty) @@ -181,12 +183,21 @@ func NewRunTestWorkflowCmd() *cobra.Command { cmd.Flags().StringArrayVarP(&masks, "mask", "", []string{}, "regexp to filter downloaded files, single or comma separated, like report/.* or .*\\.json,.*\\.js$") cmd.Flags().StringToStringVarP(&tags, "tag", "", map[string]string{}, "execution tags in a form of name1=val1 passed to executor") cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression") + cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name") + cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index") return cmd } -func uiWatch(execution testkube.TestWorkflowExecution, client apiclientv1.Client) int { - result, err := watchTestWorkflowLogs(execution.Id, execution.Signature, client) +func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int, client apiclientv1.Client) int { + var result *testkube.TestWorkflowResult + var err error + + if serviceName == "" { + result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client) + } else { + result, err = watchTestWorkflowServiceLogs(execution.Id, serviceName, serviceIndex, execution.Signature, client) + } ui.ExitOnError("reading test workflow execution logs", err) // Apply the result in the execution @@ -313,6 +324,36 @@ func watchTestWorkflowLogs(id string, signature []testkube.TestWorkflowSignature return result, err } +func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int, signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) { + ui.Info("Getting logs from test workflow service pod", fmt.Sprintf("%s-%s-%d", id, serviceName, serviceIndex)) + + notifications, err := client.GetTestWorkflowExecutionServiceNotifications(id, serviceName, serviceIndex) + ui.ExitOnError("getting logs from executor", err) + + steps := flattenSignatures(signature) + + var result *testkube.TestWorkflowResult + var isLineBeginning = true + for l := range notifications { + if l.Output != nil { + continue + } + if l.Result != nil { + if printResultDifference(result, l.Result, steps) { + isLineBeginning = true + } + result = l.Result + continue + } + + printStructuredLogLines(l.Log, &isLineBeginning) + } + + ui.NL() + + return result, err +} + func printStatusHeader(i, n int, name string) { if i == -1 { fmt.Println("\n" + ui.LightCyan(fmt.Sprintf("• %s", name))) diff --git a/cmd/kubectl-testkube/commands/testworkflows/watch.go b/cmd/kubectl-testkube/commands/testworkflows/watch.go index 2387810b6e..3992ec3337 100644 --- a/cmd/kubectl-testkube/commands/testworkflows/watch.go +++ b/cmd/kubectl-testkube/commands/testworkflows/watch.go @@ -13,6 +13,11 @@ import ( ) func NewWatchTestWorkflowExecutionCmd() *cobra.Command { + var ( + serviceName string + serviceIndex int + ) + cmd := &cobra.Command{ Use: "testworkflowexecution ", Aliases: []string{"testworkflowexecutions", "twe", "tw"}, @@ -31,7 +36,7 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command { ui.ExitOnError("render test workflow execution", err) ui.NL() - exitCode := uiWatch(execution, client) + exitCode := uiWatch(execution, serviceName, serviceIndex, client) ui.NL() execution, err = client.GetTestWorkflowExecution(execution.Id) @@ -43,5 +48,8 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command { }, } + cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name") + cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index") + return cmd } diff --git a/internal/app/api/v1/server.go b/internal/app/api/v1/server.go index 7b6890c605..d7aa19993b 100644 --- a/internal/app/api/v1/server.go +++ b/internal/app/api/v1/server.go @@ -150,9 +150,9 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) { testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler()) testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler()) testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler()) - testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsHandler()) + testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsHandler()) testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler()) - testWorkflowExecutions.Get("/:executionID/notifications/stream:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler()) + testWorkflowExecutions.Get("/:executionID/notifications/stream:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler()) testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler()) testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler()) testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler()) diff --git a/pkg/api/v1/client/interface.go b/pkg/api/v1/client/interface.go index 82f1357377..73a3f6c48a 100644 --- a/pkg/api/v1/client/interface.go +++ b/pkg/api/v1/client/interface.go @@ -154,6 +154,7 @@ type TestWorkflowAPI interface { ExecuteTestWorkflows(selector string, request testkube.TestWorkflowExecutionRequest) ([]testkube.TestWorkflowExecution, error) GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error) GetTestWorkflowExecutionLogs(id string) ([]byte, error) + GetTestWorkflowExecutionServiceNotifications(id, serviceName string, serviceIndex int) (chan testkube.TestWorkflowExecutionNotification, error) } // TestWorkflowExecutionAPI describes test workflow api methods diff --git a/pkg/api/v1/client/testworkflow.go b/pkg/api/v1/client/testworkflow.go index d878f93565..8bffb81d28 100644 --- a/pkg/api/v1/client/testworkflow.go +++ b/pkg/api/v1/client/testworkflow.go @@ -146,6 +146,14 @@ func (c TestWorkflowClient) GetTestWorkflowExecutionNotifications(id string) (no return notifications, err } +// GetTestWorkflowExecutionServiceNotifications returns events stream from job pods, based on job pods logs +func (c TestWorkflowClient) GetTestWorkflowExecutionServiceNotifications(id, serviceName string, serviceIndex int) (notifications chan testkube.TestWorkflowExecutionNotification, err error) { + notifications = make(chan testkube.TestWorkflowExecutionNotification) + uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications/%s/%d", id, serviceName, serviceIndex) + err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications) + return notifications, err +} + // GetTestWorkflowExecution returns single test workflow execution by id func (c TestWorkflowClient) GetTestWorkflowExecution(id string) (testkube.TestWorkflowExecution, error) { uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions/%s", id)