Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for passing complete event payload from signal to trigger #94

Merged
merged 1 commit into from
Oct 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ minikube start
eval $(minikube docker-env)
```

#### 5. Build the project & Docker images
#### 5. Build the project
```
make all
```
Expand Down
18 changes: 15 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ endif

# Build the project images
.DELETE_ON_ERROR:
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-transformer-linux webhook-linux calendar-linux artifact-linux nats-linux kafka-linux amqp-linux mqtt-linux
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-http-transformer-linux webhook-linux calendar-linux resource-linux artifact-linux nats-linux kafka-linux amqp-linux mqtt-linux gateway-processor-grpc-client-linux calendar-grpc-linux gateway-processor-http-client-linux calendar-http-linux

all-images: sensor-image sensor-controller-image gateway-controller-image gateway-http-transformer-image webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image gateway-processor-grpc-client-image calendar-grpc-image gateway-processor-http-client-image calendar-http-image
all-images: sensor-image sensor-controller-image gateway-controller-image gateway-http-transformer-image webhook-image calendar-image resource-image artifact-image nats-image kafka-image amqp-image mqtt-image gateway-processor-grpc-client-image calendar-grpc-image gateway-processor-http-client-image calendar-http-image

all-controller-images: sensor-controller-image gateway-controller-image

all-gateway-images: webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image
all-core-gateway-images: webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image resource-image

.PHONY: all clean test

Expand Down Expand Up @@ -115,6 +115,18 @@ calendar-image: calendar-linux
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)calendar-gateway:$(IMAGE_TAG) ; fi


resource:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/resource-gateway ./gateways/core/resource/

resource-linux:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 make resource

resource-image: resource-linux
docker build -t $(IMAGE_PREFIX)resource-gateway:$(IMAGE_TAG) -f ./gateways/core/resource/Dockerfile .
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)resource-gateway:$(IMAGE_TAG) ; fi



artifact:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -v -ldflags '${LDFLAGS}' -o ${DIST_DIR}/artifact-gateway ./gateways/core/artifact/

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Argo Events is an open source event-based dependency manager for Kubernetes. The
- Runtime agnostic. The first runtime and package agnostic event framework for Kubernetes.
- Containers. Designed from the ground-up as Kubernetes-native.
- Extremely lightweight. All gateways, with the exception of calendar-based gateways, are event-driven, meaning that there is no polling involved.
- Configurable. Select gateways you want to support, deploy those to Kubernetes and configure them on the fly
- Configurable. Configure gateways at runtime
- Scalable & Resilient.
- Simple or Complex dependencies. Manage everything from simple, linear, real-time dependencies to complex, multi-source, batch job dependencies.

Expand Down
2 changes: 1 addition & 1 deletion cmd/controllers/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func main() {

namespace, ok := os.LookupEnv(common.GatewayNamespace)
if !ok {
namespace = common.DefaultGatewayControllerNamespace
namespace = common.DefaultControllerNamespace
}

// create new gateway controller
Expand Down
2 changes: 1 addition & 1 deletion cmd/controllers/sensor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {

namespace, ok := os.LookupEnv(common.SensorNamespace)
if !ok {
namespace = common.DefaultGatewayControllerNamespace
namespace = common.DefaultControllerNamespace
}

// create a new sensor controller
Expand Down
3 changes: 3 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (

// StandardTimeFormat is time format reference for golang
StandardTimeFormat = "2006-01-02 15:04:05"

// StandardYYYYMMDDFormat formats date in yyyy-mm-dd format
StandardYYYYMMDDFormat = "2006-01-02"
)

// SENSOR CONTROLLER CONSTANTS
Expand Down
16 changes: 6 additions & 10 deletions common/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,11 @@ import (
const namespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

var (
// DefaultSensorControllerNamespace is the default namespace where the sensor sensor-controller is installed
DefaultSensorControllerNamespace = "argo-events"

// Todo: Does this even have to be separate
// DefaultGatewayControllerNamespace is the default namespace where the sensor sensor-controller is installed
DefaultGatewayControllerNamespace = "argo-events"
// DefaultControllerNamespace is the default namespace where the sensor and gateways controllers are installed
DefaultControllerNamespace = "argo-events"

// ErrReadNamespace occurs when the namespace cannot be read from a Kubernetes pod's service account token
ErrReadNamespace = errors.New("Could not read namespace from service account secret")
ErrReadNamespace = errors.New("could not read namespace from service account secret")
)

func init() {
Expand Down Expand Up @@ -61,16 +57,16 @@ func RefreshNamespace() {
// 1 - env variable
nm, ok := os.LookupEnv(EnvVarNamespace)
if ok {
DefaultSensorControllerNamespace = nm
DefaultControllerNamespace = nm
return
}

// 2 - pod service account token
nm, err := detectNamespace()
if err == nil {
DefaultSensorControllerNamespace = nm
DefaultControllerNamespace = nm
}

// 3 - use the DefaultSensorControllerNamespace
// 3 - use the DefaultControllerNamespace
return
}
5 changes: 2 additions & 3 deletions common/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ import (
func TestResolveNamespace(t *testing.T) {
defer os.Unsetenv(EnvVarNamespace)

RefreshNamespace()
assert.Equal(t, "argo-events", DefaultSensorControllerNamespace)
assert.Equal(t, "argo-events", DefaultControllerNamespace)

// TODO: now write the namespace file

Expand All @@ -37,5 +36,5 @@ func TestResolveNamespace(t *testing.T) {
}

RefreshNamespace()
assert.Equal(t, "test", DefaultSensorControllerNamespace)
assert.Equal(t, "test", DefaultControllerNamespace)
}
2 changes: 1 addition & 1 deletion controllers/gateway/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *GatewayController) updateConfig(cm *apiv1.ConfigMap) error {
return err
}
if config.Namespace == "" {
config.Namespace = common.DefaultGatewayControllerNamespace
config.Namespace = common.DefaultControllerNamespace
}
c.Config = config
return nil
Expand Down
2 changes: 1 addition & 1 deletion controllers/gateway/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,5 @@ func TestGatewayController_ResyncConfig(t *testing.T) {
assert.NotNil(t, cm)
assert.NotNil(t, gc.Config)
assert.NotEqual(t, gc.Config.Namespace, gc.ConfigMapNS)
assert.Equal(t, gc.Config.Namespace, common.DefaultGatewayControllerNamespace)
assert.Equal(t, gc.Config.Namespace, common.DefaultControllerNamespace)
}
2 changes: 1 addition & 1 deletion controllers/sensor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (c *SensorController) updateConfig(cm *apiv1.ConfigMap) error {
return err
}
if config.Namespace == "" {
config.Namespace = common.DefaultSensorControllerNamespace
config.Namespace = common.DefaultControllerNamespace
}
c.Config = config
return nil
Expand Down
6 changes: 2 additions & 4 deletions controllers/sensor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ import (
"context"
"errors"
"time"

"fmt"
"log"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

base "github.com/argoproj/argo-events"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
sensorclientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned"
"fmt"
"log"
)

const (
Expand Down
1 change: 0 additions & 1 deletion controllers/sensor/notification-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func (se *sensorExecutionCtx) processSignal(gwEventWrapper *sensorEventWrapper)

// apply filters if any.
// error is thrown if some problem occurs during filtering the signal
// apply filters if any
ok, err := se.filterEvent(gwEventWrapper.signal.Filters, gwEventWrapper.event)
if err != nil {
se.log.Error().Err(err).Str("signal-name", gwEventWrapper.event.Context.Source.Host).Err(err).Msg("failed to apply filter")
Expand Down
10 changes: 3 additions & 7 deletions controllers/sensor/signal-filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,9 @@ func (se *sensorExecutionCtx) filterTime(timeFilter *v1alpha1.TimeFilter, eventT
if timeFilter != nil {
se.log.Info().Str("event-time", eventTime.String()).Msg("event time")
currentT := time.Now().UTC()
se.log.Info().Str("current-time", currentT.String()).Msg("current time")
currentMonth := fmt.Sprintf("%d", int(currentT.Month()))
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), currentMonth, currentT.Day())

currentT = time.Date(currentT.Year(), currentT.Month(), currentT.Day(), 0, 0, 0, 0, time.UTC)
currentTStr := currentT.Format(common.StandardYYYYMMDDFormat)
se.log.Info().Str("date", currentTStr).Msg("current date")
if timeFilter.Start != "" && timeFilter.Stop != "" {
se.log.Info().Str("start time format", currentTStr+" "+timeFilter.Start).Msg("start time format")
startTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" "+timeFilter.Start)
Expand Down
9 changes: 3 additions & 6 deletions controllers/sensor/signal-filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ limitations under the License.
package sensor

import (
"fmt"
"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/stretchr/testify/assert"
Expand All @@ -36,12 +35,10 @@ func Test_filterTime(t *testing.T) {
Start: "10:11:00",
}
event := getCloudEvent()

currentT := time.Now().UTC()
currentMonth := fmt.Sprintf("%d", int(currentT.Month()))
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), currentMonth, currentT.Day())
currentT = time.Date(currentT.Year(), currentT.Month(), currentT.Day(), 0, 0, 0, 0, time.UTC)
currentTStr := currentT.Format(common.StandardYYYYMMDDFormat)
parsedTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" 16:36:34")
assert.Nil(t, err)
event.Context.EventTime = metav1.MicroTime{
Expand Down
4 changes: 4 additions & 0 deletions controllers/sensor/trigger-params.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func resolveParamValue(src *v1alpha1.ResourceParameterSource, events map[string]
}
return "", err
}
// check if complete payload needs to be passed to the trigger
if src.Path == "" {
return string(js), nil
}
res := gjson.GetBytes(js, src.Path)
if res.Exists() {
return res.String(), nil
Expand Down
8 changes: 4 additions & 4 deletions controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ValidateSensor accepts a sensor and performs validation against it
Expand Down Expand Up @@ -83,8 +82,9 @@ func validateSignalFilter(filter v1alpha1.SignalFilter) error {
}

func validateSignalTimeFilter(tFilter *v1alpha1.TimeFilter) error {
currentT := metav1.Time{Time: time.Now().UTC()}
currentTStr := fmt.Sprintf("%d-%d-%d", currentT.Year(), int(currentT.Month()), currentT.Day())
currentT := time.Now().UTC()
currentT = time.Date(currentT.Year(), currentT.Month(), currentT.Day(), 0, 0, 0, 0, time.UTC)
currentTStr := currentT.Format(common.StandardYYYYMMDDFormat)
if tFilter.Start != "" && tFilter.Stop != "" {
startTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" "+tFilter.Start)
if err != nil {
Expand All @@ -104,7 +104,7 @@ func validateSignalTimeFilter(tFilter *v1alpha1.TimeFilter) error {
return err
}
stopTime = stopTime.UTC()
if stopTime.Before(currentT.Time) {
if stopTime.Before(currentT.UTC()) {
return fmt.Errorf("invalid signal time filter: stop '%s' is before the current time '%s'", tFilter.Stop, currentT)
}
}
Expand Down
4 changes: 1 addition & 3 deletions docs/artifact-guide.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# Artifact Guide
This is a guide for help in utilizing artifacts within Argo Events. Sensors use artifacts for two purposes:
1. Object notifications for use in `Artifact` signals. (currently S3 bucket notifications are only supported)
2. A Resource Object store for use in `Resource` triggers
This is a guide for help in utilizing artifacts within Argo Events. Sensors use artifacts for Resource Object store for use in `Resource` triggers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this is no longer for object notifications because of the minio listenBucketNotifications implementation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup


## Inline
Inlined artifacts are included directly within the sensor resource and decoded as a string.
Expand Down
Loading