Skip to content

Commit

Permalink
Added support for passing complete event payload from signal to trigg…
Browse files Browse the repository at this point in the history
…er. Updated docs
  • Loading branch information
VaibhavPage committed Oct 4, 2018
1 parent 3272f1f commit 263831c
Show file tree
Hide file tree
Showing 39 changed files with 279 additions and 250 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ minikube start
eval $(minikube docker-env)
```

#### 5. Build the project & Docker images
#### 5. Build the project
```
make all
```
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ override LDFLAGS += \
-X ${PACKAGE}.gitTreeState=${GIT_TREE_STATE}

# docker image publishing options
DOCKER_PUSH=true
DOCKER_PUSH=false
IMAGE_NAMESPACE=argoproj
IMAGE_TAG=latest

Expand All @@ -35,13 +35,13 @@ endif

# Build the project images
.DELETE_ON_ERROR:
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-transformer-linux webhook-linux calendar-linux artifact-linux nats-linux kafka-linux amqp-linux mqtt-linux
all: sensor-linux sensor-controller-linux gateway-controller-linux gateway-http-transformer-linux webhook-linux calendar-linux artifact-linux nats-linux kafka-linux amqp-linux mqtt-linux gateway-processor-grpc-client-linux calendar-grpc-linux gateway-processor-http-client-linux calendar-http-linux

all-images: sensor-image sensor-controller-image gateway-controller-image gateway-http-transformer-image webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image gateway-processor-grpc-client-image calendar-grpc-image gateway-processor-http-client-image calendar-http-image

all-controller-images: sensor-controller-image gateway-controller-image

all-gateway-images: webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image
all-core-gateway-images: webhook-image calendar-image artifact-image nats-image kafka-image amqp-image mqtt-image

.PHONY: all clean test

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Argo Events is an open source event-based dependency manager for Kubernetes. The
- Runtime agnostic. The first runtime and package agnostic event framework for Kubernetes.
- Containers. Designed from the ground-up as Kubernetes-native.
- Extremely lightweight. All gateways, with the exception of calendar-based gateways, are event-driven, meaning that there is no polling involved.
- Configurable. Select gateways you want to support, deploy those to Kubernetes and configure them on the fly
- Configurable. Configure gateways at the runtime
- Scalable & Resilient.
- Simple or Complex dependencies. Manage everything from simple, linear, real-time dependencies to complex, multi-source, batch job dependencies.

Expand Down
1 change: 0 additions & 1 deletion controllers/sensor/notification-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ func (se *sensorExecutionCtx) processSignal(gwEventWrapper *sensorEventWrapper)

// apply filters if any.
// error is thrown if some problem occurs during filtering the signal
// apply filters if any
ok, err := se.filterEvent(gwEventWrapper.signal.Filters, gwEventWrapper.event)
if err != nil {
se.log.Error().Err(err).Str("signal-name", gwEventWrapper.event.Context.Source.Host).Err(err).Msg("failed to apply filter")
Expand Down
6 changes: 5 additions & 1 deletion controllers/sensor/signal-filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ func (se *sensorExecutionCtx) filterTime(timeFilter *v1alpha1.TimeFilter, eventT
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), currentMonth, currentT.Day())
currentDay := fmt.Sprintf("%d", int(currentT.Day()))
if int(currentT.Day()) < 10 {
currentDay = "0" + currentDay
}
currentTStr := fmt.Sprintf("%d-%s-%s", currentT.Year(), currentMonth, currentDay)

if timeFilter.Start != "" && timeFilter.Stop != "" {
se.log.Info().Str("start time format", currentTStr+" "+timeFilter.Start).Msg("start time format")
Expand Down
8 changes: 7 additions & 1 deletion controllers/sensor/signal-filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,18 @@ func Test_filterTime(t *testing.T) {
Start: "10:11:00",
}
event := getCloudEvent()

currentT := time.Now().UTC()
currentMonth := fmt.Sprintf("%d", int(currentT.Month()))
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), currentMonth, currentT.Day())
currentDay := fmt.Sprintf("%d", int(currentT.Day()))
if int(currentT.Day()) < 10 {
currentDay = "0" + currentDay
}

currentTStr := fmt.Sprintf("%d-%s-%s", currentT.Year(), currentMonth, currentDay)
parsedTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" 16:36:34")
assert.Nil(t, err)
event.Context.EventTime = metav1.MicroTime{
Expand Down
4 changes: 4 additions & 0 deletions controllers/sensor/trigger-params.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func resolveParamValue(src *v1alpha1.ResourceParameterSource, events map[string]
}
return "", err
}
// check if complete payload needs to be passed to the trigger
if src.Path == "" {
return string(js), nil
}
res := gjson.GetBytes(js, src.Path)
if res.Exists() {
return res.String(), nil
Expand Down
10 changes: 9 additions & 1 deletion controllers/sensor/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,15 @@ func validateSignalFilter(filter v1alpha1.SignalFilter) error {

func validateSignalTimeFilter(tFilter *v1alpha1.TimeFilter) error {
currentT := metav1.Time{Time: time.Now().UTC()}
currentTStr := fmt.Sprintf("%d-%d-%d", currentT.Year(), int(currentT.Month()), currentT.Day())
currentMonth := fmt.Sprintf("%d", int(currentT.Month()))
if int(currentT.Month()) < 10 {
currentMonth = "0" + currentMonth
}
currentDay := fmt.Sprintf("%d", int(currentT.Day()))
if int(currentT.Day()) < 10 {
currentDay = "0" + currentDay
}
currentTStr := fmt.Sprintf("%d-%s-%s", currentT.Year(), currentMonth, currentDay)
if tFilter.Start != "" && tFilter.Stop != "" {
startTime, err := time.Parse(common.StandardTimeFormat, currentTStr+" "+tFilter.Start)
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions docs/artifact-guide.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# Artifact Guide
This is a guide for help in utilizing artifacts within Argo Events. Sensors use artifacts for two purposes:
1. Object notifications for use in `Artifact` signals. (currently S3 bucket notifications are only supported)
2. A Resource Object store for use in `Resource` triggers
This is a guide for help in utilizing artifacts within Argo Events. Sensors use artifacts for Resource Object store for use in `Resource` triggers

## Inline
Inlined artifacts are included directly within the sensor resource and decoded as a string.
Expand Down
68 changes: 14 additions & 54 deletions docs/custom-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Difference between first three options and fourth is that in options 1,2 and 3,
to watch configuration updates, start/stop a configuration dynamically. In option 4, its up to
user to watch configuration updates and take actions.

##### Below are the environment variables provided to all kinds of gateways
<b> Below are the environment variables provided to all types of gateways </b>

| Field | Description |
|----------------------|--------------|
Expand All @@ -23,15 +23,13 @@ user to watch configuration updates and take actions.
| GATEWAY_CONTROLLER_NAME | Contains name of gateway controller |

## Core Gateway Style
It is the most straightforward option. The gateway consists of two components,
Its the most straightforward option. The gateway consists of two components,

1. Gateway Processor: either generates events internally or listens for external events and then
passes those events to gateway-transformer

2. Gateway Transformer: transforms incoming events into cloudevents specification compliant events
and dispatches them to interested sensors.

* gateway-processor can be exposed via a service using gateway's `serviceSpec`
and dispatches them to watchers.

![](core-gateway-style.png)

Expand All @@ -58,40 +56,6 @@ type ConfigData struct {
}
```

