Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more operator cluster commands #465

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ func NewTemporalOperatorClusterCommand(cctx *CommandContext, parent *TemporalOpe
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalOperatorClusterDescribeCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterHealthCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterListCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterRemoveCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterSystemCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterUpsertCommand(cctx, &s).Command)
return &s
}

Expand Down Expand Up @@ -359,6 +362,61 @@ func NewTemporalOperatorClusterHealthCommand(cctx *CommandContext, parent *Tempo
return &s
}

type TemporalOperatorClusterListCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
Limit int
}

func NewTemporalOperatorClusterListCommand(cctx *CommandContext, parent *TemporalOperatorClusterCommand) *TemporalOperatorClusterListCommand {
var s TemporalOperatorClusterListCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "list [flags]"
s.Command.Short = "List all clusters"
if hasHighlighting {
s.Command.Long = "\x1b[1mtemporal operator cluster list\x1b[0m command prints a list of all remote Clusters on the system."
} else {
s.Command.Long = "`temporal operator cluster list` command prints a list of all remote Clusters on the system."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Limit the number of items to print.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalOperatorClusterRemoveCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
Name string
}

func NewTemporalOperatorClusterRemoveCommand(cctx *CommandContext, parent *TemporalOperatorClusterCommand) *TemporalOperatorClusterRemoveCommand {
var s TemporalOperatorClusterRemoveCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "remove [flags]"
s.Command.Short = "Remove a cluster"
if hasHighlighting {
s.Command.Long = "\x1b[1mtemporal operator cluster remove\x1b[0m command removes a remote Cluster from the system."
} else {
s.Command.Long = "`temporal operator cluster remove` command removes a remote Cluster from the system."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.Name, "name", "", "Name of cluster.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalOperatorClusterSystemCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
Expand All @@ -384,6 +442,36 @@ func NewTemporalOperatorClusterSystemCommand(cctx *CommandContext, parent *Tempo
return &s
}

type TemporalOperatorClusterUpsertCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
FrontendAddress string
EnableConnection bool
}

func NewTemporalOperatorClusterUpsertCommand(cctx *CommandContext, parent *TemporalOperatorClusterCommand) *TemporalOperatorClusterUpsertCommand {
var s TemporalOperatorClusterUpsertCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "upsert [flags]"
s.Command.Short = "Add a remote"
if hasHighlighting {
s.Command.Long = "\x1b[1mtemporal operator cluster upsert\x1b[0m command allows the user to add or update a remote Cluster."
} else {
s.Command.Long = "`temporal operator cluster upsert` command allows the user to add or update a remote Cluster."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.FrontendAddress, "frontend-address", "", "IP address to bind the frontend service to.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "frontend-address")
s.Command.Flags().BoolVar(&s.EnableConnection, "enable-connection", false, "enable cross cluster connection.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalServerCommand struct {
Parent *TemporalCommand
Command cobra.Command
Expand Down
100 changes: 100 additions & 0 deletions temporalcli/commands.operator_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/fatih/color"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)
Expand Down Expand Up @@ -86,3 +87,102 @@ func (c *TemporalOperatorClusterDescribeCommand) run(cctx *CommandContext, args
Table: &printer.TableOptions{},
})
}

func (c *TemporalOperatorClusterListCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

var nextPageToken []byte
var execsProcessed int
for pageIndex := 0; ; pageIndex++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of those cases I wonder if we never expect a large list and therefore should just collect the list and dump all at once. Regardless, I added to #448 to revisit.

page, err := cl.OperatorService().ListClusters(cctx, &operatorservice.ListClustersRequest{
NextPageToken: nextPageToken,
})
if err != nil {
return fmt.Errorf("failed listing clusters: %w", err)
}
var textTable []map[string]any
for _, cluster := range page.GetClusters() {
if c.Limit > 0 && execsProcessed >= c.Limit {
break
}
execsProcessed++
// For JSON we are going to dump one line of JSON per execution
if cctx.JSONOutput {
_ = cctx.Printer.PrintStructured(cluster, printer.StructuredOptions{})
} else {
// For non-JSON, we are doing a table for each page
textTable = append(textTable, map[string]any{
"Name": cluster.ClusterName,
"ClusterId": cluster.ClusterId,
"Address": cluster.Address,
"HistoryShardCount": cluster.HistoryShardCount,
"InitialFailoverVersion": cluster.InitialFailoverVersion,
"IsConnectionEnabled": cluster.IsConnectionEnabled,
})
}
}
// Print table, headers only on first table
if len(textTable) > 0 {
_ = cctx.Printer.PrintStructured(textTable, printer.StructuredOptions{
Fields: []string{"Name", "ClusterId", "Address", "HistoryShardCount", "InitialFailoverVersion", "IsConnectionEnabled"},
Table: &printer.TableOptions{NoHeader: pageIndex > 0},
})
}
// Stop if next page token non-existing or executions reached limit
nextPageToken = page.GetNextPageToken()
if len(nextPageToken) == 0 || (c.Limit > 0 && execsProcessed >= c.Limit) {
return nil
}
}
}

func (c *TemporalOperatorClusterUpsertCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
_, err = cl.OperatorService().AddOrUpdateRemoteCluster(cctx, &operatorservice.AddOrUpdateRemoteClusterRequest{
FrontendAddress: c.FrontendAddress,
EnableRemoteClusterConnection: c.EnableConnection,
})
if err != nil {
return fmt.Errorf("unable to upsert cluster: %w", err)
}
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
struct {
FrontendAddress string `json:"frontendAddress"`
}{FrontendAddress: c.FrontendAddress},
printer.StructuredOptions{})
}
cctx.Printer.Println(color.GreenString(fmt.Sprintf("Upserted cluster %s", c.FrontendAddress)))
return nil
}

func (c *TemporalOperatorClusterRemoveCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
_, err = cl.OperatorService().RemoveRemoteCluster(cctx, &operatorservice.RemoveRemoteClusterRequest{
ClusterName: c.Name,
})
if err != nil {
return fmt.Errorf("failed removing cluster: %w", err)
}
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
struct {
ClusterName string `json:"clusterName"`
}{ClusterName: c.Name},
printer.StructuredOptions{})
}
cctx.Printer.Println(color.GreenString(fmt.Sprintf("Removed cluster %s", c.Name)))
return nil
}
104 changes: 104 additions & 0 deletions temporalcli/commands.operator_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package temporalcli_test

import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/temporalio/cli/temporalcli"
"github.com/temporalio/cli/temporalcli/devserver"
"go.temporal.io/api/workflowservice/v1"
)

Expand Down Expand Up @@ -79,3 +83,103 @@ func (s *SharedServerSuite) TestOperator_Cluster_Health() {
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(jsonOut["status"], "SERVING")
}

func (s *SharedServerSuite) TestOperator_Cluster_Operations() {
// Create some clusters
standbyCluster1 := StartDevServer(s.Suite.T(), DevServerOptions{
StartOptions: devserver.StartOptions{
MasterClusterName: "standby1",
CurrentClusterName: "standby1",
InitialFailoverVersion: 2,
EnableGlobalNamespace: true,
},
})
defer standbyCluster1.Stop()

standbyCluster2 := StartDevServer(s.Suite.T(), DevServerOptions{
StartOptions: devserver.StartOptions{
MasterClusterName: "standby2",
CurrentClusterName: "standby2",
InitialFailoverVersion: 3,
EnableGlobalNamespace: true,
},
})
defer standbyCluster2.Stop()

// Upsert the clusters

// Text
res := s.Execute(
"operator", "cluster", "upsert",
"--frontend-address", standbyCluster1.Address(),
"--address", s.Address(),
)
s.NoError(res.Err)
out := res.Stdout.String()
s.Equal("Upserted cluster "+standbyCluster1.Address()+"\n", out)

// JSON
res = s.Execute(
"operator", "cluster", "upsert",
"--frontend-address", standbyCluster2.Address(),
"--address", s.Address(),
"-o", "json",
)
s.NoError(res.Err)
var jsonOut map[string]string
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(jsonOut["frontendAddress"], standbyCluster2.Address())

// List the clusters

// Text
res = s.Execute(
"operator", "cluster", "list",
"--address", s.Address(),
)
s.NoError(res.Err)
out = res.Stdout.String()
s.ContainsOnSameLine(out, "Name", "ClusterId", "Address", "HistoryShardCount", "InitialFailoverVersion", "IsConnectionEnabled")
s.ContainsOnSameLine(out, "active", s.DevServer.Options.ClusterID, s.DevServer.Address(), "1", "1", "true")
s.ContainsOnSameLine(out, standbyCluster1.Options.CurrentClusterName, standbyCluster1.Options.ClusterID, standbyCluster1.Address(), "1", strconv.Itoa(standbyCluster1.Options.InitialFailoverVersion), "false")
s.ContainsOnSameLine(out, standbyCluster2.Options.CurrentClusterName, standbyCluster2.Options.ClusterID, standbyCluster2.Address(), "1", strconv.Itoa(standbyCluster2.Options.InitialFailoverVersion), "false")

// JSON
res = s.Execute(
"operator", "cluster", "list",
"--address", s.Address(),
"-o", "json",
)
s.NoError(res.Err)
out = res.Stdout.String()
// If we improve the output of list commands https://github.com/temporalio/cli/issues/448
// we can do more detailed checks here.
s.ContainsOnSameLine(out, fmt.Sprintf("\"clusterId\": \"%s\"", s.DevServer.Options.ClusterID))
s.ContainsOnSameLine(out, fmt.Sprintf("\"clusterId\": \"%s\"", standbyCluster1.Options.ClusterID))
s.ContainsOnSameLine(out, fmt.Sprintf("\"clusterId\": \"%s\"", standbyCluster2.Options.ClusterID))

// Need to wait for the cluster cache to be updated
time.Sleep(90 * time.Second)

// Remove the clusters

// Test
res = s.Execute(
"operator", "cluster", "remove",
"--name", standbyCluster1.Options.CurrentClusterName,
"--address", s.Address(),
)
s.NoError(res.Err)
s.Equal(fmt.Sprintf("Removed cluster %s\n", standbyCluster1.Options.CurrentClusterName), res.Stdout.String())

// JSON
res = s.Execute(
"operator", "cluster", "remove",
"--name", standbyCluster2.Options.CurrentClusterName,
"--address", s.Address(),
"-o", "json",
)
jsonOut = make(map[string]string)
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(jsonOut["clusterName"], standbyCluster2.Options.CurrentClusterName)
}
18 changes: 11 additions & 7 deletions temporalcli/commands.server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
// Prepare options
opts := devserver.StartOptions{
FrontendIP: t.Ip,
FrontendPort: t.Port,
Namespaces: append([]string{"default"}, t.Namespace...),
Logger: cctx.Logger,
DatabaseFile: t.DbFilename,
MetricsPort: t.MetricsPort,
FrontendHTTPPort: t.HttpPort,
FrontendIP: t.Ip,
FrontendPort: t.Port,
Namespaces: append([]string{"default"}, t.Namespace...),
Logger: cctx.Logger,
DatabaseFile: t.DbFilename,
MetricsPort: t.MetricsPort,
FrontendHTTPPort: t.HttpPort,
ClusterID: uuid.NewString(),
MasterClusterName: "active",
CurrentClusterName: "active",
InitialFailoverVersion: 1,
}
if t.LogLevelServer.Value == "never" {
opts.LogLevel = 100
Expand Down
20 changes: 19 additions & 1 deletion temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,12 @@ type SharedServerSuite struct {
}

func (s *SharedServerSuite) SetupSuite() {
s.DevServer = StartDevServer(s.Suite.T(), DevServerOptions{})
s.DevServer = StartDevServer(s.Suite.T(), DevServerOptions{
StartOptions: devserver.StartOptions{
// Enable for operator cluster commands
EnableGlobalNamespace: true,
},
})
// Stop server if we fail later
success := false
defer func() {
Expand Down Expand Up @@ -252,6 +257,19 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer {
if len(d.Options.Namespaces) == 0 {
d.Options.Namespaces = []string{"default"}
}
if d.Options.MasterClusterName == "" {
d.Options.MasterClusterName = "active"
}
if d.Options.CurrentClusterName == "" {
d.Options.CurrentClusterName = "active"
}
if d.Options.ClusterID == "" {
d.Options.ClusterID = uuid.New().String()
}
if d.Options.InitialFailoverVersion == 0 {
d.Options.InitialFailoverVersion = 1
}

if d.Options.Logger == nil {
w := &concurrentWriter{w: &d.logOutput, wLock: &d.logOutputLock}
d.Options.Logger = slog.New(slog.NewTextHandler(w, &slog.HandlerOptions{AddSource: true}))
Expand Down
Loading
Loading