Skip to content
This repository has been archived by the owner on May 31, 2024. It is now read-only.

Added command delete along with subcommand executions for terminating #34

Merged
merged 3 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions cmd/delete/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package delete

import (
cmdcore "github.com/lyft/flytectl/cmd/core"

"github.com/spf13/cobra"
)

// Long descriptions are whitespace sensitive when generating docs using sphinx.
const (
deleteCmdShort = `Used for terminating/deleting various flyte resources including tasks/workflows/launchplans/executions/project.`
deleteCmdLong = `
Example Delete executions.
::

bin/flytectl delete execution kxd1i72850 -d development -p flytesnacks
`
)

// RemoteDeleteCommand will return delete command
func RemoteDeleteCommand() *cobra.Command {
deleteCmd := &cobra.Command{
Use: "delete",
Short: deleteCmdShort,
Long: deleteCmdLong,
}
terminateResourcesFuncs := map[string]cmdcore.CommandEntry{
"execution": {CmdFunc: terminateExecutionFunc, Aliases: []string{"executions"}, Short: execCmdShort, Long: execCmdLong},
}
cmdcore.AddCommands(deleteCmd, terminateResourcesFuncs)
return deleteCmd
}
25 changes: 25 additions & 0 deletions cmd/delete/delete_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package delete

import (
"sort"
"testing"

"github.com/stretchr/testify/assert"
)

func TestDeleteCommand(t *testing.T) {
deleteCommand := RemoteDeleteCommand()
assert.Equal(t, deleteCommand.Use, "delete")
assert.Equal(t, deleteCommand.Short, deleteCmdShort)
assert.Equal(t, deleteCommand.Long, deleteCmdLong)
assert.Equal(t, len(deleteCommand.Commands()), 1)
cmdNouns := deleteCommand.Commands()
// Sort by Use value.
sort.Slice(cmdNouns, func(i, j int) bool {
return cmdNouns[i].Use < cmdNouns[j].Use
})
assert.Equal(t, cmdNouns[0].Use, "execution")
assert.Equal(t, cmdNouns[0].Aliases, []string{"executions"})
assert.Equal(t, cmdNouns[0].Short, execCmdShort)
assert.Equal(t, cmdNouns[0].Long, execCmdLong)
}
81 changes: 81 additions & 0 deletions cmd/delete/execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package delete

import (
"context"

"github.com/lyft/flytectl/cmd/config"
cmdCore "github.com/lyft/flytectl/cmd/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
)

// Long descriptions are whitespace sensitive when generating docs using sphinx.
const (
execCmdShort = `Terminate/Delete execution resources.`
execCmdLong = `
Terminate executions.(execution,executions can be used interchangeably in these commands)

Task executions can be aborted only if they are in non-terminal state i.e if they are FAILED,ABORTED or SUCCEEDED then
calling terminate on them has no effect.

Terminate a single execution with its name

::

bin/flytectl delete execution c6a51x2l9e -d development -p flytesnacks

You can get executions to check its state.

::

bin/flytectl get execution -d development -p flytesnacks
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------
| NAME (7) | WORKFLOW NAME | TYPE | PHASE | STARTED | ELAPSED TIME |
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------
| c6a51x2l9e | recipes.core.basic.lp.go_greet | WORKFLOW | ABORTED | 2021-02-17T08:13:04.680476300Z | 15.540361300s |
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------

Terminate multiple executions with there names
::

bin/flytectl delete execution eeam9s8sny p4wv4hwgc4 -d development -p flytesnacks

Similarly you can get executions to find the state of previously terminated executions.

::

bin/flytectl get execution -d development -p flytesnacks
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------
| NAME (7) | WORKFLOW NAME | TYPE | PHASE | STARTED | ELAPSED TIME |
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------
| c6a51x2l9e | recipes.core.basic.lp.go_greet | WORKFLOW | ABORTED | 2021-02-17T08:13:04.680476300Z | 15.540361300s |
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------
| eeam9s8sny | recipes.core.basic.lp.go_greet | WORKFLOW | ABORTED | 2021-02-17T08:14:04.803084100Z | 42.306385500s |
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------
| p4wv4hwgc4 | recipes.core.basic.lp.go_greet | WORKFLOW | ABORTED | 2021-02-17T08:14:27.476307400Z | 19.727504400s |
------------ ------------------------------------------------------------------------- ---------- ----------- -------------------------------- ---------------

Usage
`
)

