Skip to content

Commit

Permalink
Support parameter substitution in the volumes attribute (#1238)
Browse files Browse the repository at this point in the history
  • Loading branch information
elikatsis authored and jessesuen committed Apr 12, 2019
1 parent 6607dca commit 64370a2
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 27 deletions.
7 changes: 7 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,13 @@ func (woc *wfOperationCtx) resolveDependencyReferences(dagCtx *dagContext, task
}

// Perform replacement
// Replace woc.volumes
err := woc.substituteParamsInVolumes(scope.replaceMap())
if err != nil {
return nil, err
}

// Replace task's parameters
taskBytes, err := json.Marshal(task)
if err != nil {
return nil, errors.InternalWrapError(err)
Expand Down
37 changes: 36 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type wfOperationCtx struct {
// globalParams holds any parameters that are available to be referenced
// in the global scope (e.g. workflow.parameters.XXX).
globalParams map[string]string
// volumes holds a DeepCopy of wf.Spec.Volumes to perform substitutions.
// It is then used in addVolumeReferences() when creating a pod.
volumes []apiv1.Volume
// map of pods which need to be labeled with completed=true
completedPods map[string]bool
// deadline is the dealine time in which this operation should relinquish
Expand Down Expand Up @@ -93,6 +96,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
}),
controller: wfc,
globalParams: make(map[string]string),
volumes: wf.Spec.DeepCopy().Volumes,
completedPods: make(map[string]bool),
deadline: time.Now().UTC().Add(maxOperationTime),
}
Expand Down Expand Up @@ -158,7 +162,14 @@ func (woc *wfOperationCtx) operate() {

woc.setGlobalParameters()

err := woc.createPVCs()
err := woc.substituteParamsInVolumes(woc.globalParams)
if err != nil {
woc.log.Errorf("%s volumes global param substitution error: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
return
}

err = woc.createPVCs()
if err != nil {
woc.log.Errorf("%s pvc create error: %+v", woc.wf.ObjectMeta.Name, err)
woc.markWorkflowError(err, true)
Expand Down Expand Up @@ -1667,3 +1678,27 @@ func (woc *wfOperationCtx) checkAndCompress() error {

return nil
}

func (woc *wfOperationCtx) substituteParamsInVolumes(params map[string]string) error {
if woc.volumes == nil {
return nil
}

volumes := woc.volumes
volumesBytes, err := json.Marshal(volumes)
if err != nil {
return errors.InternalWrapError(err)
}
fstTmpl := fasttemplate.New(string(volumesBytes), "{{", "}}")
newVolumesStr, err := common.Replace(fstTmpl, params, true)
if err != nil {
return err
}
var newVolumes []apiv1.Volume
err = json.Unmarshal([]byte(newVolumesStr), &newVolumes)
if err != nil {
return errors.InternalWrapError(err)
}
woc.volumes = newVolumes
return nil
}
15 changes: 7 additions & 8 deletions workflow/controller/steps.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,21 @@ func shouldExecute(when string) (bool, error) {
func (woc *wfOperationCtx) resolveReferences(stepGroup []wfv1.WorkflowStep, scope *wfScope) ([]wfv1.WorkflowStep, error) {
newStepGroup := make([]wfv1.WorkflowStep, len(stepGroup))

// Step 0: replace all parameter scope references for volumes
err := woc.substituteParamsInVolumes(scope.replaceMap())
if err != nil {
return nil, err
}

for i, step := range stepGroup {
// Step 1: replace all parameter scope references in the step
// TODO: improve this
stepBytes, err := json.Marshal(step)
if err != nil {
return nil, errors.InternalWrapError(err)
}
replaceMap := make(map[string]string)
for key, val := range scope.scope {
valStr, ok := val.(string)
if ok {
replaceMap[key] = valStr
}
}
fstTmpl := fasttemplate.New(string(stepBytes), "{{", "}}")
newStepStr, err := common.Replace(fstTmpl, replaceMap, true)
newStepStr, err := common.Replace(fstTmpl, scope.replaceMap(), true)
if err != nil {
return nil, err
}
Expand Down
31 changes: 18 additions & 13 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
addSchedulingConstraints(pod, wfSpec, tmpl)
woc.addMetadata(pod, tmpl)

err = addVolumeReferences(pod, wfSpec, tmpl, woc.wf.Status.PersistentVolumeClaims)
err = addVolumeReferences(pod, woc.volumes, tmpl, woc.wf.Status.PersistentVolumeClaims)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,8 +183,13 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
pod.ObjectMeta.Annotations[common.AnnotationKeyTemplate] = string(tmplBytes)

// Perform one last variable substitution here. Some variables come from the from workflow
// configmap (e.g. archive location), and were not substituted in executeTemplate.
pod, err = substituteGlobals(pod, woc.globalParams)
// configmap (e.g. archive location) or volumes attribute, and were not substituted
// in executeTemplate.
podParams := woc.globalParams
for _, inParam := range tmpl.Inputs.Parameters {
podParams["inputs.parameters."+inParam.Name] = *inParam.Value
}
pod, err = substitutePodParams(pod, podParams)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -220,20 +225,20 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
return created, nil
}

// substituteGlobals returns a pod spec with global parameter references substituted as well as pod.name
func substituteGlobals(pod *apiv1.Pod, globalParams map[string]string) (*apiv1.Pod, error) {
newGlobalParams := make(map[string]string)
for k, v := range globalParams {
newGlobalParams[k] = v
// substitutePodParams returns a pod spec with parameter references substituted as well as pod.name
func substitutePodParams(pod *apiv1.Pod, podParams map[string]string) (*apiv1.Pod, error) {
newPodParams := make(map[string]string)
for k, v := range podParams {
newPodParams[k] = v
}
newGlobalParams[common.LocalVarPodName] = pod.Name
globalParams = newGlobalParams
newPodParams[common.LocalVarPodName] = pod.Name
podParams = newPodParams
specBytes, err := json.Marshal(pod)
if err != nil {
return nil, err
}
fstTmpl := fasttemplate.New(string(specBytes), "{{", "}}")
newSpecBytes, err := common.Replace(fstTmpl, globalParams, true)
newSpecBytes, err := common.Replace(fstTmpl, podParams, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -455,7 +460,7 @@ func addSchedulingConstraints(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *w

// addVolumeReferences adds any volumeMounts that a container/sidecar is referencing, to the pod.spec.volumes
// These are either specified in the workflow.spec.volumes or the workflow.spec.volumeClaimTemplate section
func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
func addVolumeReferences(pod *apiv1.Pod, vols []apiv1.Volume, tmpl *wfv1.Template, pvcs []apiv1.Volume) error {
switch tmpl.GetType() {
case wfv1.TemplateTypeContainer, wfv1.TemplateTypeScript:
default:
Expand All @@ -464,7 +469,7 @@ func addVolumeReferences(pod *apiv1.Pod, wfSpec *wfv1.WorkflowSpec, tmpl *wfv1.T

// getVolByName is a helper to retrieve a volume by its name, either from the volumes or claims section
getVolByName := func(name string) *apiv1.Volume {
for _, vol := range wfSpec.Volumes {
for _, vol := range vols {
if vol.Name == name {
return &vol
}
Expand Down
52 changes: 47 additions & 5 deletions workflow/controller/workflowpod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
// For Docker executor
{
woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorDocker

Expand All @@ -291,7 +291,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
// For Kubelet executor
{
woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorKubelet

Expand All @@ -309,7 +309,7 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
// For K8sAPI executor
{
woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorK8sAPI

Expand All @@ -325,6 +325,48 @@ func TestVolumeAndVolumeMounts(t *testing.T) {
}
}

func TestVolumesPodSubstitution(t *testing.T) {
volumes := []apiv1.Volume{
{
Name: "volume-name",
VolumeSource: apiv1.VolumeSource{
PersistentVolumeClaim: &apiv1.PersistentVolumeClaimVolumeSource{
ClaimName: "{{inputs.parameters.volume-name}}",
},
},
},
}
volumeMounts := []apiv1.VolumeMount{
{
Name: "volume-name",
MountPath: "/test",
},
}
tmpStr := "test-name"
inputParameters := []wfv1.Parameter{
{
Name: "volume-name",
Value: &tmpStr,
},
}

woc := newWoc()
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.wf.Spec.Templates[0].Inputs.Parameters = inputParameters
woc.controller.Config.ContainerRuntimeExecutor = common.ContainerRuntimeExecutorDocker

woc.executeContainer(woc.wf.Spec.Entrypoint, &woc.wf.Spec.Templates[0], "")
podName := getPodName(woc.wf)
pod, err := woc.controller.kubeclientset.CoreV1().Pods("").Get(podName, metav1.GetOptions{})
assert.Nil(t, err)
assert.Equal(t, 3, len(pod.Spec.Volumes))
assert.Equal(t, "volume-name", pod.Spec.Volumes[2].Name)
assert.Equal(t, "test-name", pod.Spec.Volumes[2].PersistentVolumeClaim.ClaimName)
assert.Equal(t, 1, len(pod.Spec.Containers[1].VolumeMounts))
assert.Equal(t, "volume-name", pod.Spec.Containers[1].VolumeMounts[0].Name)
}

func TestOutOfCluster(t *testing.T) {

verifyKubeConfigVolume := func(ctr apiv1.Container, volName, mountPath string) {
Expand Down Expand Up @@ -428,7 +470,7 @@ func TestInitContainers(t *testing.T) {
mirrorVolumeMounts := true

woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.wf.Spec.Templates[0].InitContainers = []wfv1.UserContainer{
{
Expand Down Expand Up @@ -466,7 +508,7 @@ func TestSidecars(t *testing.T) {
mirrorVolumeMounts := true

woc := newWoc()
woc.wf.Spec.Volumes = volumes
woc.volumes = volumes
woc.wf.Spec.Templates[0].Container.VolumeMounts = volumeMounts
woc.wf.Spec.Templates[0].Sidecars = []wfv1.UserContainer{
{
Expand Down

0 comments on commit 64370a2

Please sign in to comment.