Skip to content

Commit

Permalink
Issue #1123 - Fix 'kubectl get' failure if resource namespace is diff…
Browse files Browse the repository at this point in the history
…erent from workflow namespace (#1171)
  • Loading branch information
alexmt authored Jan 25, 2019
1 parent eaaad7d commit 8eb4c66
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 23 deletions.
23 changes: 16 additions & 7 deletions cmd/argoexec/commands/resource.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package commands

import (
"fmt"
"os"

"github.com/argoproj/argo/workflow/common"
Expand Down Expand Up @@ -35,20 +36,28 @@ func execResource(action string) error {
wfExecutor.AddError(err)
return err
}
resourceName, err := wfExecutor.ExecResource(action, common.ExecutorResourceManifestPath)
if err != nil {
isDelete := action == "delete"
if isDelete && (wfExecutor.Template.Resource.SuccessCondition != "" || wfExecutor.Template.Resource.FailureCondition != "" || len(wfExecutor.Template.Outputs.Parameters) > 0) {
err = fmt.Errorf("successCondition, failureCondition and outputs are not supported for delete action")
wfExecutor.AddError(err)
return err
}
err = wfExecutor.WaitResource(resourceName)
resourceNamespace, resourceName, err := wfExecutor.ExecResource(action, common.ExecutorResourceManifestPath, isDelete)
if err != nil {
wfExecutor.AddError(err)
return err
}
err = wfExecutor.SaveResourceParameters(resourceName)
if err != nil {
wfExecutor.AddError(err)
return err
if !isDelete {
err = wfExecutor.WaitResource(resourceNamespace, resourceName)
if err != nil {
wfExecutor.AddError(err)
return err
}
err = wfExecutor.SaveResourceParameters(resourceNamespace, resourceName)
if err != nil {
wfExecutor.AddError(err)
return err
}
}
return nil
}
57 changes: 41 additions & 16 deletions workflow/executor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ package executor
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"os/exec"
"strings"
"time"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/argoproj/argo/errors"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
Expand All @@ -16,28 +19,38 @@ import (
)

// ExecResource will run kubectl action against a manifest
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string) (string, error) {
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, isDelete bool) (string, string, error) {
args := []string{
action,
}
if action == "delete" {
output := "json"
if isDelete {
args = append(args, "--ignore-not-found")
output = "name"
}
args = append(args, "-f")
args = append(args, manifestPath)
args = append(args, "-o")
args = append(args, "name")
args = append(args, output)
cmd := exec.Command("kubectl", args...)
log.Info(strings.Join(cmd.Args, " "))
out, err := cmd.Output()
if err != nil {
exErr := err.(*exec.ExitError)
errMsg := strings.TrimSpace(string(exErr.Stderr))
return "", errors.New(errors.CodeBadRequest, errMsg)
return "", "", errors.New(errors.CodeBadRequest, errMsg)
}
if action == "delete" {
return "", "", nil
}
obj := unstructured.Unstructured{}
err = json.Unmarshal(out, &obj)
if err != nil {
return "", "", err
}
resourceName := strings.TrimSpace(string(out))
log.Infof(resourceName)
return resourceName, nil
resourceName := fmt.Sprintf("%s.%s/%s", obj.GroupVersionKind().Kind, obj.GroupVersionKind().Group, obj.GetName())
log.Infof("%s/%s", obj.GetNamespace(), resourceName)
return obj.GetNamespace(), resourceName, nil
}

// gjsonLabels is an implementation of labels.Labels interface
Expand All @@ -58,7 +71,7 @@ func (g gjsonLabels) Get(label string) string {
}

// WaitResource waits for a specific resource to satisfy either the success or failure condition
func (we *WorkflowExecutor) WaitResource(resourceName string) error {
func (we *WorkflowExecutor) WaitResource(resourceNamespace string, resourceName string) error {
if we.Template.Resource.SuccessCondition == "" && we.Template.Resource.FailureCondition == "" {
return nil
}
Expand Down Expand Up @@ -86,7 +99,7 @@ func (we *WorkflowExecutor) WaitResource(resourceName string) error {
// Poll intervall of 5 seconds serves as a backoff intervall in case of immediate result reader failure
err := wait.PollImmediateInfinite(time.Duration(time.Second*5),
func() (bool, error) {
isErrRetry, err := checkResourceState(resourceName, successReqs, failReqs)
isErrRetry, err := checkResourceState(resourceNamespace, resourceName, successReqs, failReqs)

if err == nil {
log.Infof("Returning from successful wait for resource %s", resourceName)
Expand Down Expand Up @@ -114,9 +127,9 @@ func (we *WorkflowExecutor) WaitResource(resourceName string) error {
}

// Function to do the kubectl get -w command and then waiting on json reading.
func checkResourceState(resourceName string, successReqs labels.Requirements, failReqs labels.Requirements) (bool, error) {
func checkResourceState(resourceNamespace string, resourceName string, successReqs labels.Requirements, failReqs labels.Requirements) (bool, error) {

cmd, reader, err := startKubectlWaitCmd(resourceName)
cmd, reader, err := startKubectlWaitCmd(resourceNamespace, resourceName)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -179,8 +192,12 @@ func checkResourceState(resourceName string, successReqs labels.Requirements, fa
}

// Start Kubectl command Get with -w return error if unable to start command
func startKubectlWaitCmd(resourceName string) (*exec.Cmd, *bufio.Reader, error) {
cmd := exec.Command("kubectl", "get", resourceName, "-w", "-o", "json")
func startKubectlWaitCmd(resourceNamespace string, resourceName string) (*exec.Cmd, *bufio.Reader, error) {
args := []string{"get", resourceName, "-w", "-o", "json"}
if resourceNamespace != "" {
args = append(args, "-n", resourceNamespace)
}
cmd := exec.Command("kubectl", args...)
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, nil, errors.InternalWrapError(err)
Expand Down Expand Up @@ -216,7 +233,7 @@ func readJSON(reader *bufio.Reader) ([]byte, error) {
}

// SaveResourceParameters will save any resource output parameters
func (we *WorkflowExecutor) SaveResourceParameters(resourceName string) error {
func (we *WorkflowExecutor) SaveResourceParameters(resourceNamespace string, resourceName string) error {
if len(we.Template.Outputs.Parameters) == 0 {
log.Infof("No output parameters")
return nil
Expand All @@ -228,9 +245,17 @@ func (we *WorkflowExecutor) SaveResourceParameters(resourceName string) error {
}
var cmd *exec.Cmd
if param.ValueFrom.JSONPath != "" {
cmd = exec.Command("kubectl", "get", resourceName, "-o", fmt.Sprintf("jsonpath='%s'", param.ValueFrom.JSONPath))
args := []string{"get", resourceName, "-o", fmt.Sprintf("jsonpath='%s'", param.ValueFrom.JSONPath)}
if resourceNamespace != "" {
args = append(args, "-n", resourceNamespace)
}
cmd = exec.Command("kubectl", args...)
} else if param.ValueFrom.JQFilter != "" {
cmdStr := fmt.Sprintf("kubectl get %s -o json | jq -c '%s'", resourceName, param.ValueFrom.JQFilter)
resArgs := []string{resourceName}
if resourceNamespace != "" {
resArgs = append(resArgs, "-n", resourceNamespace)
}
cmdStr := fmt.Sprintf("kubectl get %s -o json | jq -c '%s'", strings.Join(resArgs, " "), param.ValueFrom.JQFilter)
cmd = exec.Command("sh", "-c", cmdStr)
} else {
continue
Expand Down

0 comments on commit 8eb4c66

Please sign in to comment.