Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support an easy way to set owner reference #1333

Merged
merged 5 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,10 @@
"description": "MergeStrategy is the strategy used to merge a patch. It defaults to \"strategic\" Must be one of: strategic, merge, json",
"type": "string"
},
"setOwnerReference": {
"description": "SetOwnerReference sets the reference to the workflow on the OwnerReference of generated resource.",
"type": "boolean"
},
"successCondition": {
"description": "SuccessCondition is a label selector expression which describes the conditions of the k8s resource in which it is acceptable to proceed to the following step",
"type": "string"
Expand Down
2 changes: 1 addition & 1 deletion cmd/argoexec/commands/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func execResource(action string) error {
wfExecutor.AddError(err)
return err
}
resourceNamespace, resourceName, err := wfExecutor.ExecResource(action, common.ExecutorResourceManifestPath, isDelete)
resourceNamespace, resourceName, err := wfExecutor.ExecResource(action, common.ExecutorResourceManifestPath)
if err != nil {
wfExecutor.AddError(err)
return err
Expand Down
25 changes: 25 additions & 0 deletions examples/k8s-set-owner-reference.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# This example creates a Kubernetes resource that will be deleted
# when the workflow is deleted via Kubernetes GC.
#
# A configmap is used for this example, but the same approach would apply
# to other more interesting resource types.
#
# https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: k8s-set-owner-reference-
spec:
entrypoint: k8s-set-owner-reference
templates:
- name: k8s-set-owner-reference
resource:
action: create
setOwnerReference: true
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
generateName: owned-eg-
data:
some: value
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,6 +825,9 @@ type ResourceTemplate struct {
// Manifest contains the kubernetes manifest
Manifest string `json:"manifest"`

// SetOwnerReference sets the reference to the workflow on the OwnerReference of generated resource.
SetOwnerReference bool `json:"setOwnerReference,omitempty"`

// SuccessCondition is a label selector expression which describes the conditions
// of the k8s resource in which it is acceptable to proceed to the following step
SuccessCondition string `json:"successCondition,omitempty"`
Expand Down
24 changes: 23 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@ import (
argokubeerr "github.com/argoproj/pkg/kube/errors"
"github.com/argoproj/pkg/strftime"
jsonpatch "github.com/evanphx/json-patch"
"github.com/ghodss/yaml"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasttemplate"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -1541,16 +1543,36 @@ func (woc *wfOperationCtx) addChildNode(parent string, child string) {

// executeResource is runs a kubectl command against a manifest
func (woc *wfOperationCtx) executeResource(nodeName string, tmpl *wfv1.Template, boundaryID string) *wfv1.NodeStatus {
tmpl = tmpl.DeepCopy()

node := woc.getNodeByName(nodeName)
if node != nil {
return node
}

// Try to unmarshal the given manifest.
obj := unstructured.Unstructured{}
err := yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &obj)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}

if tmpl.Resource.SetOwnerReference {
ownerReferences := obj.GetOwnerReferences()
obj.SetOwnerReferences(append(ownerReferences, *metav1.NewControllerRef(woc.wf, wfv1.SchemaGroupVersionKind)))
bytes, err := yaml.Marshal(obj.Object)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
tmpl.Resource.Manifest = string(bytes)
}

mainCtr := woc.newExecContainer(common.MainContainerName)
mainCtr.Command = []string{"argoexec", "resource", tmpl.Resource.Action}
mainCtr.VolumeMounts = []apiv1.VolumeMount{
volumeMountPodMetadata,
}
_, err := woc.createWorkflowPod(nodeName, *mainCtr, tmpl)
_, err = woc.createWorkflowPod(nodeName, *mainCtr, tmpl)
if err != nil {
return woc.initializeNode(nodeName, wfv1.NodeTypePod, tmpl.Name, boundaryID, wfv1.NodeError, err.Error())
}
Expand Down
161 changes: 156 additions & 5 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/test"
"github.com/argoproj/argo/workflow/common"
"github.com/argoproj/argo/workflow/util"
"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -352,11 +354,11 @@ func TestDAGTemplateParallelismLimit(t *testing.T) {
}

var nestedParallelism = `
# Example with vertical and horizontal scalability
#
# Imagine we have 'M' workers which work in parallel,
# each worker should performs 'N' loops sequentially
#
# Example with vertical and horizontal scalability
#
# Imagine we have 'M' workers which work in parallel,
# each worker should performs 'N' loops sequentially
#
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
Expand Down Expand Up @@ -1003,3 +1005,152 @@ func TestResolveIOPathPlaceholders(t *testing.T) {

assert.Equal(t, []string{"sh", "-c", "head -n 3 <\"/inputs/text/data\" | tee \"/outputs/text/data\" | wc -l > \"/outputs/actual-lines-count/data\""}, pods.Items[0].Spec.Containers[1].Command)
}

var resourceTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: resource-template
spec:
entrypoint: resource
templates:
- name: resource
resource:
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
name: resource-cm
`

func TestResourceTemplate(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(resourceTemplate)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Phase)

pod, err := controller.kubeclientset.CoreV1().Pods("").Get("resource-template", metav1.GetOptions{})
if !assert.Nil(t, err) {
t.Fatal(err)
}
tmplStr := pod.Annotations[common.AnnotationKeyTemplate]
tmpl := wfv1.Template{}
err = yaml.Unmarshal([]byte(tmplStr), &tmpl)
if !assert.Nil(t, err) {
t.Fatal(err)
}
cm := apiv1.ConfigMap{}
err = yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &cm)
if !assert.Nil(t, err) {
t.Fatal(err)
}
assert.Equal(t, "resource-cm", cm.Name)
assert.Empty(t, cm.ObjectMeta.OwnerReferences)
}

