Skip to content

Commit

Permalink
Allow draining & undraining of workers (#84)
Browse files Browse the repository at this point in the history
  • Loading branch information
jpJuni0r authored Sep 2, 2022
1 parent 8582064 commit 08c6652
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 0 deletions.
21 changes: 21 additions & 0 deletions internal/cmd/workerpools/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,27 @@ func Command() *cli.Command {
Action: (&listWorkersCommand{}).listWorkers,
Before: authenticated.Ensure,
},
{
Name: "drain",
Usage: "Drains a worker.",
Flags: []cli.Flag{
flagWorkerID,
flagPoolIDNamed,
flagWaitUntilDrained,
},
Action: (&drainWorkerCommand{}).drainWorker,
Before: authenticated.Ensure,
},
{
Name: "undrain",
Usage: "Undrains a worker.",
Flags: []cli.Flag{
flagWorkerID,
flagPoolIDNamed,
},
Action: (&undrainWorkerCommand{}).undrainWorker,
Before: authenticated.Ensure,
},
},
},
},
Expand Down
12 changes: 12 additions & 0 deletions internal/cmd/workerpools/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,15 @@ var flagPoolIDNamed = &cli.StringFlag{
Usage: "[Required] ID of the worker pool",
Required: true,
}

var flagWorkerID = &cli.StringFlag{
Name: "id",
Usage: "[Required] ID of the worker",
Required: true,
}

var flagWaitUntilDrained = &cli.BoolFlag{
Name: "wait-until-drained",
Usage: "Wait until the worker is drained",
Required: false,
}
129 changes: 129 additions & 0 deletions internal/cmd/workerpools/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/shurcooL/graphql"
"github.com/spacelift-io/spacectl/internal/cmd"
"github.com/spacelift-io/spacectl/internal/cmd/authenticated"
"github.com/urfave/cli/v2"
"log"
"time"
)

const (
drainWorkerPollInterval = time.Second * 2
)

type worker struct {
Expand All @@ -22,8 +29,18 @@ type listWorkersQuery struct {
} `graphql:"workerPool(id: $workerPool)"`
}

type drainWorkerMutation struct {
Worker struct {
ID string `graphql:"id" json:"id"`
} `graphql:"workerDrainSet(workerPool: $workerPool, id: $worker, drain: $drain)"`
}

type listWorkersCommand struct{}

type drainWorkerCommand struct{}

type undrainWorkerCommand struct{}

func (c *listWorkersCommand) listWorkers(cliCtx *cli.Context) error {
outputFormat, err := cmd.GetOutputFormat(cliCtx)

Expand Down Expand Up @@ -88,3 +105,115 @@ func (c *listWorkersCommand) showOutputsTable(workers []worker) error {
}
return cmd.OutputTable(tableData, true)
}

func (c *drainWorkerCommand) drainWorker(cliCtx *cli.Context) error {
workerID := cliCtx.String(flagWorkerID.Name)
workerPoolID := cliCtx.String(flagPoolIDNamed.Name)
waitUntilDrained := cliCtx.Bool(flagWaitUntilDrained.Name)

var mutation drainWorkerMutation
variables := map[string]interface{}{
"worker": graphql.ID(workerID),
"workerPool": graphql.ID(workerPoolID),
"drain": graphql.Boolean(true),
}

if err := authenticated.Client.Mutate(cliCtx.Context, &mutation, variables); err != nil {
return err
}

if waitUntilDrained {
if err := c.waitUntilDrained(cliCtx, workerID, workerPoolID); err != nil {
return err
}
}

log.Printf("Successfully drained worker %s", workerID)

return nil
}

func (c *drainWorkerCommand) waitUntilDrained(cliCtx *cli.Context, workerID string, workerPoolID string) error {
var workerDrainedAndIdle = false
var firstRun = true

for !workerDrainedAndIdle {
if !firstRun {
time.Sleep(drainWorkerPollInterval)
}

var err error
workerDrainedAndIdle, err = c.drainedWorkerIsIdle(cliCtx, workerID, workerPoolID)

if err != nil {
return err
}

firstRun = false
}

return nil
}

func (c *drainWorkerCommand) drainedWorkerIsIdle(cliCtx *cli.Context, workerID string, workerPoolID string) (bool, error) {
type worker struct {
ID string `graphql:"id"`
Drained bool `graphql:"drained"`
Busy bool `graphql:"busy"`
}

var query struct {
WorkerPool struct {
Workers []worker `graphql:"workers"`
} `graphql:"workerPool(id: $workerPool)"`
}

variables := map[string]interface{}{
"workerPool": graphql.ID(workerPoolID),
}

if err := authenticated.Client.Query(cliCtx.Context, &query, variables); err != nil {
return false, err
}

var workerToDrain *worker

for _, w := range query.WorkerPool.Workers {
if w.ID == workerID {
workerToDrain = &w
break
}
}

if workerToDrain == nil {
return false, errors.New("worker to drain doesn't exist anymore")
}

if !workerToDrain.Drained {
return false, errors.New("worker is no longer flagged as drained")
}

return !workerToDrain.Busy, nil
}

func (c *undrainWorkerCommand) undrainWorker(cliCtx *cli.Context) error {
workerID := cliCtx.String(flagWorkerID.Name)
workerPoolID := cliCtx.String(flagPoolIDNamed.Name)

var mutation drainWorkerMutation
variables := map[string]interface{}{
"worker": graphql.ID(workerID),
"workerPool": graphql.ID(workerPoolID),
"drain": graphql.Boolean(false),
}

err := authenticated.Client.Mutate(cliCtx.Context, &mutation, variables)

if err != nil {
return err
}

log.Printf("Successfully undrained worker %s", workerID)

return nil
}

0 comments on commit 08c6652

Please sign in to comment.