Lets's look at excerpt from NATS gateway's RunConfiguration implementation

```go
var wg sync.WaitGroup
wg.Add(1)

// waits till stop signal.
go func() {
<-config.StopCh
n.gatewayConfig.Log.Info().Str("config", config.Src).Msg("stopping the configuration...")
n.gatewayConfig.Log.Info().Str("config-key", config.Src).Msg("client disconnected. stopping the configuration...")
wg.Done()
}()

n.gatewayConfig.Log.Info().Str("config-name", config.Src).Msg("running...")
config.Active = true

sub, err := conn.Subscribe(s.Attributes[subjectKey], func(msg *natsio.Msg) {
n.gatewayConfig.Log.Info().Str("config-key", config.Src).Msg("dispatching event to gateway-processor")
n.gatewayConfig.DispatchEvent(msg.Data, config.Src)
})
if err != nil {
n.gatewayConfig.Log.Error().Str("url", s.URL).Str("subject", s.Attributes[subjectKey]).Err(err).Msg("failed to subscribe to subject")
} else {
n.gatewayConfig.Log.Info().Str("config-key", config.Src).Msg("running...")
}

wg.Wait()
```

First we create a wait-group and wait for stop signal. We mark configuration as active, subscribe to subject
and listens to incoming messages. As soon as message is consumed, we dispatch the event to framwork code
which takes further actions.