var resourceWithOwnerReferenceTemplate = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: resource-with-ownerreference-template
spec:
entrypoint: start
templates:
- name: start
steps:
- - name: resource-1
template: resource-1
- name: resource-2
template: resource-2
- name: resource-3
template: resource-3
- name: resource-1
resource:
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
name: resource-cm-1
ownerReferences:
- apiVersion: argoproj.io/v1alpha1
blockOwnerDeletion: true
kind: Workflow
name: "manual-ref-name"
uid: "manual-ref-uid"
- name: resource-2
resource:
setOwnerReference: true
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
name: resource-cm-2
- name: resource-3
resource:
setOwnerReference: true
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
name: resource-cm-3
ownerReferences:
- apiVersion: argoproj.io/v1alpha1
blockOwnerDeletion: true
kind: Workflow
name: "manual-ref-name"
uid: "manual-ref-uid"
`

func TestResourceWithOwnerReferenceTemplate(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")

// operate the workflow. it should create a pod.
wf := unmarshalWF(resourceWithOwnerReferenceTemplate)
wf, err := wfcset.Create(wf)
assert.Nil(t, err)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
wf, err = wfcset.Get(wf.ObjectMeta.Name, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Phase)

pods, err := controller.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
if !assert.Nil(t, err) {
t.Fatal(err)
}

objectMetas := map[string]metav1.ObjectMeta{}
for _, pod := range pods.Items {
tmplStr := pod.Annotations[common.AnnotationKeyTemplate]
tmpl := wfv1.Template{}
err = yaml.Unmarshal([]byte(tmplStr), &tmpl)
if !assert.Nil(t, err) {
t.Fatal(err)
}
cm := apiv1.ConfigMap{}
err = yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &cm)
if !assert.Nil(t, err) {
t.Fatal(err)
}
objectMetas[cm.Name] = cm.ObjectMeta
}
if assert.Equal(t, 1, len(objectMetas["resource-cm-1"].OwnerReferences)) {
assert.Equal(t, "manual-ref-name", objectMetas["resource-cm-1"].OwnerReferences[0].Name)
}
if assert.Equal(t, 1, len(objectMetas["resource-cm-2"].OwnerReferences)) {
assert.Equal(t, "resource-with-ownerreference-template", objectMetas["resource-cm-2"].OwnerReferences[0].Name)
}
if assert.Equal(t, 2, len(objectMetas["resource-cm-3"].OwnerReferences)) {
assert.Equal(t, "manual-ref-name", objectMetas["resource-cm-3"].OwnerReferences[0].Name)
assert.Equal(t, "resource-with-ownerreference-template", objectMetas["resource-cm-3"].OwnerReferences[1].Name)
}
}
3 changes: 2 additions & 1 deletion workflow/executor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import (
)

// ExecResource will run kubectl action against a manifest
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string, isDelete bool) (string, string, error) {
func (we *WorkflowExecutor) ExecResource(action string, manifestPath string) (string, string, error) {
isDelete := action == "delete"
args := []string{
action,
}
Expand Down
10 changes: 10 additions & 0 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"regexp"
"strings"

"github.com/ghodss/yaml"
"github.com/valyala/fasttemplate"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
apivalidation "k8s.io/apimachinery/pkg/util/validation"

"github.com/argoproj/argo/errors"
Expand Down Expand Up @@ -325,6 +327,14 @@ func validateLeaf(scope map[string]interface{}, tmpl *wfv1.Template) error {
mountPaths[art.Path] = fmt.Sprintf("inputs.artifacts.%s", art.Name)
}
}
if tmpl.Resource != nil {
// Try to unmarshal the given manifest.
obj := unstructured.Unstructured{}
err := yaml.Unmarshal([]byte(tmpl.Resource.Manifest), &obj)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.resource.manifest must be a valid yaml", tmpl.Name)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

}
}
if tmpl.ActiveDeadlineSeconds != nil {
if *tmpl.ActiveDeadlineSeconds <= 0 {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.activeDeadlineSeconds must be a positive integer > 0", tmpl.Name)
Expand Down
48 changes: 48 additions & 0 deletions workflow/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1356,3 +1356,51 @@ func TestBaseImageOutputVerify(t *testing.T) {
assert.NoError(t, err)
}
}

var validResourceWorkflow = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: valid-resource-
spec:
entrypoint: whalesay
templates:
- name: whalesay
resource:
manifest: |
apiVersion: v1
kind: ConfigMap
metadata:
name: whalesay-cm
`

// TestValidResourceWorkflow verifies a workflow of a valid resource.
func TestValidResourceWorkflow(t *testing.T) {
wf := unmarshalWf(validResourceWorkflow)
err := ValidateWorkflow(wf, ValidateOpts{})
assert.Equal(t, err, nil)
}

var invalidResourceWorkflow = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: invalid-resource-
spec:
entrypoint: whalesay
templates:
- name: whalesay
resource:
manifest: |
invalid-yaml-line
kind: ConfigMap
metadata:
name: whalesay-cm
`

// TestInvalidResourceWorkflow verifies an error against a workflow of an invalid resource.
func TestInvalidResourceWorkflow(t *testing.T) {
wf := unmarshalWf(invalidResourceWorkflow)
err := ValidateWorkflow(wf, ValidateOpts{})
assert.Error(t, err, "templates.whalesay.resource.manifest must be a valid yaml")
}