func terminateExecutionFunc(ctx context.Context, args []string, cmdCtx cmdCore.CommandContext) error {
for i := 0; i < len(args); i++ {
name := args[i]
logger.Infof(ctx, "Terminating execution of %v execution ", name)
_, err := cmdCtx.AdminClient().TerminateExecution(ctx, &admin.ExecutionTerminateRequest{
Id: &core.WorkflowExecutionIdentifier{
Project: config.GetConfig().Project,
Domain: config.GetConfig().Domain,
Name: name,
},
})
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also have continue on Error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this option too

In case of register files where we have this continueOnerror option where user can

cat1 ) give list of files from a folder using wildcard/archive and continue the registration on failures , this option seems very much required since without it the only way he has is to remove files in the folder /archive or have some neat way of filtering the bad files or wait until we have an option to accept the list of files from piped o/p

cat2) Another way he can choose to pass the files is by explicitly passing each files absolute path . In this case , the option to remove is much easier since we cant expect him to pass in a long list of files manually .

The execution failures as of now are similar to cat2) where he specifies the list of execution id's and hence removing the failed and reattempting them is easier.

But definitely it would be good to have this option and will take it up in next MR.

logger.Errorf(ctx, "Failed in terminating execution of %v execution due to %v ", name, err)
return err
}
logger.Infof(ctx, "Terminated execution of %v execution ", name)
}
return nil
}
82 changes: 82 additions & 0 deletions cmd/delete/execution_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package delete

import (
"context"
"errors"
"io"
"testing"

cmdCore "github.com/lyft/flytectl/cmd/core"
"github.com/lyft/flyteidl/clients/go/admin/mocks"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"

"github.com/stretchr/testify/assert"
)

var (
ctx context.Context
args []string
)

func setup() {
ctx = context.Background()
args = []string{}
}

func TestTerminateExecutionFunc(t *testing.T) {
setup()
args = append(args, "exec1", "exec2")
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
terminateExecRequests := []*admin.ExecutionTerminateRequest{
{Id: &core.WorkflowExecutionIdentifier{Name: "exec1"}},
{Id: &core.WorkflowExecutionIdentifier{Name: "exec2"}},
}
terminateExecResponse := &admin.ExecutionTerminateResponse{}
mockClient.OnTerminateExecutionMatch(ctx, terminateExecRequests[0]).Return(terminateExecResponse, nil)
mockClient.OnTerminateExecutionMatch(ctx, terminateExecRequests[1]).Return(terminateExecResponse, nil)
err := terminateExecutionFunc(ctx, args, cmdCtx)
assert.Nil(t, err)
mockClient.AssertCalled(t, "TerminateExecution", ctx, terminateExecRequests[0])
mockClient.AssertCalled(t, "TerminateExecution", ctx, terminateExecRequests[1])
}

func TestTerminateExecutionFuncWithError(t *testing.T) {
setup()
args = append(args, "exec1", "exec2")
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
terminateExecRequests := []*admin.ExecutionTerminateRequest{
{Id: &core.WorkflowExecutionIdentifier{Name: "exec1"}},
{Id: &core.WorkflowExecutionIdentifier{Name: "exec2"}},
}
terminateExecResponse := &admin.ExecutionTerminateResponse{}
mockClient.OnTerminateExecutionMatch(ctx, terminateExecRequests[0]).Return(nil, errors.New("failed to terminate"))
mockClient.OnTerminateExecutionMatch(ctx, terminateExecRequests[1]).Return(terminateExecResponse, nil)
err := terminateExecutionFunc(ctx, args, cmdCtx)
assert.Equal(t, errors.New("failed to terminate"), err)
mockClient.AssertCalled(t, "TerminateExecution", ctx, terminateExecRequests[0])
mockClient.AssertNotCalled(t, "TerminateExecution", ctx, terminateExecRequests[1])
}

