Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

re-architecting around gateways and sensors #92

Merged
merged 37 commits into from
Oct 1, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
12af598
added gateway for processing outside events, making gateway long runn…
VaibhavPage Aug 7, 2018
4bb4457
refactoring k8s manifests
VaibhavPage Aug 22, 2018
95c2548
adding s3 gateway
VaibhavPage Aug 22, 2018
5839ee9
making s3 and webhook gateway mult configurable
VaibhavPage Aug 23, 2018
c3542da
refactor of calendar, s3 and webhook gateway done. started with streams
VaibhavPage Aug 23, 2018
543f698
added nats gateway
VaibhavPage Aug 24, 2018
44956f4
renamed signals to gateways. moved out-of-box gateways to core.
VaibhavPage Aug 24, 2018
78843be
refactor gateways
VaibhavPage Aug 24, 2018
cdc20f3
removing signals folder
VaibhavPage Aug 24, 2018
2ab9407
added kafka gateway
VaibhavPage Aug 24, 2018
7caee70
gateway and sensor dirs under a common controllers and clients directory
VaibhavPage Aug 27, 2018
2db02fe
adding resource, amqp and mqtt gateways. refactored sensor watcher
VaibhavPage Aug 28, 2018
62ccce9
adding gateway examples, readme etc
VaibhavPage Aug 28, 2018
721bac5
adding gateway docs. refactor gateway and sensor examples
VaibhavPage Aug 29, 2018
57791a9
added watcher for sensor. Completion count for sensor. Updated gatewa…
VaibhavPage Aug 30, 2018
ee5fefb
sensor's signal are gateway-name/configuration-name
VaibhavPage Aug 30, 2018
a039fa1
making gateway processor as gRPC server
VaibhavPage Sep 4, 2018
16d175a
giving user options to implement gateway either by mimicing core gate…
VaibhavPage Sep 5, 2018
66d3247
gateway spec changed. divided extending gateways into 3 parts. starte…
VaibhavPage Sep 5, 2018
28db2b8
updated gateway examples with deploySpec. started writing custom gate…
VaibhavPage Sep 6, 2018
11110d3
adding wiki for writing custom gateways
VaibhavPage Sep 6, 2018
8947fe5
updating wiki. grpc gateway updated DialOptions
VaibhavPage Sep 7, 2018
6828b2e
added REST gateways. wiki to follow
VaibhavPage Sep 8, 2018
50ec67c
refactor rest gateways
VaibhavPage Sep 10, 2018
b8c79c0
added k8 events for gateway. Gateway configuration state is now maint…
VaibhavPage Sep 11, 2018
99b04d5
minor refactor in k8 events
VaibhavPage Sep 11, 2018
655028a
added gateways as watchers for gateway along with sensors. started ad…
VaibhavPage Sep 12, 2018
3b0c7b0
added k8 event for escalation. nodes are deleted from gateway resourc…
VaibhavPage Sep 14, 2018
b43c934
added unit tests. refactored sensor notification handler. added k8 ev…
VaibhavPage Sep 17, 2018
970c63b
adding test cases for sensor controller, operator and notification ha…
VaibhavPage Sep 17, 2018
9ec8c44
updating webhook gateway
VaibhavPage Sep 18, 2018
6433b77
fixed issue with syncing gateway nodes and gateway configmap
VaibhavPage Sep 20, 2018
6691b07
test: cleaning up test cases
VaibhavPage Sep 21, 2018
d08ac54
updating dependencies
VaibhavPage Sep 21, 2018
92b31a5
update docker registry to argoproj
VaibhavPage Sep 22, 2018
ac90e6b
exposing gateway and sensor validate method
VaibhavPage Sep 27, 2018
edfa3a1
update dependencies
VaibhavPage Sep 27, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const (
// http responses
SuccessResponse = "Success"
ErrorResponse = "Error"

// LabelEventSeen is the label for already seen k8 event
LabelEventSeen = "event-seen"

LabelArgoEventsEscalationKind = "ArgoEventsEscalation"
)

// SENSOR CONTROLLER CONSTANTS
Expand Down Expand Up @@ -71,6 +76,11 @@ const (

// LabelJobName is label for job name
LabelJobName = "job-name"

// LabelSensorName is label for sensor name
LabelSensorName = "sensor-name"

LabelSignalName = "signal-name"
)

// GATEWAY CONSTANTS
Expand Down Expand Up @@ -103,9 +113,6 @@ const (
// LabelGatewayConfigurationName is the label for a configuration in gateway
LabelGatewayConfigurationName = "config-name"

// LabelGatewayEventSeen is the label for already seen gateway event
LabelGatewayEventSeen = "event-seen"

// GatewayControllerInstanceIDEnvVar is used to get controller instance id
GatewayControllerInstanceIDEnvVar = "GATEWAY_CONTROLLER_INSTANCE_ID"

Expand Down
65 changes: 65 additions & 0 deletions common/k8-events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package common

import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
"k8s.io/client-go/kubernetes"
)

