Skip to content

Commit

Permalink
support separator string for iteration string param (kubeflow#790)
Browse files Browse the repository at this point in the history
* support separator string for iteration string param

* address review comments
  • Loading branch information
wzhanw authored Nov 30, 2021
1 parent 61e4f36 commit 6551207
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 160 deletions.
5 changes: 0 additions & 5 deletions tekton-catalog/pipeline-loops/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops
go 1.13

require (
contrib.go.opencensus.io/exporter/stackdriver v0.13.5 // indirect
github.com/go-kit/kit v0.10.0 // indirect
github.com/go-openapi/validate v0.19.5 // indirect
github.com/google/go-cmp v0.5.6
github.com/hashicorp/go-multierror v1.1.1
github.com/tektoncd/pipeline v0.30.0
go.uber.org/zap v1.19.1
gomodules.xyz/jsonpatch/v2 v2.2.0
gopkg.in/evanphx/json-patch.v4 v4.9.0 // indirect
honnef.co/go/tools v0.0.1-2020.1.5 // indirect
k8s.io/api v0.21.4
k8s.io/apimachinery v0.21.4
k8s.io/client-go v0.21.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type PipelineLoopSpec struct {
// IterateParam is the name of the task parameter that is iterated upon.
IterateParam string `json:"iterateParam"`

// The separator for IterateParam if the IterateParam is a strings with separator char, this field is optional.
// +optional
IterateParamSeparator string `json:"iterateParamStringSeparator,omitempty"`

IterateNumeric string `json:"iterateNumeric"`

// Time after which the TaskRun times out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"context"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/runtime"
duckv1 "knative.dev/pkg/apis/duck/v1"
"log"
"reflect"
"strconv"
"strings"
"time"

"k8s.io/apimachinery/pkg/runtime"
duckv1 "knative.dev/pkg/apis/duck/v1"

"github.com/hashicorp/go-multierror"

"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop"
Expand Down Expand Up @@ -586,6 +588,8 @@ func computeIterations(run *v1alpha1.Run, tls *pipelineloopv1alpha1.PipelineLoop
step := -1
to := -1
iterationElements := []interface{}{}
iterationParamStr := ""
iterationParamStrSeparator := ""
for _, p := range run.Spec.Params {
if tls.IterateNumeric != "" {
if p.Name == "from" {
Expand All @@ -597,53 +601,64 @@ func computeIterations(run *v1alpha1.Run, tls *pipelineloopv1alpha1.PipelineLoop
if p.Name == "to" {
to, _ = strconv.Atoi(p.Value.StringVal)
}
} else if p.Name == tls.IterateParam {
if p.Value.Type == v1beta1.ParamTypeString {
// Transfer p.Value to Array.
var strings []string
var ints []int
var dictsString []map[string]string
var dictsInt []map[string]int
errString := json.Unmarshal([]byte(p.Value.StringVal), &strings)
errInt := json.Unmarshal([]byte(p.Value.StringVal), &ints)
errDictString := json.Unmarshal([]byte(p.Value.StringVal), &dictsString)
errDictInt := json.Unmarshal([]byte(p.Value.StringVal), &dictsInt)
if errString != nil && errInt != nil && errDictString != nil && errDictInt != nil {
return 0, iterationElements, fmt.Errorf("The value of the iterate parameter %q can not transfer to array", tls.IterateParam)
} else {
if p.Name == tls.IterateParam {
if p.Value.Type == v1beta1.ParamTypeString {
iterationParamStr = strings.Trim(p.Value.StringVal, " ")
}

if errString == nil {
numberOfIterations = len(strings)
for _, v := range strings {
iterationElements = append(iterationElements, v)
}
break
} else if errInt == nil {
numberOfIterations = len(ints)
for _, v := range ints {
iterationElements = append(iterationElements, v)
}
break
} else if errDictString == nil {
numberOfIterations = len(dictsString)
for _, v := range dictsString {
iterationElements = append(iterationElements, v)
}
break
} else if errDictInt == nil {
numberOfIterations = len(dictsInt)
for _, v := range dictsInt {
if p.Value.Type == v1beta1.ParamTypeArray {
numberOfIterations = len(p.Value.ArrayVal)
for _, v := range p.Value.ArrayVal {
iterationElements = append(iterationElements, v)
}
break
}
}
if p.Value.Type == v1beta1.ParamTypeArray {
numberOfIterations = len(p.Value.ArrayVal)
for _, v := range p.Value.ArrayVal {
if p.Name == tls.IterateParamSeparator {
iterationParamStrSeparator = p.Value.StringVal
}
}
}
if iterationParamStr != "" {
// Transfer p.Value to Array.
if iterationParamStrSeparator != "" {
stringArr := strings.Split(iterationParamStr, iterationParamStrSeparator)
numberOfIterations = len(stringArr)
for _, v := range stringArr {
iterationElements = append(iterationElements, v)
}
} else {
var strings []string
var ints []int
var dictsString []map[string]string
var dictsInt []map[string]int
errString := json.Unmarshal([]byte(iterationParamStr), &strings)
errInt := json.Unmarshal([]byte(iterationParamStr), &ints)
errDictString := json.Unmarshal([]byte(iterationParamStr), &dictsString)
errDictInt := json.Unmarshal([]byte(iterationParamStr), &dictsInt)
if errString != nil && errInt != nil && errDictString != nil && errDictInt != nil {
return 0, iterationElements, fmt.Errorf("The value of the iterate parameter %q can not transfer to array", tls.IterateParam)
}
if errString == nil {
numberOfIterations = len(strings)
for _, v := range strings {
iterationElements = append(iterationElements, v)
}
} else if errInt == nil {
numberOfIterations = len(ints)
for _, v := range ints {
iterationElements = append(iterationElements, v)
}
} else if errDictString == nil {
numberOfIterations = len(dictsString)
for _, v := range dictsString {
iterationElements = append(iterationElements, v)
}
} else if errDictInt == nil {
numberOfIterations = len(dictsInt)
for _, v := range dictsInt {
iterationElements = append(iterationElements, v)
}
break
}
}
}
Expand All @@ -669,6 +684,8 @@ func getParameters(run *v1alpha1.Run, tls *pipelineloopv1alpha1.PipelineLoopSpec
var out []v1beta1.Param
if tls.IterateParam != "" {
// IterateParam defined
var iterationParam, iterationParamStrSeparator *v1beta1.Param
var item, separator v1beta1.Param
for i, p := range run.Spec.Params {
if p.Name == tls.IterateParam {
if p.Value.Type == v1beta1.ParamTypeArray {
Expand All @@ -678,42 +695,59 @@ func getParameters(run *v1alpha1.Run, tls *pipelineloopv1alpha1.PipelineLoopSpec
})
}
if p.Value.Type == v1beta1.ParamTypeString {
var strings []string
var ints []int
var dictsString []map[string]string
var dictsInt []map[string]int
errString := json.Unmarshal([]byte(p.Value.StringVal), &strings)
errInt := json.Unmarshal([]byte(p.Value.StringVal), &ints)
errDictString := json.Unmarshal([]byte(p.Value.StringVal), &dictsString)
errDictInt := json.Unmarshal([]byte(p.Value.StringVal), &dictsInt)
if errString == nil {
item = p
iterationParam = &item
}
} else if p.Name == tls.IterateParamSeparator {
separator = p
iterationParamStrSeparator = &separator
} else {
out = append(out, run.Spec.Params[i])
}
}
if iterationParam != nil {
if iterationParamStrSeparator != nil {
iterationParamStr := strings.Trim(iterationParam.Value.StringVal, " ")
stringArr := strings.Split(iterationParamStr, iterationParamStrSeparator.Value.StringVal)
out = append(out, v1beta1.Param{
Name: iterationParam.Name,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: stringArr[iteration-1]},
})

} else {
var strings []string
var ints []int
var dictsString []map[string]string
var dictsInt []map[string]int
errString := json.Unmarshal([]byte(iterationParam.Value.StringVal), &strings)
errInt := json.Unmarshal([]byte(iterationParam.Value.StringVal), &ints)
errDictString := json.Unmarshal([]byte(iterationParam.Value.StringVal), &dictsString)
errDictInt := json.Unmarshal([]byte(iterationParam.Value.StringVal), &dictsInt)
if errString == nil {
out = append(out, v1beta1.Param{
Name: iterationParam.Name,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: strings[iteration-1]},
})
} else if errInt == nil {
out = append(out, v1beta1.Param{
Name: iterationParam.Name,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: strconv.Itoa(ints[iteration-1])},
})
} else if errDictString == nil {
for dictParam := range dictsString[iteration-1] {
out = append(out, v1beta1.Param{
Name: p.Name,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: strings[iteration-1]},
Name: iterationParam.Name + "-subvar-" + dictParam,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: dictsString[iteration-1][dictParam]},
})
} else if errInt == nil {
}
} else if errDictInt == nil {
for dictParam := range dictsInt[iteration-1] {
out = append(out, v1beta1.Param{
Name: p.Name,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: strconv.Itoa(ints[iteration-1])},
Name: iterationParam.Name + "-subvar-" + dictParam,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: strconv.Itoa(dictsInt[iteration-1][dictParam])},
})
} else if errDictString == nil {
for dictParam := range dictsString[iteration-1] {
out = append(out, v1beta1.Param{
Name: p.Name + "-subvar-" + dictParam,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: dictsString[iteration-1][dictParam]},
})
}
} else if errDictInt == nil {
for dictParam := range dictsInt[iteration-1] {
out = append(out, v1beta1.Param{
Name: p.Name + "-subvar-" + dictParam,
Value: v1beta1.ArrayOrString{Type: v1beta1.ParamTypeString, StringVal: strconv.Itoa(dictsInt[iteration-1][dictParam])},
})
}
}
}
} else {
out = append(out, run.Spec.Params[i])
}
}
} else {
Expand Down
Loading

0 comments on commit 6551207

Please sign in to comment.