GatewayConfig contains generic configuration for a gateway
```go
type GatewayConfig struct {
Expand All @@ -108,13 +72,12 @@ type GatewayConfig struct {
}
```

* To send events back to framework code for further processing, use
* To send events back to framework for further processing, use
```go
gatewayConfig.DispatchEvent(event []byte, src string) error
```


For detailed implementation, check core gateways [Core Gateways](https://github.com/argoproj/argo-events/tree/eventing/gateways/core)
For detailed implementation, check out [Core Gateways](https://github.com/argoproj/argo-events/tree/eventing/gateways/core)

## gRPC gateway
A gRPC gateway has 3 components,
Expand All @@ -124,19 +87,19 @@ external events and streams them back to gateway-processor-client
2. Gateway Processor Client - gRPC client provided by framework that connects to gateway processor server.

3. Gateway Transformer: transforms incoming events into cloudevents specification compliant events
and dispatches them to interested sensors.
and dispatches them to interested watchers.

### Architecture
![](grpc-gateway.png)

To implement gateway processor server, you will need to implement
To implement gateway processor server, you will need to implement
```proto
RunGateway(GatewayConfig) returns (stream Event)
```
`RunGateway` method takes an gateway configuration and sends events over a stream.
`RunGateway` method takes a gateway configuration and sends events over a stream.

The gateway processor client opens a new connection for each gateway configuration and start listening to
events on the stream.
The gateway processor client opens a new connection for each gateway configuration and starts listening to
events on a stream.

For detailed implementation, check out [Calendar gRPC gateway](https://github.com/argoproj/argo-events/tree/eventing/gateways/grpc/calendar)

Expand All @@ -156,7 +119,7 @@ a new configuration or `/stop` endpoint to stop a configuration. Processor clien
running internally listening for events from gateway-processor-server.

3. Gateway Transformer: transforms incoming events into cloudevents specification compliant events
and dispatches them to interested sensors.
and dispatches them to watchers.


### Architecture
Expand All @@ -175,11 +138,8 @@ List of environment variables available to user code

For detailed implementation, check out [Calendar HTTP gateway](https://github.com/argoproj/argo-events/tree/eventing/gateways/rest/calendar)

###### Advantage of writing a gRPC or http gateway is that you can use other languages for implementation of gateway processor server.


## No framework gateway code
The third option is you provide gateway implementation from scratch, watch the configuration
## Framework independent
The fourth option is you provide gateway implementation from scratch: watch the configuration
updates, start/stop configuration if needed. Only requirement is that events must be
dispatched to gateway-transformer using HTTP post request. The port to dispatch the request
is made available through environment variable `TRANSFORMER_PORT`.
Expand All @@ -194,4 +154,4 @@ List of environment variables available to user code
| GATEWAY_NAME | Gateway name |

### Gateway Examples
* Example gateways are available at [gateway examples](https://github.com/argoproj/argo-events/tree/eventing/examples/gateways)
* Example gateway definitions are available at [here](https://github.com/argoproj/argo-events/tree/eventing/examples/gateways)
33 changes: 18 additions & 15 deletions docs/gateway-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@

## What is a gateway?
A gateway is a long running/repeatable process whose tasks are to process and transform either the internally produced events or
incoming events into the cloudevents specification compliant events and dispatching them to sensors.
external events into the cloudevents specification compliant events and dispatch them to watchers(sensors and/or gateways).

## Gateway Components
A gateway has two components:

1. gateway-processor: Either generates the events internally or listens to incoming events. It then passes the events to gateway-transformer.
The implementation of gateway-processor is provided by the user which means the user can easily create a custom gateway that has the business logic pertaining to a use-case.
1. <b>gateway-processor</b>: Either generates the events internally or listens to external events.
The implementation of gateway-processor is provided by the user which means the user can easily create a custom gateway.

2. gateway-transformer: Transforms the incoming event from gateway-processor into a cloudevents specification compliant event. The event is then dispatched to sensors who are interested in listening to this gateway.
Refer https://github.com/cloudevents/spec for more info on cloudevents.
2. <b>gateway-transformer</b>: Transforms the incoming events from gateway-processor into a cloudevents specification compliant events.
The event is then dispatched to watchers.

Refer <b>https://github.com/cloudevents/spec </b> for more info on cloudevents specifications.


Core gateways come in 5 types:
Expand Down Expand Up @@ -44,24 +46,25 @@ The `gateway-controller` is responsible for managing the `Gateway` resources.
| ConfigMap | Name of the configmap containing gateway configuration/s |
| Type | Type of gateway |
| Version | To mark event version |
| Service | Name of the service to expose the gateway |
| Sensors | List of sensors to dispatch events to |
| ServiceSpec | Specifications of the service to expose the gateway |
| Watchers | Watchers are components which are interested listening to notifications from the gateway |
| RPCPort | Used to communicate between gRPC gateway client and gRPC gateway server |
| HTTPServerPort | Used to communicate between gateway client and server over http |
| DispatchMechanism | Messaging mechanism used to send events from gateway to watchers |


## Gateway Deployment

All core gateways use kubernetes configmap to keep track of current gateway configurations. Multiple configurations can be defined for a single gateway and
each configuration will run in a separate go routine. The gateway watches updates to configmap which let us add new configuration at run time.
[Checkout core gateways specs.](https://github.com/argoproj/argo-events/tree/eventing/examples/gateways)

## How to write a custom gateway?
Follow the gateway tutorial
Follow this tutorial to learn more
[Custom Gateways](custom-gateway.md)


## Types of Gateways & their configurations

###### Gateway can have zero configuration(won't be doing anything useful) to multiple configurations. A configuration can be added or removed during the runtime.
## Gateway configurations
<b>Gateway can have zero configuration(idle) or many configurations. A configuration can be added or removed during the runtime.</b>

### Calendars
Events produced can be based on a [cron](https://crontab.guru/) schedule or an [interval duration](https://golang.org/pkg/time/#ParseDuration). In addition, calendar gateway currently supports a `recurrence` field in which to specify special exclusion dates for which this gateway will not produce an event.
Expand Down Expand Up @@ -138,10 +141,10 @@ Artifact gateway support S3 `bucket-notifications` via [Minio](https://docs.mini
```

### Streams
Stream signals contain a generic specification for messages received on a queue and/or though messaging server. The following are the `builtin` supported stream signals. Users can build their own signals by adding implementations to the [custom](../signals/stream/custom/doc.go) package.
Stream gateways contain a generic specification for messages received on a queue and/or though messaging server. The following are the `builtin` supported stream gateways.

#### NATS
[Nats](https://nats.io/) is an open-sourced, lightweight, secure, and scalable messaging system for cloud native applications and microservices architecture. It is currently a hosted CNCF Project. We are currently experimenting with using NATS as a solution for gateway (inputs) and triggers (outputs), however `NATS Streaming`, the data streaming system powered by NATS, offers many additional [features](https://nats.io/documentation/streaming/nats-streaming-intro/) on top of the core NATS platform that we believe are very desirable and definite future enhancements.
[Nats](https://nats.io/) is an open-sourced, lightweight, secure, and scalable messaging system for cloud native applications and microservices architecture. It is currently a hosted CNCF Project.
```
nats.fooConfig: |-
url: nats://nats.argo-events:4222
Expand Down Expand Up @@ -207,4 +210,4 @@ Stream signals contain a generic specification for messages received on a queue
```

### Examples
[Gateway Examples](https://github.com/argoproj/argo-events/tree/eventing/examples/gateways)
Explore [Gateway Examples](https://github.com/argoproj/argo-events/tree/eventing/examples/gateways)
4 changes: 2 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
## Why Argo Events?
- Containers. Designed from the ground-up as Kubernetes-native.
- Extremely lightweight. All gateways, with exception of calendar based gateway, are event-driven, meaning there is no polling involved.
- Configurable. Select gateways you want to support, deploy those to Kubernetes and configure them on the fly
- Extensible. Write custom gateways that cater to your business use cases in any language of your choice
- Configurable. Configure gateways at the runtime
- Extensible. Write custom gateways that cater to your business use cases in a language of your choice
- Scalability & Resilient.
- Simple or Complex dependencies. Manage everything from simple, linear, real-time dependencies to complex, multi-source batch job dependencies.

Expand Down
Loading

0 comments on commit 263831c

Please sign in to comment.