Skip to content
This repository has been archived by the owner on May 4, 2022. It is now read-only.

Commit

Permalink
Base logic for event handlers (#95)
Browse files Browse the repository at this point in the history
* Base logic for event handlers

This patch adds basic logic for event handlers. Currently it's not plugin based,
but the implementation is prepared for that.

This patch also adds first handler included in the SG codebase to not change
build process now. Thi handler splits the check-container-healthcheck result
event from collectd-sensubility and saves it to ES database as multiple events.

* Fix handler issues

    - leaving output in json for better querying
    - saving all output items, not just first one
    - fixed naming typo

* Fix staticcheck issue

* Fix lint issue

* refactor handler function (#96)

Co-authored-by: pleimer <[email protected]>

Co-authored-by: pleimer <[email protected]>
Co-authored-by: pleimer <[email protected]>
  • Loading branch information
3 people authored Dec 1, 2020
1 parent a418595 commit 86dd522
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 8 deletions.
37 changes: 29 additions & 8 deletions internal/pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func StartEvents() {

// Elastic connection
elasticClient, err := saelastic.CreateClient(*serverConfig)

if err != nil {
log.Fatal(err.Error())
}
Expand All @@ -233,6 +234,12 @@ func StartEvents() {
processingCases, qpidStatusCases, amqpServers := amqp10.CreateMessageLoopComponents(serverConfig, finish, amqpHandler, *fUniqueName)
amqp10.SpawnQpidStatusReporter(&wg, applicationHealth, qpidStatusCases)

// spawn handler manager
handlerManager, err := NewEventHandlerManager(*serverConfig)
if err != nil {
log.Fatal(err.Error())
}

// spawn event processor
wg.Add(1)
go func() {
Expand All @@ -252,15 +259,29 @@ func StartEvents() {
log.Printf("Failed to parse received event:\n- error: %s\n- event: %s\n", err, event)
}

record, err := elasticClient.Create(event.GetIndexName(), EVENTSINDEXTYPE, event.GetRawData())
if err != nil {
applicationHealth.ElasticSearchState = 0
log.Printf("Failed to save event to Elasticsearch DB:\n- error: %s\n- event: %s\n", err, event)
} else {
applicationHealth.ElasticSearchState = 1
process := true
for _, handler := range handlerManager.Handlers[amqpServers[index].DataSource] {
if handler.Relevant(event) {
process, err = handler.Handle(event, elasticClient)
if !process {
if err != nil {
log.Print(err.Error())
}
break
}
}
}
if serverConfig.AlertManagerEnabled {
notifyAlertManager(&wg, *serverConfig, &event, record)
if process {
record, err := elasticClient.Create(event.GetIndexName(), EVENTSINDEXTYPE, event.GetRawData())
if err != nil {
applicationHealth.ElasticSearchState = 0
log.Printf("Failed to save event to Elasticsearch DB:\n- error: %s\n- event: %s\n", err, event)
} else {
applicationHealth.ElasticSearchState = 1
}
if serverConfig.AlertManagerEnabled {
notifyAlertManager(&wg, *serverConfig, &event, record)
}
}
}
}
Expand Down
151 changes: 151 additions & 0 deletions internal/pkg/events/handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package events

import (
"encoding/json"
"fmt"

"github.com/infrawatch/smart-gateway/internal/pkg/events/incoming"
"github.com/infrawatch/smart-gateway/internal/pkg/saconfig"
"github.com/infrawatch/smart-gateway/internal/pkg/saelastic"
)

// TODO: Implement this as pluggable system instead

//EventHandler provides interface for all possible handler types
type EventHandler interface {
//Processes the event
Handle(incoming.EventDataFormat, *saelastic.ElasticClient) (bool, error)
//Relevant should return true if the handler is relevant for the givent event and so the handler should be used
Relevant(incoming.EventDataFormat) bool
}

//EventHandlerManager holds all available handlers (and will be responsible
//in future for loading all handler plugins). The plugins will be organize
//per data source on which's events they could be applied
type EventHandlerManager struct {
Handlers map[saconfig.DataSource][]EventHandler
}

//NewEventHandlerManager loads all even handler plugins stated in events configuration
func NewEventHandlerManager(config saconfig.EventConfiguration) (*EventHandlerManager, error) {
manager := EventHandlerManager{}
manager.Handlers = make(map[saconfig.DataSource][]EventHandler)
for _, ds := range []saconfig.DataSource{saconfig.DataSourceCollectd, saconfig.DataSourceCeilometer, saconfig.DataSourceUniversal} {
manager.Handlers[ds] = make([]EventHandler, 0)
}

for _, pluginPath := range config.HandlerPlugins {
var ds saconfig.DataSource
if ok := ds.SetFromString(pluginPath.DataSource); !ok {
return &manager, fmt.Errorf("unknown datasource ''%s' for given event handler", pluginPath.DataSource)
}
manager.LoadHandlers(ds, pluginPath.Path)
}

//TODO: this just manually register the only handler we have now. Remove when the handler implementation will move out to plugin
manager.Handlers[saconfig.DataSourceCollectd] = append(manager.Handlers[saconfig.DataSourceCollectd], ContainerHealthCheckHandler{"collectd_checks"})
return &manager, nil
}

//LoadHandlers will load handler plugins in future
func (hand *EventHandlerManager) LoadHandlers(dataSource saconfig.DataSource, path string) error {

return nil
}

//ContainerHealthCheckHandler serves as handler for events from collectd-sensubility's
//results of check-container-health.
type ContainerHealthCheckHandler struct {
ElasticIndex string
}

type containerHealthCheckItem struct {
Container string `json:"container"`
Service string `json:"service"`
Status string `json:"status"`
Healthy int `json:"healthy"`
}

type list struct {
Next *list
Key string
}

//recursivle search for output field based on path
func getOutputObject(path *list, data interface{}) (string, error) {
var obj map[string]interface{}
var ok bool

if obj, ok = data.(map[string]interface{}); !ok {
return "", fmt.Errorf("cannot search non-map objects")
}
if path.Next == nil {
if output, ok := obj[path.Key].(string); ok {
return output, nil
}
return "", fmt.Errorf("output should be of type 'string'")
}
if newData, ok := obj[path.Key]; ok {
return getOutputObject(path.Next, newData)
}
return "", fmt.Errorf("input data does not contain path")
}

//Handle saves the event as separate document to ES in case the result output contains more than one item.
//Returns true if event processing should continue (eg. event should be saved to ES) or false if otherwise.
func (hand ContainerHealthCheckHandler) Handle(event incoming.EventDataFormat, elasticClient *saelastic.ElasticClient) (bool, error) {
pathList := &list{
Key: "annotations",
}
pathList.Next = &list{
Key: "output",
}

if evt, ok := event.(*incoming.CollectdEvent); ok {
rawData := evt.GetRawData()
output, err := getOutputObject(pathList, rawData)
if err != nil {
return false, err
}

var outData []containerHealthCheckItem
rawDataMap := rawData.(map[string]interface{})
if err := json.Unmarshal([]byte(output), &outData); err == nil {
for _, item := range outData {
rawDataMap["annotations"].(map[string]interface{})["output"] = item
if _, err := elasticClient.Create(hand.ElasticIndex, EVENTSINDEXTYPE, rawDataMap); err != nil {
// saving the splitted output failed for some reason, so we will play safe
// and try to process event outside of handler
return true, err
}
}
} else {
// We most probably received single item output, so we just proceed and save the event
if _, err := elasticClient.Create(hand.ElasticIndex, EVENTSINDEXTYPE, rawData); err != nil {
return false, err
}
}
}

//record, err := elasticClient.Create(event.GetIndexName(), EVENTSINDEXTYPE, event.GetRawData())
return false, nil
}

//Relevant returns true in case the event is suitable for processing with this handler, otherwise returns false.
func (hand ContainerHealthCheckHandler) Relevant(event incoming.EventDataFormat) bool {
if evt, ok := event.(*incoming.CollectdEvent); ok {
rawData := evt.GetRawData()
if data, ok := rawData.(map[string]interface{}); ok {
if rawLabels, ok := data["labels"]; ok {
if labels, ok := rawLabels.(map[string]interface{}); ok {
if check, ok := labels["check"]; ok {
if checkName, ok := check.(string); ok && checkName == "check-container-health" {
return true
}
}
}
}
}
}
return false
}
7 changes: 7 additions & 0 deletions internal/pkg/saconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ type EventAPIConfig struct {
AMQP1PublishURL string `json:"AMQP1PublishURL"` // new amqp address to send notifications
}

//HandlerPath holds information about location of handler plugin and a data source type stream it should be applied on
type HandlerPath struct {
Path string `json:"Path"`
DataSource string `json:"DataSource"`
}

//EventConfiguration ...
type EventConfiguration struct {
Debug bool `json:"Debug"`
Expand All @@ -78,6 +84,7 @@ type EventConfiguration struct {
TLSClientCert string `json:"TlsClientCert"`
TLSClientKey string `json:"TlsClientKey"`
TLSCaCert string `json:"TlsCaCert"`
HandlerPlugins []HandlerPath `json:"HandlerPlugin"`
}

/******************** MetricConfiguration implementation *********************/
Expand Down

0 comments on commit 86dd522

Please sign in to comment.