Skip to content

Commit

Permalink
Add more operator cluster commands (#465)
Browse files Browse the repository at this point in the history
Add `operator cluster list`, `operator cluster upsert` and `operator
cluster remove`.

To write the tests I had to expose a bunch of cluster configs in the
Test server and make sure the test could actually read them to use and
verify the information.
  • Loading branch information
Quinn-With-Two-Ns authored Feb 20, 2024
1 parent f6758c8 commit 0454314
Show file tree
Hide file tree
Showing 7 changed files with 383 additions and 31 deletions.
88 changes: 88 additions & 0 deletions temporalcli/commands.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ func NewTemporalOperatorClusterCommand(cctx *CommandContext, parent *TemporalOpe
s.Command.Args = cobra.NoArgs
s.Command.AddCommand(&NewTemporalOperatorClusterDescribeCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterHealthCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterListCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterRemoveCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterSystemCommand(cctx, &s).Command)
s.Command.AddCommand(&NewTemporalOperatorClusterUpsertCommand(cctx, &s).Command)
return &s
}

Expand Down Expand Up @@ -359,6 +362,61 @@ func NewTemporalOperatorClusterHealthCommand(cctx *CommandContext, parent *Tempo
return &s
}

type TemporalOperatorClusterListCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
Limit int
}

func NewTemporalOperatorClusterListCommand(cctx *CommandContext, parent *TemporalOperatorClusterCommand) *TemporalOperatorClusterListCommand {
var s TemporalOperatorClusterListCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "list [flags]"
s.Command.Short = "List all clusters"
if hasHighlighting {
s.Command.Long = "\x1b[1mtemporal operator cluster list\x1b[0m command prints a list of all remote Clusters on the system."
} else {
s.Command.Long = "`temporal operator cluster list` command prints a list of all remote Clusters on the system."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().IntVar(&s.Limit, "limit", 0, "Limit the number of items to print.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalOperatorClusterRemoveCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
Name string
}

func NewTemporalOperatorClusterRemoveCommand(cctx *CommandContext, parent *TemporalOperatorClusterCommand) *TemporalOperatorClusterRemoveCommand {
var s TemporalOperatorClusterRemoveCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "remove [flags]"
s.Command.Short = "Remove a cluster"
if hasHighlighting {
s.Command.Long = "\x1b[1mtemporal operator cluster remove\x1b[0m command removes a remote Cluster from the system."
} else {
s.Command.Long = "`temporal operator cluster remove` command removes a remote Cluster from the system."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.Name, "name", "", "Name of cluster.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "name")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalOperatorClusterSystemCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
Expand All @@ -384,6 +442,36 @@ func NewTemporalOperatorClusterSystemCommand(cctx *CommandContext, parent *Tempo
return &s
}

type TemporalOperatorClusterUpsertCommand struct {
Parent *TemporalOperatorClusterCommand
Command cobra.Command
FrontendAddress string
EnableConnection bool
}

func NewTemporalOperatorClusterUpsertCommand(cctx *CommandContext, parent *TemporalOperatorClusterCommand) *TemporalOperatorClusterUpsertCommand {
var s TemporalOperatorClusterUpsertCommand
s.Parent = parent
s.Command.DisableFlagsInUseLine = true
s.Command.Use = "upsert [flags]"
s.Command.Short = "Add a remote"
if hasHighlighting {
s.Command.Long = "\x1b[1mtemporal operator cluster upsert\x1b[0m command allows the user to add or update a remote Cluster."
} else {
s.Command.Long = "`temporal operator cluster upsert` command allows the user to add or update a remote Cluster."
}
s.Command.Args = cobra.NoArgs
s.Command.Flags().StringVar(&s.FrontendAddress, "frontend-address", "", "IP address to bind the frontend service to.")
_ = cobra.MarkFlagRequired(s.Command.Flags(), "frontend-address")
s.Command.Flags().BoolVar(&s.EnableConnection, "enable-connection", false, "enable cross cluster connection.")
s.Command.Run = func(c *cobra.Command, args []string) {
if err := s.run(cctx, args); err != nil {
cctx.Options.Fail(err)
}
}
return &s
}

type TemporalServerCommand struct {
Parent *TemporalCommand
Command cobra.Command
Expand Down
100 changes: 100 additions & 0 deletions temporalcli/commands.operator_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/fatih/color"
"github.com/temporalio/cli/temporalcli/internal/printer"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
)
Expand Down Expand Up @@ -86,3 +87,102 @@ func (c *TemporalOperatorClusterDescribeCommand) run(cctx *CommandContext, args
Table: &printer.TableOptions{},
})
}

func (c *TemporalOperatorClusterListCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()

var nextPageToken []byte
var execsProcessed int
for pageIndex := 0; ; pageIndex++ {
page, err := cl.OperatorService().ListClusters(cctx, &operatorservice.ListClustersRequest{
NextPageToken: nextPageToken,
})
if err != nil {
return fmt.Errorf("failed listing clusters: %w", err)
}
var textTable []map[string]any
for _, cluster := range page.GetClusters() {
if c.Limit > 0 && execsProcessed >= c.Limit {
break
}
execsProcessed++
// For JSON we are going to dump one line of JSON per execution
if cctx.JSONOutput {
_ = cctx.Printer.PrintStructured(cluster, printer.StructuredOptions{})
} else {
// For non-JSON, we are doing a table for each page
textTable = append(textTable, map[string]any{
"Name": cluster.ClusterName,
"ClusterId": cluster.ClusterId,
"Address": cluster.Address,
"HistoryShardCount": cluster.HistoryShardCount,
"InitialFailoverVersion": cluster.InitialFailoverVersion,
"IsConnectionEnabled": cluster.IsConnectionEnabled,
})
}
}
// Print table, headers only on first table
if len(textTable) > 0 {
_ = cctx.Printer.PrintStructured(textTable, printer.StructuredOptions{
Fields: []string{"Name", "ClusterId", "Address", "HistoryShardCount", "InitialFailoverVersion", "IsConnectionEnabled"},
Table: &printer.TableOptions{NoHeader: pageIndex > 0},
})
}
// Stop if next page token non-existing or executions reached limit
nextPageToken = page.GetNextPageToken()
if len(nextPageToken) == 0 || (c.Limit > 0 && execsProcessed >= c.Limit) {
return nil
}
}
}

func (c *TemporalOperatorClusterUpsertCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
_, err = cl.OperatorService().AddOrUpdateRemoteCluster(cctx, &operatorservice.AddOrUpdateRemoteClusterRequest{
FrontendAddress: c.FrontendAddress,
EnableRemoteClusterConnection: c.EnableConnection,
})
if err != nil {
return fmt.Errorf("unable to upsert cluster: %w", err)
}
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
struct {
FrontendAddress string `json:"frontendAddress"`
}{FrontendAddress: c.FrontendAddress},
printer.StructuredOptions{})
}
cctx.Printer.Println(color.GreenString(fmt.Sprintf("Upserted cluster %s", c.FrontendAddress)))
return nil
}

