Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boolean circuit for event dependencies #162

Merged
merged 8 commits into from
Feb 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
863 changes: 0 additions & 863 deletions Gopkg.lock

This file was deleted.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ required = [
branch = "master"

[[constraint]]
name = "github.com/xanzy/go-gitlab"
name = "github.com/Knetic/govaluate"
branch = "master"

[[override]]
Expand Down
28 changes: 20 additions & 8 deletions controllers/gateway/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,33 @@ limitations under the License.
package gateway

import (
"fmt"
"github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1"
"github.com/ghodss/yaml"
"io/ioutil"
"strings"
"testing"

"github.com/smartystreets/goconvey/convey"
)

func TestValidate(t *testing.T) {
convey.Convey("Given a gateway", t, func() {
gateway, err := getGateway()

convey.Convey("Make sure gateway is a valid gateway", func() {
dir := "../../examples/gateways"
convey.Convey("Validate list of gateways", t, func() {
files, err := ioutil.ReadDir(dir)
convey.So(err, convey.ShouldBeNil)
for _, file := range files {
if strings.HasSuffix(file.Name(), "configmap.yaml") {
continue
}
fmt.Println("filename: ", file.Name())
content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", dir, file.Name()))
convey.So(err, convey.ShouldBeNil)
convey.So(gateway, convey.ShouldNotBeNil)

err := Validate(gateway)
var gateway *v1alpha1.Gateway
err = yaml.Unmarshal([]byte(content), &gateway)
convey.So(err, convey.ShouldBeNil)
err = Validate(gateway)
convey.So(err, convey.ShouldBeNil)
})
}
})
}
24 changes: 19 additions & 5 deletions controllers/sensor/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,15 @@ func (soc *sOperationCtx) operate() error {
}

// Initialize all event dependency nodes
for _, eventDependency := range soc.s.Spec.Dependencies {
InitializeNode(soc.s, eventDependency.Name, v1alpha1.NodeTypeEventDependency, &soc.log)
for _, dependency := range soc.s.Spec.Dependencies {
InitializeNode(soc.s, dependency.Name, v1alpha1.NodeTypeEventDependency, &soc.log)
}

// Initialize all dependency groups
if soc.s.Spec.DependencyGroups != nil {
for _, group := range soc.s.Spec.DependencyGroups {
InitializeNode(soc.s, group.Name, v1alpha1.NodeTypeDependencyGroup, &soc.log)
}
}

// Initialize all trigger nodes
Expand Down Expand Up @@ -182,9 +189,16 @@ func (soc *sOperationCtx) operate() error {
}
}

// Mark all eventDependency nodes as active
for _, eventDependency := range soc.s.Spec.Dependencies {
MarkNodePhase(soc.s, eventDependency.Name, v1alpha1.NodeTypeEventDependency, v1alpha1.NodePhaseActive, nil, &soc.log, "node is active")
// Mark all event dependency nodes as active
for _, dependency := range soc.s.Spec.Dependencies {
MarkNodePhase(soc.s, dependency.Name, v1alpha1.NodeTypeEventDependency, v1alpha1.NodePhaseActive, nil, &soc.log, "node is active")
}

// Mark all dependency groups as active
if soc.s.Spec.DependencyGroups != nil {
for _, group := range soc.s.Spec.DependencyGroups {
MarkNodePhase(soc.s, group.Name, v1alpha1.NodeTypeDependencyGroup, v1alpha1.NodePhaseActive, nil, &soc.log, "node is active")
}
}

// if we get here - we know the signals are running
Expand Down
2 changes: 2 additions & 0 deletions controllers/sensor/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func TestSensorOperations(t *testing.T) {
switch node.Type {
case v1alpha1.NodeTypeEventDependency:
convey.So(node.Phase, convey.ShouldEqual, v1alpha1.NodePhaseActive)
case v1alpha1.NodeTypeDependencyGroup:
convey.So(node.Phase, convey.ShouldEqual, v1alpha1.NodePhaseActive)
case v1alpha1.NodeTypeTrigger:
convey.So(node.Phase, convey.ShouldEqual, v1alpha1.NodePhaseNew)
}
Expand Down
23 changes: 23 additions & 0 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package sensor

import (
"fmt"
"github.com/Knetic/govaluate"
"time"

"github.com/argoproj/argo-events/common"
Expand Down Expand Up @@ -61,6 +62,25 @@ func ValidateSensor(s *v1alpha1.Sensor) error {
default:
return fmt.Errorf("unknown gateway type")
}

if s.Spec.DependencyGroups != nil {
if s.Spec.Circuit == "" {
return fmt.Errorf("no circuit expression provided to resolve dependency groups")
}
expression, err := govaluate.NewEvaluableExpression(s.Spec.Circuit)
if err != nil {
return fmt.Errorf("circuit expression can't be created for dependency groups. err: %+v", err)
}

groups := make(map[string]interface{}, len(s.Spec.DependencyGroups))
for _, group := range s.Spec.DependencyGroups {
groups[group.Name] = false
}
if _, err = expression.Evaluate(groups); err != nil {
return fmt.Errorf("circuit expression can't be evaluated for dependency groups. err: %+v", err)
}
}

return nil
}

Expand All @@ -77,6 +97,9 @@ func validateTriggers(triggers []v1alpha1.Trigger) error {
if trigger.Resource == nil {
return fmt.Errorf("trigger '%s' does not contain an absolute action", trigger.Name)
}
if trigger.When != nil && trigger.When.All != nil && trigger.When.Any != nil {
return fmt.Errorf("trigger condition can't have both any and all condition")
}
}
return nil
}
Expand Down
21 changes: 16 additions & 5 deletions controllers/sensor/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,29 @@ limitations under the License.
package sensor

import (
"fmt"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/ghodss/yaml"
"io/ioutil"
"testing"

"github.com/smartystreets/goconvey/convey"
)

func TestValidateSensor(t *testing.T) {
convey.Convey("Given a sensor", t, func() {
sensor, err := getSensor()
dir := "../../examples/sensors"
convey.Convey("Validate list of sensor", t, func() {
files, err := ioutil.ReadDir(dir)
convey.So(err, convey.ShouldBeNil)
convey.Convey("Validate", func() {
err := ValidateSensor(sensor)
for _, file := range files {
fmt.Println("filename: ", file.Name())
content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", dir, file.Name()))
convey.So(err, convey.ShouldBeNil)
})
var sensor *v1alpha1.Sensor
err = yaml.Unmarshal([]byte(content), &sensor)
convey.So(err, convey.ShouldBeNil)
err = ValidateSensor(sensor)
convey.So(err, convey.ShouldBeNil)
}
})
}
5 changes: 3 additions & 2 deletions examples/sensors/context-filter-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ spec:
kind: Workflow
source:
s3:
bucket: workflows
key: hello-world.yaml
bucket:
name: workflows
key: hello-world.yaml
endpoint: minio-service.argo-events:9000
insecure: true
accessKey:
Expand Down
13 changes: 8 additions & 5 deletions examples/sensors/data-filter-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ spec:
dependencies:
- name: "webhook-gateway:foo"
filters:
name: "data-filter"
data:
- path: bucket
type: string
value: argo-workflow-input
dataFilters:
- path: bucket
type: string
value: argo-workflow-input
eventProtocol:
type: "HTTP"
http:
Expand All @@ -32,8 +34,9 @@ spec:
kind: Workflow
source:
s3:
bucket: workflows
key: hello-world.yaml
bucket:
name: workflows
key: hello-world.yaml
endpoint: minio-service.argo-events:9000
insecure: true
accessKey:
Expand Down
138 changes: 138 additions & 0 deletions examples/sensors/webhook-http-dependency-groups.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# The dependency grouping and selective workflow trigger execution is not supported in latest release
# This feature will be released in next release v0.8.
# You can try this example with sensor and sensor controller image v0.7.1
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: webhook-sensor-http
labels:
sensors.argoproj.io/sensor-controller-instanceid: argo-events
spec:
deploySpec:
containers:
- name: "sensor"
image: "argoproj/sensor:v0.7.1"
imagePullPolicy: Always
serviceAccountName: argo-events-sa
dependencies:
- name: "webhook-gateway-http:endpoint1"
filters:
context:
source:
host: xyz.com
contentType: application/json
- name: "webhook-gateway-http:endpoint2"
- name: "webhook-gateway-http:endpoint3"
- name: "webhook-gateway-http:endpoint4"
- name: "webhook-gateway-http:endpoint5"
- name: "webhook-gateway-http:endpoint6"
- name: "webhook-gateway-http:endpoint7"
filters:
name: "data-filter"
data:
dataFilters:
- path: bucket
type: string
value: argo-workflow-input
- name: "webhook-gateway-http:endpoint8"
- name: "webhook-gateway-http:endpoint9"
dependencyGroups:
- name: "group_1"
dependencies:
- "webhook-gateway-http:endpoint1"
- "webhook-gateway-http:endpoint2"
- name: "group_2"
dependencies:
- "webhook-gateway-http:endpoint3"
- name: "group_3"
dependencies:
- "webhook-gateway-http:endpoint4"
- "webhook-gateway-http:endpoint5"
- name: "group_4"
dependencies:
- "webhook-gateway-http:endpoint6"
- "webhook-gateway-http:endpoint7"
- "webhook-gateway-http:endpoint8"
- name: "group_5"
dependencies:
- "webhook-gateway-http:endpoint9"
circuit: "group_1 || group_2 || ((group_3 || group_4) && group_5)"
eventProtocol:
type: "HTTP"
http:
port: "9300"
triggers:
- name: webhook-workflow-trigger
when:
any:
- "group_1"
- "group_2"
resource:
namespace: argo-events
group: argoproj.io
version: v1alpha1
kind: Workflow
source:
inline: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
args:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
- name: webhook-workflow-trigger-2
when:
all:
- "group_5"
- "group_4"
resource:
namespace: argo-events
group: argoproj.io
version: v1alpha1
kind: Workflow
source:
inline: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-2-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
args:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
- name: webhook-workflow-trigger-common
resource:
namespace: argo-events
group: argoproj.io
version: v1alpha1
kind: Workflow
source:
inline: |
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-common-
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
args:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
2 changes: 1 addition & 1 deletion examples/sensors/webhook-http.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ spec:
- "hello world"
command:
- cowsay
image: "docker/whalesay:latest"
image: "docker/whalesay:latest"
2 changes: 1 addition & 1 deletion examples/sensors/webhook-with-complete-payload.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: webhook-with-resource-param-sensor
name: webhook-with-complete-payload-sensor
labels:
sensors.argoproj.io/sensor-controller-instanceid: argo-events
spec:
Expand Down
Loading