From 9e97598f956c3f87075c0a75cfdae9b14998d91e Mon Sep 17 00:00:00 2001 From: David Reiss Date: Fri, 8 Dec 2023 20:47:29 -0800 Subject: [PATCH 1/3] Enhance task-queue describe --- common/defs-flags.go | 1 + common/flags.go | 1 + taskqueue/task_queue.go | 6 +++ taskqueue/task_queue_commands.go | 84 +++++++++++++++++++++++++++----- 4 files changed, 81 insertions(+), 11 deletions(-) diff --git a/common/defs-flags.go b/common/defs-flags.go index 92334a81..5eeddef7 100644 --- a/common/defs-flags.go +++ b/common/defs-flags.go @@ -135,6 +135,7 @@ const ( // Task Queue flags FlagTaskQueueName = "Name of the Task Queue." FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]" + FlagPartitionsDefinition = "Query for all partitions up to this number" // Namespace update flags FlagActiveClusterDefinition = "Active cluster name." diff --git a/common/flags.go b/common/flags.go index 44ed1393..1c839757 100644 --- a/common/flags.go +++ b/common/flags.go @@ -79,6 +79,7 @@ var ( FlagOverlapPolicy = "overlap-policy" FlagOwnerEmail = "email" FlagParallelism = "input-parallelism" + FlagPartitions = "partitions" FlagPause = "pause" FlagPauseOnFailure = "pause-on-failure" FlagPort = "port" diff --git a/taskqueue/task_queue.go b/taskqueue/task_queue.go index 8c5d295a..d067cb4e 100644 --- a/taskqueue/task_queue.go +++ b/taskqueue/task_queue.go @@ -26,6 +26,12 @@ func NewTaskQueueCommands() []*cli.Command { Usage: common.FlagTaskQueueTypeDefinition, Category: common.CategoryMain, }, + &cli.IntFlag{ + Name: common.FlagPartitions, + Value: 1, + Usage: common.FlagPartitionsDefinition, + Category: common.CategoryMain, + }, }, common.FlagsForFormatting...), Action: func(c *cli.Context) error { return DescribeTaskQueue(c) diff --git a/taskqueue/task_queue_commands.go b/taskqueue/task_queue_commands.go index f2bd4241..fdf955ce 100644 --- a/taskqueue/task_queue_commands.go +++ b/taskqueue/task_queue_commands.go @@ -1,6 +1,7 @@ package taskqueue import ( + "encoding/json" "fmt" "strings" @@ -9,37 +10,98 @@ import ( "github.com/temporalio/tctl-kit/pkg/color" "github.com/temporalio/tctl-kit/pkg/output" "github.com/urfave/cli/v2" + commonpb "go.temporal.io/api/common/v1" enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/server/common/tqname" ) // DescribeTaskQueue show pollers info of a given taskqueue func DescribeTaskQueue(c *cli.Context) error { - sdkClient, err := client.GetSDKClient(c) + taskQueue := c.String(common.FlagTaskQueue) + tqName, err := tqname.FromBaseName(taskQueue) if err != nil { return err } - taskQueue := c.String(common.FlagTaskQueue) taskQueueType := strToTaskQueueType(c.String(common.FlagTaskQueueType)) + partitions := c.Int(common.FlagPartitions) ctx, cancel := common.NewContext(c) defer cancel() - resp, err := sdkClient.DescribeTaskQueue(ctx, taskQueue, taskQueueType) + + frontendClient := client.Factory(c.App).FrontendClient(c) + namespace, err := common.RequiredFlag(c, common.FlagNamespace) if err != nil { - return fmt.Errorf("unable to describe task queue: %w", err) + return err + } + + type statusWithPartition struct { + Partition int `json:"partition"` + taskqueuepb.TaskQueueStatus + } + type pollerWithPartition struct { + Partition int `json:"partition"` + taskqueuepb.PollerInfo + // copy this out to display nicer in table or card, but not json + VersionCaps *commonpb.WorkerVersionCapabilities `json:"-"` + } + + var statuses []any + var pollers []any + + for p := 0; p < partitions; p++ { + resp, err := frontendClient.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{ + Namespace: namespace, + TaskQueue: &taskqueuepb.TaskQueue{ + Name: tqName.WithPartition(p).FullName(), + Kind: enumspb.TASK_QUEUE_KIND_NORMAL, + }, + TaskQueueType: taskQueueType, + IncludeTaskQueueStatus: true, + }) + // note that even if it doesn't exist before this call, DescribeTaskQueue will return something + if err != nil { + return fmt.Errorf("unable to describe task queue: %w", err) + } + statuses = append(statuses, &statusWithPartition{ + Partition: p, + TaskQueueStatus: *resp.TaskQueueStatus, + }) + for _, pi := range resp.Pollers { + pollers = append(pollers, &pollerWithPartition{ + Partition: p, + PollerInfo: *pi, + VersionCaps: pi.WorkerVersionCapabilities, + }) + } + } + + if output.OutputOption(c.String(output.FlagOutput)) == output.JSON { + // handle specially so we output a single object instead of two + b, err := json.MarshalIndent(map[string]any{ + "taskQueues": statuses, + "pollers": pollers, + }, "", " ") + if err != nil { + return err + } + _, err = fmt.Println(string(b)) + return err } opts := &output.PrintOptions{ - // TODO enable when versioning feature is out - // Fields: []string{"Identity", "LastAccessTime", "RatePerSecond", "WorkerVersioningId"}, - Fields: []string{"Identity", "LastAccessTime", "RatePerSecond"}, + Fields: []string{"Partition", "TaskQueueStatus.RatePerSecond", "TaskQueueStatus.BacklogCountHint", "TaskQueueStatus.ReadLevel", "TaskQueueStatus.AckLevel", "TaskQueueStatus.TaskIdBlock"}, } - var items []interface{} - for _, e := range resp.Pollers { - items = append(items, e) + err = output.PrintItems(c, statuses, opts) + if err != nil { + return err } - return output.PrintItems(c, items, opts) + + opts = &output.PrintOptions{ + Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "VersionCaps.BuildId", "VersionCaps.UseVersioning"}, + } + return output.PrintItems(c, pollers, opts) } // ListTaskQueuePartitions gets all the taskqueue partition and host information. From 2b54379c5cdb0e78e4755ba920082585f4cd7e1e Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 11 Dec 2023 15:29:53 -0800 Subject: [PATCH 2/3] comments + rename --- common/defs-flags.go | 2 +- taskqueue/task_queue.go | 1 + taskqueue/task_queue_commands.go | 11 ++++++----- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/common/defs-flags.go b/common/defs-flags.go index 5eeddef7..414c61a0 100644 --- a/common/defs-flags.go +++ b/common/defs-flags.go @@ -135,7 +135,7 @@ const ( // Task Queue flags FlagTaskQueueName = "Name of the Task Queue." FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]" - FlagPartitionsDefinition = "Query for all partitions up to this number" + FlagPartitionsDefinition = "Query for all partitions up to this number (experimental+temporary feature)" // Namespace update flags FlagActiveClusterDefinition = "Active cluster name." diff --git a/taskqueue/task_queue.go b/taskqueue/task_queue.go index d067cb4e..4db75d82 100644 --- a/taskqueue/task_queue.go +++ b/taskqueue/task_queue.go @@ -26,6 +26,7 @@ func NewTaskQueueCommands() []*cli.Command { Usage: common.FlagTaskQueueTypeDefinition, Category: common.CategoryMain, }, + // TOOD: remove this when the server does partition fan-out &cli.IntFlag{ Name: common.FlagPartitions, Value: 1, diff --git a/taskqueue/task_queue_commands.go b/taskqueue/task_queue_commands.go index fdf955ce..de723721 100644 --- a/taskqueue/task_queue_commands.go +++ b/taskqueue/task_queue_commands.go @@ -44,12 +44,13 @@ func DescribeTaskQueue(c *cli.Context) error { Partition int `json:"partition"` taskqueuepb.PollerInfo // copy this out to display nicer in table or card, but not json - VersionCaps *commonpb.WorkerVersionCapabilities `json:"-"` + Versioning *commonpb.WorkerVersionCapabilities `json:"-"` } var statuses []any var pollers []any + // TOOD: remove this when the server does partition fan-out for p := 0; p < partitions; p++ { resp, err := frontendClient.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{ Namespace: namespace, @@ -70,9 +71,9 @@ func DescribeTaskQueue(c *cli.Context) error { }) for _, pi := range resp.Pollers { pollers = append(pollers, &pollerWithPartition{ - Partition: p, - PollerInfo: *pi, - VersionCaps: pi.WorkerVersionCapabilities, + Partition: p, + PollerInfo: *pi, + Versioning: pi.WorkerVersionCapabilities, }) } } @@ -99,7 +100,7 @@ func DescribeTaskQueue(c *cli.Context) error { } opts = &output.PrintOptions{ - Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "VersionCaps.BuildId", "VersionCaps.UseVersioning"}, + Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "Versioning.BuildId", "Versioning.UseVersioning"}, } return output.PrintItems(c, pollers, opts) } From c4c0c250ffe633f546875f5add4da52c83a2af35 Mon Sep 17 00:00:00 2001 From: David Reiss Date: Mon, 11 Dec 2023 21:43:42 -0800 Subject: [PATCH 3/3] unit test --- app/app_test.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/app/app_test.go b/app/app_test.go index 026bbec3..fb7bbc6e 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -13,7 +13,6 @@ import ( "github.com/golang/mock/gomock" "github.com/pborman/uuid" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/temporalio/cli/app" "github.com/temporalio/cli/client" @@ -100,7 +99,6 @@ func (s *cliAppSuite) SetupTest() { } func (s *cliAppSuite) TearDownTest() { - s.mockCtrl.Finish() // assert mock’s expectations } func (s *cliAppSuite) TestTopLevelCommands() { @@ -119,6 +117,19 @@ var describeTaskQueueResponse = &workflowservice.DescribeTaskQueueResponse{ { LastAccessTime: timestamp.TimePtr(time.Now().UTC()), Identity: "tester", + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: "some-build-id", + UseVersioning: false, + }, + }, + }, + TaskQueueStatus: &taskqueuepb.TaskQueueStatus{ + BacklogCountHint: 0, + ReadLevel: 100000, + AckLevel: 100000, + TaskIdBlock: &taskqueuepb.TaskIdBlock{ + StartId: 100001, + EndId: 200000, }, }, } @@ -176,17 +187,15 @@ func (s *cliAppSuite) TestAcceptStringSliceArgsWithCommas() { } func (s *cliAppSuite) TestDescribeTaskQueue() { - s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once() + s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil) err := s.app.Run([]string{"", "task-queue", "describe", "--task-queue", "test-taskQueue", "--namespace", cliTestNamespace}) s.Nil(err) - s.sdkClient.AssertExpectations(s.T()) } func (s *cliAppSuite) TestDescribeTaskQueue_Activity() { - s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once() + s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil) err := s.app.Run([]string{"", "task-queue", "describe", "--namespace", cliTestNamespace, "--task-queue", "test-taskQueue", "--task-queue-type", "activity"}) s.Nil(err) - s.sdkClient.AssertExpectations(s.T()) } // TestFlagCategory_IsSet verifies that command flags have Category set