Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance task-queue describe #399

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/defs-flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ const (
// Task Queue flags
FlagTaskQueueName = "Name of the Task Queue."
FlagTaskQueueTypeDefinition = "Task Queue type [workflow|activity]"
FlagPartitionsDefinition = "Query for all partitions up to this number"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this may be removed in the future when implemented server-side, can we mark this flag as experimental and add a note?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

definitely


// Namespace update flags
FlagActiveClusterDefinition = "Active cluster name."
Expand Down
1 change: 1 addition & 0 deletions common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ var (
FlagOverlapPolicy = "overlap-policy"
FlagOwnerEmail = "email"
FlagParallelism = "input-parallelism"
FlagPartitions = "partitions"
FlagPause = "pause"
FlagPauseOnFailure = "pause-on-failure"
FlagPort = "port"
Expand Down
6 changes: 6 additions & 0 deletions taskqueue/task_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func NewTaskQueueCommands() []*cli.Command {
Usage: common.FlagTaskQueueTypeDefinition,
Category: common.CategoryMain,
},
&cli.IntFlag{
Name: common.FlagPartitions,
Value: 1,
Usage: common.FlagPartitionsDefinition,
Category: common.CategoryMain,
},
}, common.FlagsForFormatting...),
Action: func(c *cli.Context) error {
return DescribeTaskQueue(c)
Expand Down
84 changes: 73 additions & 11 deletions taskqueue/task_queue_commands.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package taskqueue

import (
"encoding/json"
"fmt"
"strings"

Expand All @@ -9,37 +10,98 @@ import (
"github.com/temporalio/tctl-kit/pkg/color"
"github.com/temporalio/tctl-kit/pkg/output"
"github.com/urfave/cli/v2"
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/server/common/tqname"
)

// DescribeTaskQueue show pollers info of a given taskqueue
func DescribeTaskQueue(c *cli.Context) error {
sdkClient, err := client.GetSDKClient(c)
taskQueue := c.String(common.FlagTaskQueue)
tqName, err := tqname.FromBaseName(taskQueue)
if err != nil {
return err
}
taskQueue := c.String(common.FlagTaskQueue)
taskQueueType := strToTaskQueueType(c.String(common.FlagTaskQueueType))
partitions := c.Int(common.FlagPartitions)

ctx, cancel := common.NewContext(c)
defer cancel()
resp, err := sdkClient.DescribeTaskQueue(ctx, taskQueue, taskQueueType)

frontendClient := client.Factory(c.App).FrontendClient(c)
namespace, err := common.RequiredFlag(c, common.FlagNamespace)
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
return err
}

type statusWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.TaskQueueStatus
}
type pollerWithPartition struct {
Partition int `json:"partition"`
taskqueuepb.PollerInfo
// copy this out to display nicer in table or card, but not json
VersionCaps *commonpb.WorkerVersionCapabilities `json:"-"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still printed in the JSON view, just not flattened?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's in pollerinfo. but the table+card output functions can't handle doubly-nested fields so I had to pull it out. but json can of course (if I handle it separately)

}

var statuses []any
var pollers []any

for p := 0; p < partitions; p++ {
resp, err := frontendClient.DescribeTaskQueue(ctx, &workflowservice.DescribeTaskQueueRequest{
Namespace: namespace,
TaskQueue: &taskqueuepb.TaskQueue{
Name: tqName.WithPartition(p).FullName(),
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
},
TaskQueueType: taskQueueType,
IncludeTaskQueueStatus: true,
})
// note that even if it doesn't exist before this call, DescribeTaskQueue will return something
if err != nil {
return fmt.Errorf("unable to describe task queue: %w", err)
}
statuses = append(statuses, &statusWithPartition{
Partition: p,
TaskQueueStatus: *resp.TaskQueueStatus,
})
for _, pi := range resp.Pollers {
pollers = append(pollers, &pollerWithPartition{
Partition: p,
PollerInfo: *pi,
VersionCaps: pi.WorkerVersionCapabilities,
})
}
}

if output.OutputOption(c.String(output.FlagOutput)) == output.JSON {
// handle specially so we output a single object instead of two
b, err := json.MarshalIndent(map[string]any{
"taskQueues": statuses,
"pollers": pollers,
}, "", " ")
if err != nil {
return err
}
_, err = fmt.Println(string(b))
return err
}

opts := &output.PrintOptions{
// TODO enable when versioning feature is out
// Fields: []string{"Identity", "LastAccessTime", "RatePerSecond", "WorkerVersioningId"},
Fields: []string{"Identity", "LastAccessTime", "RatePerSecond"},
Fields: []string{"Partition", "TaskQueueStatus.RatePerSecond", "TaskQueueStatus.BacklogCountHint", "TaskQueueStatus.ReadLevel", "TaskQueueStatus.AckLevel", "TaskQueueStatus.TaskIdBlock"},
}
var items []interface{}
for _, e := range resp.Pollers {
items = append(items, e)
err = output.PrintItems(c, statuses, opts)
if err != nil {
return err
}
return output.PrintItems(c, items, opts)

opts = &output.PrintOptions{
Fields: []string{"Partition", "PollerInfo.Identity", "PollerInfo.LastAccessTime", "PollerInfo.RatePerSecond", "VersionCaps.BuildId", "VersionCaps.UseVersioning"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we spell Capabilities out or use a different field name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use WorkerVersionCapabilities but that would shadow the field in PollerInfo. How about Versioning?

note that the first part of the field name is not displayed in table view, so it's mostly not visible. it is in card view or as the input to --fields

}
return output.PrintItems(c, pollers, opts)
}

// ListTaskQueuePartitions gets all the taskqueue partition and host information.
Expand Down
Loading