func (c *TemporalOperatorClusterRemoveCommand) run(cctx *CommandContext, args []string) error {
cl, err := c.Parent.Parent.ClientOptions.dialClient(cctx)
if err != nil {
return err
}
defer cl.Close()
_, err = cl.OperatorService().RemoveRemoteCluster(cctx, &operatorservice.RemoveRemoteClusterRequest{
ClusterName: c.Name,
})
if err != nil {
return fmt.Errorf("failed removing cluster: %w", err)
}
if cctx.JSONOutput {
return cctx.Printer.PrintStructured(
struct {
ClusterName string `json:"clusterName"`
}{ClusterName: c.Name},
printer.StructuredOptions{})
}
cctx.Printer.Println(color.GreenString(fmt.Sprintf("Removed cluster %s", c.Name)))
return nil
}
104 changes: 104 additions & 0 deletions temporalcli/commands.operator_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ package temporalcli_test

import (
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/temporalio/cli/temporalcli"
"github.com/temporalio/cli/temporalcli/devserver"
"go.temporal.io/api/workflowservice/v1"
)

Expand Down Expand Up @@ -79,3 +83,103 @@ func (s *SharedServerSuite) TestOperator_Cluster_Health() {
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(jsonOut["status"], "SERVING")
}

func (s *SharedServerSuite) TestOperator_Cluster_Operations() {
// Create some clusters
standbyCluster1 := StartDevServer(s.Suite.T(), DevServerOptions{
StartOptions: devserver.StartOptions{
MasterClusterName: "standby1",
CurrentClusterName: "standby1",
InitialFailoverVersion: 2,
EnableGlobalNamespace: true,
},
})
defer standbyCluster1.Stop()

standbyCluster2 := StartDevServer(s.Suite.T(), DevServerOptions{
StartOptions: devserver.StartOptions{
MasterClusterName: "standby2",
CurrentClusterName: "standby2",
InitialFailoverVersion: 3,
EnableGlobalNamespace: true,
},
})
defer standbyCluster2.Stop()

// Upsert the clusters

// Text
res := s.Execute(
"operator", "cluster", "upsert",
"--frontend-address", standbyCluster1.Address(),
"--address", s.Address(),
)
s.NoError(res.Err)
out := res.Stdout.String()
s.Equal("Upserted cluster "+standbyCluster1.Address()+"\n", out)

// JSON
res = s.Execute(
"operator", "cluster", "upsert",
"--frontend-address", standbyCluster2.Address(),
"--address", s.Address(),
"-o", "json",
)
s.NoError(res.Err)
var jsonOut map[string]string
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(jsonOut["frontendAddress"], standbyCluster2.Address())

// List the clusters

// Text
res = s.Execute(
"operator", "cluster", "list",
"--address", s.Address(),
)
s.NoError(res.Err)
out = res.Stdout.String()
s.ContainsOnSameLine(out, "Name", "ClusterId", "Address", "HistoryShardCount", "InitialFailoverVersion", "IsConnectionEnabled")
s.ContainsOnSameLine(out, "active", s.DevServer.Options.ClusterID, s.DevServer.Address(), "1", "1", "true")
s.ContainsOnSameLine(out, standbyCluster1.Options.CurrentClusterName, standbyCluster1.Options.ClusterID, standbyCluster1.Address(), "1", strconv.Itoa(standbyCluster1.Options.InitialFailoverVersion), "false")
s.ContainsOnSameLine(out, standbyCluster2.Options.CurrentClusterName, standbyCluster2.Options.ClusterID, standbyCluster2.Address(), "1", strconv.Itoa(standbyCluster2.Options.InitialFailoverVersion), "false")

// JSON
res = s.Execute(
"operator", "cluster", "list",
"--address", s.Address(),
"-o", "json",
)
s.NoError(res.Err)
out = res.Stdout.String()
// If we improve the output of list commands https://github.com/temporalio/cli/issues/448
// we can do more detailed checks here.
s.ContainsOnSameLine(out, fmt.Sprintf("\"clusterId\": \"%s\"", s.DevServer.Options.ClusterID))
s.ContainsOnSameLine(out, fmt.Sprintf("\"clusterId\": \"%s\"", standbyCluster1.Options.ClusterID))
s.ContainsOnSameLine(out, fmt.Sprintf("\"clusterId\": \"%s\"", standbyCluster2.Options.ClusterID))

// Need to wait for the cluster cache to be updated
time.Sleep(90 * time.Second)

// Remove the clusters

// Test
res = s.Execute(
"operator", "cluster", "remove",
"--name", standbyCluster1.Options.CurrentClusterName,
"--address", s.Address(),
)
s.NoError(res.Err)
s.Equal(fmt.Sprintf("Removed cluster %s\n", standbyCluster1.Options.CurrentClusterName), res.Stdout.String())

// JSON
res = s.Execute(
"operator", "cluster", "remove",
"--name", standbyCluster2.Options.CurrentClusterName,
"--address", s.Address(),
"-o", "json",
)
jsonOut = make(map[string]string)
s.NoError(json.Unmarshal(res.Stdout.Bytes(), &jsonOut))
s.Equal(jsonOut["clusterName"], standbyCluster2.Options.CurrentClusterName)
}
18 changes: 11 additions & 7 deletions temporalcli/commands.server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ func (t *TemporalServerStartDevCommand) run(cctx *CommandContext, args []string)
}
// Prepare options
opts := devserver.StartOptions{
FrontendIP: t.Ip,
FrontendPort: t.Port,
Namespaces: append([]string{"default"}, t.Namespace...),
Logger: cctx.Logger,
DatabaseFile: t.DbFilename,
MetricsPort: t.MetricsPort,
FrontendHTTPPort: t.HttpPort,
FrontendIP: t.Ip,
FrontendPort: t.Port,
Namespaces: append([]string{"default"}, t.Namespace...),
Logger: cctx.Logger,
DatabaseFile: t.DbFilename,
MetricsPort: t.MetricsPort,
FrontendHTTPPort: t.HttpPort,
ClusterID: uuid.NewString(),
MasterClusterName: "active",
CurrentClusterName: "active",
InitialFailoverVersion: 1,
}
if t.LogLevelServer.Value == "never" {
opts.LogLevel = 100
Expand Down
20 changes: 19 additions & 1 deletion temporalcli/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,12 @@ type SharedServerSuite struct {
}

func (s *SharedServerSuite) SetupSuite() {
s.DevServer = StartDevServer(s.Suite.T(), DevServerOptions{})
s.DevServer = StartDevServer(s.Suite.T(), DevServerOptions{
StartOptions: devserver.StartOptions{
// Enable for operator cluster commands
EnableGlobalNamespace: true,
},
})
// Stop server if we fail later
success := false
defer func() {
Expand Down Expand Up @@ -253,6 +258,19 @@ func StartDevServer(t *testing.T, options DevServerOptions) *DevServer {
if len(d.Options.Namespaces) == 0 {
d.Options.Namespaces = []string{"default"}
}
if d.Options.MasterClusterName == "" {
d.Options.MasterClusterName = "active"
}
if d.Options.CurrentClusterName == "" {
d.Options.CurrentClusterName = "active"
}
if d.Options.ClusterID == "" {
d.Options.ClusterID = uuid.New().String()
}
if d.Options.InitialFailoverVersion == 0 {
d.Options.InitialFailoverVersion = 1
}

if d.Options.Logger == nil {
w := &concurrentWriter{w: &d.logOutput, wLock: &d.logOutputLock}
d.Options.Logger = slog.New(slog.NewTextHandler(w, &slog.HandlerOptions{AddSource: true}))
Expand Down
Loading

0 comments on commit 0454314

Please sign in to comment.