Skip to content

Commit

Permalink
Implement workflow count, temporarily remove workflow trace
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Mar 26, 2024
1 parent 1e26acb commit 3fabd7b
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 34 deletions.
30 changes: 7 additions & 23 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,6 @@ func NewTemporalWorkflowCommand(cctx *CommandContext, parent *TemporalCommand) *
s.Command.AddCommand(&NewTemporalWorkflowStackCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowStartCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTerminateCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowTraceCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalWorkflowUpdateCommand(cctx, &s).Command)
s.ClientOptions.buildFlags(cctx, s.Command.PersistentFlags())
return &s
Expand Down Expand Up @@ -1660,6 +1659,7 @@ func NewTemporalWorkflowCancelCommand(cctx *CommandContext, parent *TemporalWork
type TemporalWorkflowCountCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Query string
}

func NewTemporalWorkflowCountCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowCountCommand {
Expand All @@ -1668,8 +1668,13 @@ func NewTemporalWorkflowCountCommand(cctx *CommandContext, parent *TemporalWorkf
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "count [flags]"
s.Command.Short = "Count Workflow Executions."
s.Command.Long = "TODO"
if hasHighlighting {
s.Command.Long = "The \x1b[1mtemporal workflow count\x1b[0m command returns a count of Workflow Executions.\n\nUse the options listed below to change the command's behavior."
} else {
s.Command.Long = "The `temporal workflow count` command returns a count of Workflow Executions.\n\nUse the options listed below to change the command's behavior."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVarP(&s.Query, "query", "q", "", "Filter results using a SQL-like query.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
Expand Down Expand Up @@ -2147,27 +2152,6 @@ func NewTemporalWorkflowTerminateCommand(cctx *CommandContext, parent *TemporalW
return &s
}

type TemporalWorkflowTraceCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
}

func NewTemporalWorkflowTraceCommand(cctx *CommandContext, parent *TemporalWorkflowCommand) *TemporalWorkflowTraceCommand {
var s TemporalWorkflowTraceCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "trace [flags]"
s.Command.Short = "Trace progress of a Workflow Execution and its children."
s.Command.Long = "TODO"
s.Command.Args = cobra.NoArgs
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalWorkflowUpdateCommand struct {
Parent *TemporalWorkflowCommand
Command cobra.Command
Expand Down
4 changes: 0 additions & 4 deletions temporalcli/commands.workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,6 @@ func (c *TemporalWorkflowTerminateCommand) run(cctx *CommandContext, _ []string)
return nil
}

func (*TemporalWorkflowTraceCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
}

func (c *TemporalWorkflowUpdateCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
Expand Down
48 changes: 46 additions & 2 deletions temporalcli/commands.workflow_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
)

func (c *TemporalWorkflowDescribeCommand) run(cctx *CommandContext, args []string) error {
Expand Down Expand Up @@ -265,8 +266,51 @@ func (c *TemporalWorkflowListCommand) pageFetcher(
}
}

func (*TemporalWorkflowCountCommand) run(*CommandContext, []string) error {
return fmt.Errorf("TODO")
func (c *TemporalWorkflowCountCommand) run(cctx *CommandContext, _ []string) error {
cl, err := c.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

resp, err := cl.WorkflowService().CountWorkflowExecutions(cctx, &workflowservice.CountWorkflowExecutionsRequest{
Namespace: c.Parent.Namespace,
Query: c.Query,
})
if err != nil {
return err
}

// Just dump response on JSON, otherwise print total and groups
if cctx.JSONOutput {
// Shorthand does not apply to search attributes currently, so we're going
// to remove the "type" from the metadata encoding on group values to make
// it apply
for _, group := range resp.Groups {
for _, payload := range group.GroupValues {
delete(payload.GetMetadata(), "type")
}
}
return cctx.Printer.PrintStructured(resp, printer.StructuredOptions{})
}

cctx.Printer.Printlnf("Total: %v", resp.Count)
for _, group := range resp.Groups {
// Payload values are search attributes, so we can use the default converter
var valueStr string
for _, payload := range group.GroupValues {
var value any
if err := converter.GetDefaultDataConverter().FromPayload(payload, &value); err != nil {
value = fmt.Sprintf("<failed converting: %v>", err)
}
if valueStr != "" {
valueStr += ", "
}
valueStr += fmt.Sprintf("%v", value)
}
cctx.Printer.Printlnf("Group total: %v, values: %v", group.Count, valueStr)
}
return nil
}

func (c *TemporalWorkflowShowCommand) run(cctx *CommandContext, args []string) error {
Expand Down
89 changes: 89 additions & 0 deletions temporalcli/commands.workflow_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"

"github.com/temporalio/cli/temporalcli"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
Expand Down Expand Up @@ -328,3 +330,90 @@ func (s *SharedServerSuite) TestWorkflow_List() {
s.ContainsOnSameLine(out, "name", "DevWorkflow")
s.ContainsOnSameLine(out, "status", "WORKFLOW_EXECUTION_STATUS_COMPLETED")
}

func (s *SharedServerSuite) TestWorkflow_Count() {
s.Worker.OnDevWorkflow(func(ctx workflow.Context, shouldComplete any) (any, error) {
// Only complete if shouldComplete is a true bool
shouldCompleteBool, _ := shouldComplete.(bool)
return nil, workflow.Await(ctx, func() bool { return shouldCompleteBool })
})

// Create 3 that complete and 2 that don't
for i := 0; i < 5; i++ {
_, err := s.Client.ExecuteWorkflow(
s.Context,
client.StartWorkflowOptions{TaskQueue: s.Worker.Options.TaskQueue},
DevWorkflow,
i < 3,
)
s.NoError(err)
}

// List and confirm they are all there in expected statuses
s.Eventually(
func() bool {
resp, err := s.Client.ListWorkflow(s.Context, &workflowservice.ListWorkflowExecutionsRequest{
Query: "TaskQueue = '" + s.Worker.Options.TaskQueue + "'",
})
s.NoError(err)
var completed, running int
for _, exec := range resp.Executions {
if exec.Status == enums.WORKFLOW_EXECUTION_STATUS_COMPLETED {
completed++
} else if exec.Status == enums.WORKFLOW_EXECUTION_STATUS_RUNNING {
running++
}
}
return completed == 3 && running == 2
},
10*time.Second,
100*time.Millisecond,
)

// Simple count w/out grouping
res := s.Execute(
"workflow", "count",
"--address", s.Address(),
"--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"'",
)
s.NoError(res.Err)
out := res.Stdout.String()
s.Equal("Total: 5", strings.TrimSpace(out))

// Grouped
res = s.Execute(
"workflow", "count",
"--address", s.Address(),
"--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"' GROUP BY ExecutionStatus",
)
s.NoError(res.Err)
out = res.Stdout.String()
s.Contains(out, "Total: 5")
s.Contains(out, "Group total: 2, values: Running")
s.Contains(out, "Group total: 3, values: Completed")

// Simple count w/out grouping JSON
res = s.Execute(
"workflow", "count",
"--address", s.Address(),
"--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"'",
"-o", "json",
)
s.NoError(res.Err)
out = res.Stdout.String()
// Proto JSON makes this count a string
s.Contains(out, `"count": "5"`)

// Grouped JSON
res = s.Execute(
"workflow", "count",
"--address", s.Address(),
"--query", "TaskQueue = '"+s.Worker.Options.TaskQueue+"' GROUP BY ExecutionStatus",
"-o", "jsonl",
)
s.NoError(res.Err)
out = res.Stdout.String()
s.Contains(out, `"count":"5"`)
s.Contains(out, `{"groupValues":["Running"],"count":"2"}`)
s.Contains(out, `{"groupValues":["Completed"],"count":"3"}`)
}
12 changes: 7 additions & 5 deletions temporalcli/commandsmd/commands.md
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,13 @@ Includes options set for [single workflow or batch](#options-set-single-workflow

### temporal workflow count: Count Workflow Executions.

TODO
The `temporal workflow count` command returns a count of [Workflow Executions](/concepts/what-is-a-workflow-execution).

Use the options listed below to change the command's behavior.

#### Options

* `--query`, `-q` (string) - Filter results using a SQL-like query.

### temporal workflow delete: Deletes a Workflow Execution.

Expand Down Expand Up @@ -1018,10 +1024,6 @@ Use the options listed below to change the behavior of this command.
* `--reason` (string) - Reason for termination. Defaults to message with the current user's name.
* `--yes`, `-y` (bool) - Confirm prompt to perform batch. Only allowed if query is present.

### temporal workflow trace: Trace progress of a Workflow Execution and its children.

TODO

### temporal workflow update: Updates a running workflow synchronously.

The `temporal workflow update` command is used to synchronously [Update](/concepts/what-is-an-update) a
Expand Down

0 comments on commit 3fabd7b

Please sign in to comment.