// K8Event abstracts kubernetes event.
type K8Event struct {
// The object that this event is about.
Name string
// Namespace where event should be created
Namespace string
// What action was taken/failed regarding to the Regarding object.
Action string
// This should be a short, machine understandable string that gives the reason
// for the transition into the object's current status.
Reason string
// Kind of component generating this event
Kind string
// Name of the controller that emitted this Event
ReportingController string
// ID of the controller instance
ReportingInstance string
// Type of this event (Normal, Warning), new types could be added in the future
Type string
// Map of string keys and values that can be used to organize and categorize
// (scope and select) objects.
Labels map[string]string
}

// CreateK8Event returns a kubernetes event object
func GetK8Event(event *K8Event) *corev1.Event{
return &corev1.Event{
Reason: event.Reason,
Type: event.Type,
Action: event.Action,
EventTime: metav1.MicroTime{
Time: time.Now(),
},
ObjectMeta: metav1.ObjectMeta{
Namespace: event.Namespace,
Name: event.Name + "-" + RandomStringGenerator(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you not just use GenerateName here instead?

Copy link
Contributor Author

@VaibhavPage VaibhavPage Sep 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, had it changed in my local. I'll push some new changes with test cases

Labels: event.Labels,
},
InvolvedObject: corev1.ObjectReference{
Namespace: event.Namespace,
Name: event.Name,
Kind: event.Kind,
},
Source: corev1.EventSource{
Component: event.Name,
},
ReportingInstance: event.ReportingInstance,
ReportingController: event.ReportingController,
}
}

// CreateK8Event creates a kubernetes event resource
func CreateK8Event(event *corev1.Event, clientset kubernetes.Interface) error {
_, err := clientset.CoreV1().Events(event.ObjectMeta.Namespace).Create(event)
return err
}
4 changes: 2 additions & 2 deletions controllers/gateway/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (goc *gwOperationCtx) operate() error {
// check if the type of the gateway is http
var sensorWatchers []string
var gatewayWatchers []string
if goc.gw.Spec.Type == v1alpha1.HTTPGateway {
if goc.gw.Spec.DispatchMechanism == v1alpha1.HTTPGateway {
if goc.gw.Spec.Watchers.Sensors != nil {
for _, sensor := range goc.gw.Spec.Watchers.Sensors {
b, err := yaml.Marshal(sensor)
Expand Down Expand Up @@ -402,7 +402,7 @@ func (goc *gwOperationCtx) getContainersForGatewayPod() *[]corev1.Container {

var transformerImage string

switch goc.gw.Spec.Type {
switch goc.gw.Spec.DispatchMechanism {
case v1alpha1.HTTPGateway:
transformerImage = common.GatewayHTTPEventTransformerImage
case v1alpha1.KafkaGateway:
Expand Down
3 changes: 1 addition & 2 deletions controllers/gateway/transform/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,7 @@ func (toc *tOperationCtx) postCloudEventToWatcher(ip string, port string, endpoi
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
toc.log.Info().Int("response-status-code", resp.StatusCode).Str("response-status", resp.Status).Msg("posting cloud event to watcher")
_, err = client.Do(req)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion controllers/gateway/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (goc *gwOperationCtx) validate() error {
if goc.gw.Spec.Version == "" {
return fmt.Errorf("gateway version is not specified")
}
switch goc.gw.Spec.Type {
switch goc.gw.Spec.DispatchMechanism {
case v1alpha1.HTTPGateway:
if goc.gw.Spec.Watchers == nil || (goc.gw.Spec.Watchers.Gateways == nil && goc.gw.Spec.Watchers.Sensors == nil) {
return fmt.Errorf("no associated watchers with gateway")
Expand Down
8 changes: 7 additions & 1 deletion controllers/sensor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
base "github.com/argoproj/argo-events"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
sensorclientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned"
"github.com/argoproj/argo-events/common"
)

const (
Expand Down Expand Up @@ -108,7 +109,12 @@ func (c *SensorController) processNextItem() bool {
if err != nil {
// now let's escalate the sensor
// the context should have the most up-to-date version
log.Infof("escalating sensor to level %s via %s message", ctx.s.Spec.Escalation.Level, ctx.s.Spec.Escalation.Message)
ctx.log.Error().Err(err).Msg("escalating controller failure")
event := ctx.GetK8Event("controller error", v1alpha1.NodePhaseError, sensor.Kind)
err = common.CreateK8Event(event, ctx.controller.kubeClientset)
if err != nil {
ctx.log.Error().Err(err).Msg("failed to create escalation event for controller failure")
}
}

return true
Expand Down
23 changes: 21 additions & 2 deletions controllers/sensor/notification-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,31 @@ func (se *sensorExecutor) handleSignals(w http.ResponseWriter, r *http.Request)
return
}

// check if sensor is in error state
if se.sensor.Status.Phase == v1alpha1.NodePhaseError {
se.log.Warn().Msg("sensor is in error state. can't process the notification")
common.SendErrorResponse(w)
return
}

// validate the signal is from gateway of interest
var isSignalValid bool
for _, signal := range se.sensor.Spec.Signals {
if signal.Name == gatewaySignal.Context.Source.Host {
isSignalValid = true
break
ok, err := se.filterEvent(signal.Filters, &gatewaySignal)
if err != nil {
se.log.Error().Str("signal-name", gatewaySignal.Context.Source.Host).Err(err).Msg("failed to apply filter")
// mark signal as failed
node := getNodeByName(se.sensor, gatewaySignal.Context.Source.Host)
se.markNodePhase(node.Name, v1alpha1.NodePhaseError, err.Error())
return
}
if ok {
isSignalValid = true
break
} else {
se.log.Warn().Str("signal-name", gatewaySignal.Context.Source.Host).Msg("notification failed to pass signal filter")
}
}
}

Expand Down
23 changes: 22 additions & 1 deletion controllers/sensor/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"os"
"github.com/argoproj/argo-events/pkg/apis/gateway"
)

// the context of an operation on a sensor.
Expand Down Expand Up @@ -76,6 +77,7 @@ func (soc *sOperationCtx) operate() error {
// since validation will fail every time
err := validateSensor(soc.s)
if err != nil {
soc.log.Error().Err(err).Msg("failed to validate sensor")
soc.markSensorPhase(v1alpha1.NodePhaseError, true, err.Error())
return nil
}
Expand Down Expand Up @@ -189,7 +191,7 @@ func (soc *sOperationCtx) operate() error {
}
}
case v1alpha1.NodePhaseError:
// todo: handle this
// trigger escalation for all signals in error state
case v1alpha1.NodePhaseComplete:
soc.log.Info().Msg("sensor is completed")
soc.s.Status.CompletionCount = soc.s.Status.CompletionCount + 1
Expand Down Expand Up @@ -333,3 +335,22 @@ func (soc *sOperationCtx) markSensorPhase(phase v1alpha1.NodePhase, markComplete
}
}
}

// createK8Event creates a kubernetes event.
func (soc *sOperationCtx) GetK8Event(reason string, action v1alpha1.NodePhase, eventType string) *corev1.Event {
event := &common.K8Event{
Name: soc.s.Name,
Namespace: soc.s.Namespace,
Type: eventType,
Reason: reason,
Action: string(action),
Kind: gateway.Kind,
ReportingController: common.DefaultSensorControllerDeploymentName,
ReportingInstance: soc.controller.Config.InstanceID,
Labels: map[string]string{
common.LabelEventSeen: "",
common.LabelSensorName: soc.s.Name,
},
}
return common.GetK8Event(event)
}
114 changes: 100 additions & 14 deletions controllers/sensor/signal-filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,38 +25,121 @@ import (
v1alpha "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/tidwall/gjson"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"time"
"github.com/argoproj/argo-events/common"
)

// createEscalationEvent creates a k8 event for escalation
func (se *sensorExecutor) createEscalationEvent(policy *v1alpha1.EscalationPolicy, signalFilterName string) error {
se.log.Info().Interface("policy", policy).Msg("printing policy")
event := common.GetK8Event(&common.K8Event{
Name: policy.Name,
Namespace: se.sensor.Namespace,
ReportingInstance: se.sensor.Name,
ReportingController: se.sensor.Name,
Labels: map[string]string{
common.LabelEventSeen: "",
common.LabelSignalName: signalFilterName,
},
Type: common.LabelArgoEventsEscalationKind,
Action: string(policy.Level),
Reason: policy.Message,
})
err := common.CreateK8Event(event, se.kubeClient)
return err
}

// apply the signal filters to an event
func filterEvent(f v1alpha1.SignalFilter, event *v1alpha.Event) (bool, error) {
dataRes, err := filterData(f.Data, event)
return filterTime(f.Time, &event.Context.EventTime) && filterContext(f.Context, &event.Context) && dataRes, err
func (se *sensorExecutor) filterEvent(f v1alpha1.SignalFilter, event *v1alpha.Event) (bool, error) {
dataRes, err := se.filterData(f.Data.Filters, event)
// generate sensor failure event and mark sensor as failed
if err != nil {
return false, err
}
if !dataRes {
err = se.createEscalationEvent(f.Data.EscalationPolicy, f.Name)
if err != nil {
return false, err
}
}
timeRes, err := se.filterTime(f.Time, &event.Context.EventTime)
// generate sensor failure event and mark sensor as failed
if err != nil {
return false, err
}
if !timeRes {
err = se.createEscalationEvent(f.Time.EscalationPolicy, f.Name)
if err != nil {
return false, err
}
}
ctxRes := se.filterContext(f.Context, &event.Context)
if !ctxRes {
err = se.createEscalationEvent(f.Context.EscalationPolicy, f.Name)
if err != nil {
return false, err
}
}
return timeRes && ctxRes && dataRes, err
}

// applyTimeFilter checks the eventTime against the timeFilter:
// 1. the eventTime is greater than or equal to the start time
// 2. the eventTime is less than the end time
// returns true if 1 and 2 are true and false otherwise
func filterTime(timeFilter *v1alpha1.TimeFilter, eventTime *metav1.Time) bool {
if timeFilter != nil && eventTime != nil {
if timeFilter.Start != nil && timeFilter.Stop != nil {
return (timeFilter.Start.Before(eventTime) || timeFilter.Start.Equal(eventTime)) && eventTime.Before(timeFilter.Stop)
func (se *sensorExecutor) filterTime(timeFilter *v1alpha1.TimeFilter, eventTime *metav1.Time) (bool, error) {
if timeFilter != nil {
currentT := time.Now().UTC()
se.log.Info().Str("current-time", currentT.String()).Msg("current time")
currentTStr := fmt.Sprintf("%d-%s-%d", currentT.Year(), int(currentT.Month()), currentT.Day())

if timeFilter.Start != "" && timeFilter.Stop != "" {
se.log.Info().Str("start time format", currentTStr + " " + timeFilter.Start).Msg("start time format")
startTime, err := time.Parse("2006-01-02 15:04:05", currentTStr + " " + timeFilter.Start)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's make the format string 2006-01-02 15:04:05 a constant since we use it more than once and seeing this may be confusing for some.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done 👍

if err != nil {
fmt.Println(err)
return false, err
}
se.log.Info().Str("start time", startTime.String()).Msg("start time")
startTime = startTime.UTC()
se.log.Info().Str("stop time format", currentTStr + " " + timeFilter.Stop).Msg("stop time format")
stopTime, err := time.Parse("2006-01-02 15:04:05", currentTStr + " " + timeFilter.Stop)
if err != nil {
fmt.Println(err)
return false, err
}
se.log.Info().Str("stop time", stopTime.String()).Msg("stop time")
stopTime = stopTime.UTC()
return (startTime.Before(eventTime.Time) || stopTime.Equal(eventTime.Time)) && eventTime.Time.Before(stopTime), nil
}
if timeFilter.Start != nil {
if timeFilter.Start != "" {
// stop is nil - does not have an end
return timeFilter.Start.Before(eventTime) || timeFilter.Start.Equal(eventTime)
startTime, err := time.Parse("2006-01-02 15:04:05", currentTStr + " " + timeFilter.Start)
if err != nil {
return false, err
}
se.log.Info().Str("start time", startTime.String()).Msg("start time")
startTime = startTime.UTC()
return startTime.Before(eventTime.Time) || startTime.Equal(eventTime.Time), nil
}
if timeFilter.Stop != nil {
return eventTime.Before(timeFilter.Stop)
if timeFilter.Stop != "" {
stopTime, err := time.Parse("2016-01-02 15:04:05", currentTStr + " " + timeFilter.Stop)
if err != nil {
return false, err
}
se.log.Info().Str("stop time", stopTime.String()).Msg("stop time")
stopTime = stopTime.UTC()
return eventTime.Time.Before(stopTime), nil
}
}
return true
se.log.Info().Msg("NO time filter")
return true, nil
}

// applyContextFilter checks the expected EventContext against the actual EventContext
// values are only enforced if they are non-zero values
// map types check that the expected map is a subset of the actual map
func filterContext(expected *v1alpha.EventContext, actual *v1alpha.EventContext) bool {
func (se *sensorExecutor) filterContext(expected *v1alpha.EventContext, actual *v1alpha.EventContext) bool {
if expected == nil {
return true
}
Expand Down Expand Up @@ -89,9 +172,12 @@ func filterContext(expected *v1alpha.EventContext, actual *v1alpha.EventContext)
// applyDataFilter runs the dataFilter against the event's data
// returns (true, nil) when data passes filters, false otherwise
// TODO: split this function up into smaller pieces
func filterData(dataFilters []*v1alpha1.DataFilter, event *v1alpha.Event) (bool, error) {
func (se *sensorExecutor) filterData(dataFilters []*v1alpha1.DataFilter, event *v1alpha.Event) (bool, error) {
// TODO: use the event.Context.SchemaURL to figure out correct data format to unmarshal to
// for now, let's just use a simple map[string]interface{} for arbitrary data
if dataFilters == nil {
return true, nil
}
if event == nil {
return false, fmt.Errorf("nil event")
}
Expand Down
Loading