Skip to content

Commit

Permalink
feat: add cli support for parallel steps
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin committed Dec 2, 2024
1 parent 3e2c45f commit 2819e43
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 14 deletions.
73 changes: 64 additions & 9 deletions cmd/kubectl-testkube/commands/testworkflows/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
)

const (
LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone
apiErrorMessage = "processing error:"
serviceLogsCheckDelay = 100 * time.Millisecond
LogTimestampLength = 30 // time.RFC3339Nano without 00:00 timezone
apiErrorMessage = "processing error:"
logsCheckDelay = 100 * time.Millisecond
)

var (
Expand All @@ -49,7 +49,9 @@ func NewRunTestWorkflowCmd() *cobra.Command {
tags map[string]string
selectors []string
serviceName string
parallelStepName string
serviceIndex int
parallelStepIndex int
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -149,7 +151,7 @@ func NewRunTestWorkflowCmd() *cobra.Command {
ui.NL()
if !execution.FailedToInitialize() {
if watchEnabled && len(args) > 0 {
exitCode = uiWatch(execution, serviceName, serviceIndex, client)
exitCode = uiWatch(execution, serviceName, serviceIndex, parallelStepName, parallelStepIndex, client)
ui.NL()
if downloadArtifactsEnabled {
tests.DownloadTestWorkflowArtifacts(execution.Id, downloadDir, format, masks, client, outputPretty)
Expand Down Expand Up @@ -186,17 +188,19 @@ func NewRunTestWorkflowCmd() *cobra.Command {
cmd.Flags().StringSliceVarP(&selectors, "label", "l", nil, "label key value pair: --label key1=value1 or label expression")
cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0")
cmd.Flags().StringVar(&parallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
cmd.Flags().IntVar(&parallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")

return cmd
}

func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int, client apiclientv1.Client) int {
func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, serviceIndex int,
parallelStepName string, parallelStepIndex int, client apiclientv1.Client) int {
var result *testkube.TestWorkflowResult
var err error

if serviceName == "" {
result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client)
} else {
switch {
case serviceName != "":
found := false
if execution.Workflow != nil {
found = execution.Workflow.HasService(serviceName)
Expand All @@ -207,6 +211,16 @@ func uiWatch(execution testkube.TestWorkflowExecution, serviceName string, servi
}

result, err = watchTestWorkflowServiceLogs(execution.Id, serviceName, serviceIndex, execution.Signature, client)
case parallelStepName != "":
ref := execution.GetParallelStepReference(parallelStepName)
if ref == "" {
ui.Failf("unknown parallel step '%s' for test workflow execution %s", parallelStepName, execution.Id)
}

result, err = watchTestWorkflowParallelStepLogs(execution.Id, parallelStepName, parallelStepIndex, execution.Signature, client)
default:
result, err = watchTestWorkflowLogs(execution.Id, execution.Signature, client)

}
ui.ExitOnError("reading test workflow execution logs", err)

Expand Down Expand Up @@ -362,7 +376,48 @@ func watchTestWorkflowServiceLogs(id, serviceName string, serviceIndex int,
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(serviceLogsCheckDelay)
time.Sleep(logsCheckDelay)
continue
}
}
}

if nErr != nil {
spinner.Fail()
return nil, nErr
}

break
}

spinner.Success()
return printTestWorkflowLogs(signature, notifications), nil
}

func watchTestWorkflowParallelStepLogs(id, parallelStepName string, parallelStepIndex int,
signature []testkube.TestWorkflowSignature, client apiclientv1.Client) (*testkube.TestWorkflowResult, error) {
ui.Info("Getting logs from test workflow parallel step job", fmt.Sprintf("%s-%s-%d", id, parallelStepName, parallelStepIndex))

var (
notifications chan testkube.TestWorkflowExecutionNotification
nErr error
)

spinner := ui.NewSpinner("Waiting for parallel step logs")
for {
notifications, nErr = client.GetTestWorkflowExecutionParallelStepNotifications(id, parallelStepName, parallelStepIndex)
if nErr != nil {
execution, cErr := client.GetTestWorkflowExecution(id)
if cErr != nil {
spinner.Fail()
return nil, cErr
}

if execution.Result != nil {
if execution.Result.IsFinished() {
nErr = errors.New("test workflow execution is finished")
} else {
time.Sleep(logsCheckDelay)
continue
}
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/kubectl-testkube/commands/testworkflows/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (

func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
var (
serviceName string
serviceIndex int
serviceName string
parallelStepName string
serviceIndex int
parallelStepIndex int
)

cmd := &cobra.Command{
Expand All @@ -36,7 +38,7 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {
ui.ExitOnError("render test workflow execution", err)

ui.NL()
exitCode := uiWatch(execution, serviceName, serviceIndex, client)
exitCode := uiWatch(execution, serviceName, serviceIndex, parallelStepName, parallelStepIndex, client)
ui.NL()

execution, err = client.GetTestWorkflowExecution(execution.Id)
Expand All @@ -50,6 +52,8 @@ func NewWatchTestWorkflowExecutionCmd() *cobra.Command {

cmd.Flags().StringVar(&serviceName, "service-name", "", "test workflow service name")
cmd.Flags().IntVar(&serviceIndex, "service-index", 0, "test workflow service index starting from 0")
cmd.Flags().StringVar(&parallelStepName, "parallel-step-name", "", "test workflow parallel step name or reference")
cmd.Flags().IntVar(&parallelStepIndex, "parallel-step-index", 0, "test workflow parallel step index starting from 0")

return cmd
}
3 changes: 1 addition & 2 deletions pkg/agent/testworkflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (
const testWorkflowNotificationsRetryCount = 10

var (
logRetryDelay = 100 * time.Millisecond
serviceWaitTimeout = 24 * time.Hour
logRetryDelay = 100 * time.Millisecond
)

func getTestWorkflowNotificationType(n testkube.TestWorkflowExecutionNotification) cloud.TestWorkflowNotificationType {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/v1/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type TestWorkflowAPI interface {
GetTestWorkflowExecutionNotifications(id string) (chan testkube.TestWorkflowExecutionNotification, error)
GetTestWorkflowExecutionLogs(id string) ([]byte, error)
GetTestWorkflowExecutionServiceNotifications(id, serviceName string, serviceIndex int) (chan testkube.TestWorkflowExecutionNotification, error)
GetTestWorkflowExecutionParallelStepNotifications(id, parallelStepName string, parallelStepIndex int) (chan testkube.TestWorkflowExecutionNotification, error)
}

// TestWorkflowExecutionAPI describes test workflow api methods
Expand Down
8 changes: 8 additions & 0 deletions pkg/api/v1/client/testworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ func (c TestWorkflowClient) GetTestWorkflowExecutionServiceNotifications(id, ser
return notifications, err
}

// GetTestWorkflowExecutionParallelStepNotifications returns events stream from job pods, based on job pods logs
func (c TestWorkflowClient) GetTestWorkflowExecutionParallelStepNotifications(id, parallelStepName string, parallelStepIndex int) (notifications chan testkube.TestWorkflowExecutionNotification, err error) {
notifications = make(chan testkube.TestWorkflowExecutionNotification)
uri := c.testWorkflowTransport.GetURI("/test-workflow-executions/%s/notifications/%s/%d", id, parallelStepName, parallelStepIndex)
err = c.testWorkflowTransport.GetTestWorkflowExecutionNotifications(uri, notifications)
return notifications, err
}

// GetTestWorkflowExecution returns single test workflow execution by id
func (c TestWorkflowClient) GetTestWorkflowExecution(id string) (testkube.TestWorkflowExecution, error) {
uri := c.testWorkflowExecutionTransport.GetURI("/test-workflow-executions/%s", id)
Expand Down
15 changes: 15 additions & 0 deletions pkg/api/v1/testkube/model_test_workflow_execution_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,18 @@ func (e *TestWorkflowExecution) InitializationError(header string, err error) {
func (e *TestWorkflowExecution) FailedToInitialize() bool {
return e.Result.Status != nil && *e.Result.Status == ABORTED_TestWorkflowStatus && e.Result.QueuedAt.IsZero()
}

func (e *TestWorkflowExecution) GetParallelStepReference(nameOrReference string) string {
if e == nil {
return ""
}

for _, signature := range e.Signature {
ref := signature.GetParallelStepReference(nameOrReference)
if ref != "" {
return ref
}
}

return ""
}
21 changes: 21 additions & 0 deletions pkg/api/v1/testkube/model_test_workflow_signature_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,24 @@ func (s *TestWorkflowSignature) Sequence() []TestWorkflowSignature {
}
return result
}

func (s *TestWorkflowSignature) GetParallelStepReference(nameOrReference string) string {
if s.Category == "Run in parallel" {
if nameOrReference == "" {
return s.Ref
}

if s.Name == nameOrReference || s.Ref == nameOrReference {
return s.Ref
}
}

for _, child := range s.Children {
ref := child.GetParallelStepReference(nameOrReference)
if ref != "" {
return ref
}
}

return ""
}

0 comments on commit 2819e43

Please sign in to comment.