func TestTerminateExecutionFuncWithPartialSuccess(t *testing.T) {
setup()
args = append(args, "exec1", "exec2")
mockClient := new(mocks.AdminServiceClient)
mockOutStream := new(io.Writer)
cmdCtx := cmdCore.NewCommandContext(mockClient, *mockOutStream)
terminateExecRequests := []*admin.ExecutionTerminateRequest{
{Id: &core.WorkflowExecutionIdentifier{Name: "exec1"}},
{Id: &core.WorkflowExecutionIdentifier{Name: "exec2"}},
}
terminateExecResponse := &admin.ExecutionTerminateResponse{}
mockClient.OnTerminateExecutionMatch(ctx, terminateExecRequests[0]).Return(terminateExecResponse, nil)
mockClient.OnTerminateExecutionMatch(ctx, terminateExecRequests[1]).Return(nil, errors.New("failed to terminate"))
err := terminateExecutionFunc(ctx, args, cmdCtx)
assert.Equal(t, errors.New("failed to terminate"), err)
mockClient.AssertCalled(t, "TerminateExecution", ctx, terminateExecRequests[0])
mockClient.AssertCalled(t, "TerminateExecution", ctx, terminateExecRequests[1])
}
7 changes: 5 additions & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import (
"context"
"fmt"

"github.com/lyft/flytectl/cmd/config"
"github.com/lyft/flytectl/cmd/delete"
"github.com/lyft/flytectl/cmd/get"
"github.com/lyft/flytectl/cmd/register"
"github.com/lyft/flytectl/cmd/update"
"github.com/lyft/flytectl/pkg/printer"
stdConfig "github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/config/viper"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/cobra/doc"

"github.com/lyft/flytectl/cmd/config"
)

