Skip to content

Commit

Permalink
feat: api methods for parallel steps
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin committed Dec 2, 2024
1 parent 81b0295 commit c71fb91
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 2 deletions.
6 changes: 4 additions & 2 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/services/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/parallel-steps/:parallelStepName/:parallelStepIndex<int>", s.StreamTestWorkflowExecutionParallelStepNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/services/:serviceName/:serviceIndex<int>", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream/parallel-steps/:parallelStepName/:parallelStepIndex<int>", s.StreamTestWorkflowExecutionParallelStepNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
81 changes: 81 additions & 0 deletions internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,43 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionServiceNotificationsHandler() f
}
}

func (s *TestkubeAPI) StreamTestWorkflowExecutionParallelStepNotificationsHandler() fiber.Handler {
return func(c *fiber.Ctx) error {
ctx := c.Context()
executionID := c.Params("executionID")
parallelStepName := c.Params("parallelStepName")
parallelStepIndex := c.Params("parallelStepIndex")
errPrefix := fmt.Sprintf("failed to stream test workflow execution parallel step '%s' instance '%s' notifications '%s'",
parallelStepName, parallelStepIndex, executionID)

// Fetch execution from database
execution, err := s.TestWorkflowResults.Get(ctx, executionID)
if err != nil {
return s.ClientError(c, errPrefix, err)
}

ref := execution.GetParallelStepReference(parallelStepName)
if ref == "" {
return s.ClientError(c, errPrefix, errors.New("unknown parallel step for test workflow execution"))
}

// Check for the logs
id := fmt.Sprintf("%s-%s-%s", execution.Id, ref, parallelStepIndex)
notifications := s.ExecutionWorkerClient.Notifications(ctx, id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return s.BadRequest(c, errPrefix, "fetching notifications", notifications.Err())
}

s.streamNotifications(ctx, id, notifications)
return nil
}
}

func (s *TestkubeAPI) StreamTestWorkflowExecutionNotificationsWebSocketHandler() fiber.Handler {
return websocket.New(func(c *websocket.Conn) {
ctx, ctxCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -214,6 +251,50 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionServiceNotificationsWebSocketHa
})
}

func (s *TestkubeAPI) StreamTestWorkflowExecutionParallelStepNotificationsWebSocketHandler() fiber.Handler {
return websocket.New(func(c *websocket.Conn) {
ctx, ctxCancel := context.WithCancel(context.Background())
executionID := c.Params("executionID")
parallelStepName := c.Params("parallelStepName")
parallelStepIndex := c.Params("parallelStepIndex")

// Stop reading when the WebSocket connection is already closed
originalClose := c.CloseHandler()
c.SetCloseHandler(func(code int, text string) error {
ctxCancel()
return originalClose(code, text)
})
defer c.Conn.Close()

// Fetch execution from database
execution, err := s.TestWorkflowResults.Get(ctx, executionID)
if err != nil {
return
}

ref := execution.GetParallelStepReference(parallelStepName)
if ref == "" {
return
}

// Check for the logs
id := fmt.Sprintf("%s-%s-%s", execution.Id, ref, parallelStepIndex)
notifications := s.ExecutionWorkerClient.Notifications(ctx, id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return
}

for n := range notifications.Channel() {
_ = c.WriteJSON(n)
}
})
}

func (s *TestkubeAPI) ListTestWorkflowExecutionsHandler() fiber.Handler {
return func(c *fiber.Ctx) error {
errPrefix := "failed to list test workflow executions"
Expand Down

0 comments on commit c71fb91

Please sign in to comment.