Skip to content

Commit

Permalink
Enhance task-queue describe (#399)
Browse files Browse the repository at this point in the history
## What was changed
- `temporal task-queue describe` prints the `TaskQueueStatus` parts of
the `DescribeTaskQueue` response
- It now takes a `--partitions` flag to query for multiple partitions.

Partition fan-out is more properly done server-side, but that'll take
longer and in the meantime (and on older server versions) this is still
useful.

## Why?
More useful for debugging.

## Checklist

2. How was this tested:
Manually
  • Loading branch information
dnr authored Dec 12, 2023
1 parent 197f9a0 commit 54bb4b3
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 17 deletions.
21 changes: 15 additions & 6 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/golang/mock/gomock"
"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/temporalio/cli/app"
"github.com/temporalio/cli/client"
Expand Down Expand Up @@ -100,7 +99,6 @@ func (s *cliAppSuite) SetupTest() {
}

func (s *cliAppSuite) TearDownTest() {
s.mockCtrl.Finish() // assert mock’s expectations
}

func (s *cliAppSuite) TestTopLevelCommands() {
Expand All @@ -119,6 +117,19 @@ var describeTaskQueueResponse = &workflowservice.DescribeTaskQueueResponse{
{
LastAccessTime: timestamp.TimePtr(time.Now().UTC()),
Identity: "tester",
WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{
BuildId: "some-build-id",
UseVersioning: false,
},
},
},
TaskQueueStatus: &taskqueuepb.TaskQueueStatus{
BacklogCountHint: 0,
ReadLevel: 100000,
AckLevel: 100000,
TaskIdBlock: &taskqueuepb.TaskIdBlock{
StartId: 100001,
EndId: 200000,
},
},
}
Expand Down Expand Up @@ -176,17 +187,15 @@ func (s *cliAppSuite) TestAcceptStringSliceArgsWithCommas() {
}

func (s *cliAppSuite) TestDescribeTaskQueue() {
s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once()
s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil)
err := s.app.Run([]string{"", "task-queue", "describe", "--task-queue", "test-taskQueue", "--namespace", cliTestNamespace})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

func (s *cliAppSuite) TestDescribeTaskQueue_Activity() {
s.sdkClient.On("DescribeTaskQueue", mock.Anything, mock.Anything, mock.Anything).Return(describeTaskQueueResponse, nil).Once()
s.frontendClient.EXPECT().DescribeTaskQueue(gomock.Any(), gomock.Any()).Return(describeTaskQueueResponse, nil)
err := s.app.Run([]string{"", "task-queue", "describe", "--namespace", cliTestNamespace, "--task-queue", "test-taskQueue", "--task-queue-type", "activity"})
s.Nil(err)
s.sdkClient.AssertExpectations(s.T())
}

// TestFlagCategory_IsSet verifies that command flags have Category set
Expand Down
1 change: 1 addition & 0 deletions common/defs-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ const (
// Task Queue flags
FlagTaskQueueName = "Name of the Task Queue."
FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]"
FlagPartitionsDefinition = "Query for all partitions up to this number (experimental+temporary feature)"

// Namespace update flags
FlagActiveClusterDefinition = "Active cluster name."
Expand Down
1 change: 1 addition & 0 deletions common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
FlagOverlapPolicy = "overlap-policy"
FlagOwnerEmail = "email"
FlagParallelism = "input-parallelism"
FlagPartitions = "partitions"
FlagPause = "pause"
FlagPauseOnFailure = "pause-on-failure"
FlagPort = "port"
Expand Down
7 changes: 7 additions & 0 deletions taskqueue/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ func NewTaskQueueCommands() []*cli.Command {
Usage: common.FlagTaskQueueTypeDefinition,
Category: common.CategoryMain,
},
// TOOD: remove this when the server does partition fan-out
&cli.IntFlag{
Name: common.FlagPartitions,
Value: 1,
Usage: common.FlagPartitionsDefinition,
Category: common.CategoryMain,
},
}, common.FlagsForFormatting...),
Action: func(c *cli.Context) error {
return DescribeTaskQueue(c)
Expand Down
85 changes: 74 additions & 11 deletions taskqueue/task_queue_commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package taskqueue

import (
"encoding/json"
"fmt"
"strings"

Expand All @@ -9,37 +10,99 @@ import (
"github.com/temporalio/tctl-kit/pkg/color"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/urfave/cli/v2"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)

// DescribeTaskQueue show pollers info of a given taskqueue
func DescribeTaskQueue(c *cli.Context) error {
sdkClient, err := client.GetSDKClient(c)
taskQueue := c.String(common.FlagTaskQueue)
tqName, err := tqname.FromBaseName(taskQueue)
if err != nil {
return err
}
taskQueue := c.String(common.FlagTaskQueue)
taskQueueType := strToTaskQueueType(c.String(common.FlagTaskQueueType))
partitions := c.Int(common.FlagPartitions)

ctx, cancel := common.NewContext(c)
defer cancel()
resp, err := sdkClient.DescribeTaskQueue(ctx, taskQueue, taskQueueType)

frontendClient := client.Factory(c.App).FrontendClient(c)
namespace, err := common.RequiredFlag(c, common.FlagNamespace)
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
return err
}

type statusWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.PollerInfo
// copy this out to display nicer in table or card, but not json
Versioning *commonpb.WorkerVersionCapabilities `json:"-"`
}

var statuses []any
var pollers []any

// TOOD: remove this when the server does partition fan-out
for p := 0; p < partitions; p++ {
resp, err := frontendClient.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueuepb.TaskQueue{
Name: tqName.WithPartition(p).FullName(),
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
// note that even if it doesn't exist before this call, DescribeTaskQueue will return something
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
Versioning: pi.WorkerVersionCapabilities,
})
}
}

if output.OutputOption(c.String(output.FlagOutput)) == output.JSON {
// handle specially so we output a single object instead of two
b, err := json.MarshalIndent(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, "", " ")
if err != nil {
return err
}
_, err = fmt.Println(string(b))
return err
}

opts := &output.PrintOptions{
// TODO enable when versioning feature is out
// Fields: []string{"Identity", "LastAccessTime", "RatePerSecond", "WorkerVersioningId"},
Fields: []string{"Identity", "LastAccessTime", "RatePerSecond"},
Fields: []string{"Partition", "TaskQueueStatus.RatePerSecond", "TaskQueueStatus.BacklogCountHint", "TaskQueueStatus.ReadLevel", "TaskQueueStatus.AckLevel", "TaskQueueStatus.TaskIdBlock"},
}
var items []interface{}
for _, e := range resp.Pollers {
items = append(items, e)
err = output.PrintItems(c, statuses, opts)
if err != nil {
return err
}
return output.PrintItems(c, items, opts)

opts = &output.PrintOptions{
Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "Versioning.BuildId", "Versioning.UseVersioning"},
}
return output.PrintItems(c, pollers, opts)
}

// ListTaskQueuePartitions gets all the taskqueue partition and host information.
Expand Down

0 comments on commit 54bb4b3

Please sign in to comment.