Skip to content

Commit

Permalink
feat: Add new lambda expr. Fixes argoproj#9529
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec committed Sep 6, 2022
1 parent 83501a4 commit 4266933
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 0 deletions.
56 changes: 56 additions & 0 deletions util/mapper/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package mapper

import (
"reflect"
)

// Map translate (maps) any object by recursively applying the mapper func to each field, array element, and map value.
// Among intended use cases is translating a data structure (e.g. from English to Spanish).
func Map(x any, m func(any) (any, error)) (any, error) {
value, err := _map(reflect.ValueOf(x), m)
return value.Interface(), err
}

func _map(x reflect.Value, m func(any) (any, error)) (reflect.Value, error) {
if x.IsZero() {
return x, nil
}
switch x.Kind() {
case reflect.Ptr:
y, err := _map(x.Elem(), m)
return y.Addr(), err
case reflect.Struct:
y := reflect.Indirect(reflect.New(x.Type()))
for i := 0; i < x.NumField(); i++ {
g, err := _map(x.Field(i), m)
if err != nil {
return y, err
}
y.Field(i).Set(g)
}
return y, nil
case reflect.Array, reflect.Slice:
y := reflect.Indirect(reflect.MakeSlice(x.Type(), x.Len(), x.Len()))
for i := 0; i < x.Len(); i++ {
g, err := _map(x.Index(i), m)
if err != nil {
return y, err
}
y.Index(i).Set(g)
}
return y, nil
case reflect.Map:
y := reflect.Indirect(reflect.MakeMap(x.Type()))
for _, key := range x.MapKeys() {
g, err := _map(x.MapIndex(key), m)
if err != nil {
return y, err
}
y.SetMapIndex(key, g)
}
return y, nil
default:
y, err := m(x.Interface())
return reflect.ValueOf(y), err
}
}
53 changes: 53 additions & 0 deletions util/mapper/map_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package mapper

import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/stretchr/testify/assert"
"testing"
)

type s struct {
String string
}

func TestVisit(t *testing.T) {
v := func(x any) (any, error) {
s, ok := x.(string)
if ok && s == "foo" {
return "bar", nil
}
return x, nil
}
t.Run("string", func(t *testing.T) {
x, err := Map("foo", v)
assert.NoError(t, err)
assert.Equal(t, "bar", x)
})
t.Run("Struct", func(t *testing.T) {
x, err := Map(s{String: "foo"}, v)
assert.NoError(t, err)
assert.Equal(t, "bar", x.(s).String)
})
t.Run("array", func(t *testing.T) {
x, err := Map([]string{"foo"}, v)
assert.NoError(t, err)
assert.Equal(t, []string{"bar"}, x)
})
t.Run("map", func(t *testing.T) {
x, err := Map(map[string]string{"x": "foo"}, v)
assert.NoError(t, err)
assert.Equal(t, map[string]string{"x": "bar"}, x)
})
t.Run("WorkflowSpec", func(t *testing.T) {
y, err := Map(wfv1.WorkflowSpec{}, v)
assert.NoError(t, err)
assert.Equal(t, wfv1.WorkflowSpec{}, y)
})
t.Run("*WorkflowSpec", func(t *testing.T) {
y, err := Map(&wfv1.WorkflowSpec{
Entrypoint: "foo",
}, v)
assert.NoError(t, err)
assert.Equal(t, &wfv1.WorkflowSpec{Entrypoint: "bar"}, y)
})
}
35 changes: 35 additions & 0 deletions util/template/eval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package template

import (
"fmt"
"github.com/antonmedv/expr"
"github.com/argoproj/argo-workflows/v3/util/mapper"
"strings"
)

func Eval(x any, env any) (any, error) {
return mapper.Map(x, func(g any) (any, error) {
s, ok := g.(string)
if ok {
return eval(s, env)
}
return g, nil
})
}

func eval(s string, env any) (string, error) {
const prefix = "ƛ"
if !strings.HasPrefix(s, prefix) {
return s, nil
}
input := strings.TrimPrefix(s, prefix)
output, err := expr.Eval(input, env)
if err != nil {
return "", fmt.Errorf("failed to evaluate %s: %w", s, err)
}
result, ok := output.(string)
if !ok {
return "", fmt.Errorf("failed to evaluate %s: %w", s, fmt.Errorf("expected result to be a string, but got %T", output))
}
return result, nil
}
13 changes: 13 additions & 0 deletions util/template/eval_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package template

import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/stretchr/testify/assert"
"testing"
)

func TestEval(t *testing.T) {
y, err := Eval(&wfv1.WorkflowSpec{Entrypoint: `ƛx == "foo" ? "bar": "x"`}, map[string]string{"x": "foo"})
assert.NoError(t, err)
assert.Equal(t, &wfv1.WorkflowSpec{Entrypoint: "bar"}, y)
}
11 changes: 11 additions & 0 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
exprenv "github.com/argoproj/argo-workflows/v3/util/expr/env"
"path/filepath"
"strconv"
"time"
Expand Down Expand Up @@ -428,6 +429,8 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
return nil, ErrResourceRateLimitReached
}

pod = woc.mustEval(pod).(*apiv1.Pod)

woc.log.Debugf("Creating Pod: %s (%s)", nodeName, pod.Name)

created, err := woc.controller.kubeclientset.CoreV1().Pods(woc.wf.ObjectMeta.Namespace).Create(ctx, pod, metav1.CreateOptions{})
Expand All @@ -449,6 +452,14 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
return created, nil
}

func (woc *wfOperationCtx) mustEval(x any) any {
y, err := template.Eval(x, exprenv.GetFuncMap(template.EnvMap(woc.globalParams)))
if err != nil {
panic(err)
}
return y
}

func (woc *wfOperationCtx) podExists(nodeID string) (existing *apiv1.Pod, exists bool, err error) {
objs, err := woc.controller.podInformer.GetIndexer().ByIndex(indexes.NodeIDIndex, woc.wf.Namespace+"/"+nodeID)
if err != nil {
Expand Down

0 comments on commit 4266933

Please sign in to comment.