var (
Expand All @@ -28,6 +29,7 @@ func newRootCmd() *cobra.Command {
Long: "flytectl is CLI tool written in go to interact with flyteadmin service",
Short: "flyetcl CLI tool",
Use: "flytectl",
DisableAutoGenTag: true,
}

rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "",
Expand All @@ -45,6 +47,7 @@ func newRootCmd() *cobra.Command {
rootCmd.AddCommand(get.CreateGetCommand())
rootCmd.AddCommand(update.CreateUpdateCommand())
rootCmd.AddCommand(register.RemoteRegisterCommand())
rootCmd.AddCommand(delete.RemoteDeleteCommand())
config.GetConfig()

return rootCmd
Expand Down
6 changes: 2 additions & 4 deletions cmd/update/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (

// Long descriptions are whitespace sensitive when generating docs using sphinx.
const (
updateUse = "update"
updateShort = `
Used for updating flyte resources eg: project.
`
updateUse = "update"
updateShort = `Used for updating flyte resources eg: project.`
updatecmdLong = `
Currently this command only provides subcommands to update project.
Takes input project which need to be archived or unarchived. Name of the project to be updated is mandatory field.
Expand Down
7 changes: 5 additions & 2 deletions cmd/update/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (

func TestUpdateCommand(t *testing.T) {
updateCommand := CreateUpdateCommand()
assert.Equal(t, updateCommand.Use, "update")
assert.Equal(t, updateCommand.Short, "\nUsed for updating flyte resources eg: project.\n")
assert.Equal(t, updateCommand.Use, updateUse)
assert.Equal(t, updateCommand.Short, updateShort)
assert.Equal(t, updateCommand.Long, updatecmdLong)
assert.Equal(t, len(updateCommand.Commands()), 1)
cmdNouns := updateCommand.Commands()
// Sort by Use value.
Expand All @@ -19,4 +20,6 @@ func TestUpdateCommand(t *testing.T) {
})
assert.Equal(t, cmdNouns[0].Use, "project")
assert.Equal(t, cmdNouns[0].Aliases, []string{"projects"})
assert.Equal(t, cmdNouns[0].Short, projectShort)
assert.Equal(t, cmdNouns[0].Long, projectLong)
}
6 changes: 2 additions & 4 deletions docs/source/gen/flytectl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ SEE ALSO
~~~~~~~~

* :doc:`flytectl_config` - Runs various config commands, look at the help of this command to get a list of available commands..
* :doc:`flytectl_delete` - Used for terminating/deleting various flyte resources including tasks/workflows/launchplans/executions/project.
* :doc:`flytectl_get` - Used for fetching various flyte resources including tasks/workflows/launchplans/executions/project.
* :doc:`flytectl_register` - Registers tasks/workflows/launchplans from list of generated serialized files.
* :doc:`flytectl_update` -
Used for updating flyte resources eg: project.

* :doc:`flytectl_update` - Used for updating flyte resources eg: project.
* :doc:`flytectl_version` - Displays version information for the client and server.

*Auto generated by spf13/cobra on 16-Feb-2021*
1 change: 0 additions & 1 deletion docs/source/gen/flytectl_config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,3 @@ SEE ALSO
* :doc:`flytectl_config_discover` - Searches for a config in one of the default search paths.
* :doc:`flytectl_config_validate` - Validates the loaded config.

*Auto generated by spf13/cobra on 16-Feb-2021*
1 change: 0 additions & 1 deletion docs/source/gen/flytectl_config_discover.rst
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,3 @@ SEE ALSO

* :doc:`flytectl_config` - Runs various config commands, look at the help of this command to get a list of available commands..

*Auto generated by spf13/cobra on 16-Feb-2021*
1 change: 0 additions & 1 deletion docs/source/gen/flytectl_config_validate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,3 @@ SEE ALSO

* :doc:`flytectl_config` - Runs various config commands, look at the help of this command to get a list of available commands..

*Auto generated by spf13/cobra on 16-Feb-2021*
75 changes: 75 additions & 0 deletions docs/source/gen/flytectl_delete.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
.. _flytectl_delete:

flytectl delete
---------------

Used for terminating/deleting various flyte resources including tasks/workflows/launchplans/executions/project.

Synopsis
~~~~~~~~



Example Delete executions.
::

bin/flytectl delete execution kxd1i72850 -d development -p flytesnacks


Options
~~~~~~~

::

-h, --help help for delete

Options inherited from parent commands
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

::

--admin.authorizationHeader string Custom metadata header to pass JWT
--admin.authorizationServerUrl string This is the URL to your IDP's authorization server'
--admin.clientId string Client ID
--admin.clientSecretLocation string File containing the client secret
--admin.endpoint string For admin types, specify where the uri of the service is located.
--admin.insecure Use insecure connection.
--admin.maxBackoffDelay string Max delay for grpc backoff (default "8s")
--admin.maxRetries int Max number of gRPC retries (default 4)
--admin.perRetryTimeout string gRPC per retry timeout (default "15s")
--admin.scopes strings List of scopes to request
--admin.tokenUrl string Your IDPs token endpoint
--admin.useAuth Whether or not to try to authenticate with options below
--adminutils.batchSize int Maximum number of records to retrieve per call. (default 100)
--adminutils.maxRecords int Maximum number of records to retrieve. (default 500)
--config string config file (default is $HOME/config.yaml)
-d, --domain string Specifies the Flyte project's domain.
--logger.formatter.type string Sets logging format type. (default "json")
--logger.level int Sets the minimum logging level. (default 4)
--logger.mute Mutes all logs regardless of severity. Intended for benchmarks/tests only.
--logger.show-source Includes source code location in logs.
-o, --output string Specifies the output type - supported formats [TABLE JSON YAML] (default "TABLE")
-p, --project string Specifies the Flyte project.
--root.domain string Specified the domain to work on.
--root.output string Specified the output type.
--root.project string Specifies the project to work on.
--storage.cache.max_size_mbs int Maximum size of the cache where the Blob store data is cached in-memory. If not specified or set to 0, cache is not used
--storage.cache.target_gc_percent int Sets the garbage collection target percentage.
--storage.connection.access-key string Access key to use. Only required when authtype is set to accesskey.
--storage.connection.auth-type string Auth Type to use [iam, accesskey]. (default "iam")
--storage.connection.disable-ssl Disables SSL connection. Should only be used for development.
--storage.connection.endpoint string URL for storage client to connect to.
--storage.connection.region string Region to connect to. (default "us-east-1")
--storage.connection.secret-key string Secret to use when accesskey is set.
--storage.container string Initial container to create -if it doesn't exist-.'
--storage.defaultHttpClient.timeout string Sets time out on the http client. (default "0s")
--storage.enable-multicontainer If this is true, then the container argument is overlooked and redundant. This config will automatically open new connections to new containers/buckets as they are encountered
--storage.limits.maxDownloadMBs int Maximum allowed download size (in MBs) per call. (default 2)
--storage.type string Sets the type of storage to configure [s3/minio/local/mem/stow]. (default "s3")

SEE ALSO
~~~~~~~~

* :doc:`flytectl` - flyetcl CLI tool
* :doc:`flytectl_delete_execution` - Terminate/Delete execution resources.

Loading