Skip to content

Commit

Permalink
feat: Azure Event Hubs output plugin (#9346)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomconte authored Oct 18, 2021
1 parent c4c3202 commit e324ef1
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 4 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ require (
cloud.google.com/go/pubsub v1.17.0
code.cloudfoundry.org/clock v1.0.0 // indirect
collectd.org v0.5.0
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 // indirect
github.com/Azure/azure-event-hubs-go/v3 v3.3.13
github.com/Azure/azure-kusto-go v0.4.0
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go v55.0.0+incompatible // indirect
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-amqp v0.13.12 // indirect
Expand Down
14 changes: 12 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1 h1:mXh+eyOxGLBfqDtfmbtby0l7XfG/6b2NkuZ3B7i6zHA=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-amqp-common-go/v3 v3.0.1/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-amqp-common-go/v3 v3.1.0 h1:1N4YSkWYWffOpQHromYdOucBSQXhNRKzqtgICy6To8Q=
github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGibdKaYZv4avS+O6LNIp8bq0=
github.com/Azure/azure-event-hubs-go/v3 v3.3.13 h1:aiI2RLjp0MzLCuFUXzR8b3h3bdPIc2c3vBYXRK8jX3E=
github.com/Azure/azure-event-hubs-go/v3 v3.3.13/go.mod h1:dJ/WqDn0KEJkNznL9UT/UbXzfmkffCjSNl9x2Y8JI28=
github.com/Azure/azure-kusto-go v0.4.0 h1:CivPswdkVzSXzEjzJTyOJ6e5RhI4IKvaszilyNGvs+A=
Expand All @@ -80,8 +82,10 @@ github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v44.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible h1:/NLBWHCnIHtZyLPc1P7WIqi4Te4CC23kIQyK3Ep/7lA=
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v52.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go v55.0.0+incompatible h1:L4/vUGbg1Xkw5L20LZD+hJI5I+ibWSytqQ68lTCfLwY=
github.com/Azure/azure-sdk-for-go v55.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-storage-blob-go v0.6.0/go.mod h1:oGfmITT1V6x//CswqY2gtAHND+xIP64/qL7a5QJix0Y=
github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0=
github.com/Azure/azure-storage-blob-go v0.14.0 h1:1BCg74AmVdYwO3dlKwtFU1V0wU2PZdREkXvAmZJRUlM=
Expand Down Expand Up @@ -113,6 +117,7 @@ github.com/Azure/go-autorest/autorest/adal v0.9.0/go.mod h1:/c022QCutn2P7uY+/oQW
github.com/Azure/go-autorest/autorest/adal v0.9.5/go.mod h1:B7KF7jKIeC9Mct5spmyCB/A8CG/sEz1vwIRGv/bbw7A=
github.com/Azure/go-autorest/autorest/adal v0.9.11/go.mod h1:nBKAnTomx8gDtl+3ZCJv2v0KACFHWTB2drffI1B68Pk=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
github.com/Azure/go-autorest/autorest/adal v0.9.16 h1:P8An8Z9rH1ldbOLdFpxYorgOt2sywL9V24dAwWHPuGc=
github.com/Azure/go-autorest/autorest/adal v0.9.16/go.mod h1:tGMin8I49Yij6AQ+rvV+Xa/zwxYQB5hmsd6DkfAx2+A=
github.com/Azure/go-autorest/autorest/azure/auth v0.4.2/go.mod h1:90gmfKdlmKgfjUpnCEpOJzsUEjrWDSLwHIG73tSXddM=
Expand All @@ -132,6 +137,8 @@ github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935
github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk=
github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+XA683u8EctwboHk=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE=
github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac=
github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E=
Expand Down Expand Up @@ -520,6 +527,8 @@ github.com/coreos/go-systemd v0.0.0-20161114122254-48702e0da86b/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
Expand Down Expand Up @@ -2429,6 +2438,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/dynatrace"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/outputs/event_hubs"
_ "github.com/influxdata/telegraf/plugins/outputs/exec"
_ "github.com/influxdata/telegraf/plugins/outputs/execd"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
Expand Down
12 changes: 12 additions & 0 deletions plugins/outputs/azure_monitor/azure_monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,3 +373,15 @@ func TestWrite(t *testing.T) {
})
}
}

func TestMain(m *testing.M) {
// Set up a fake environment for adal.getMSIType()
// Root cause: https://github.com/Azure/go-autorest/commit/def88ef859fb980eff240c755a70597bc9b490d0
err := os.Setenv("MSI_ENDPOINT", "fake.endpoint")

if err != nil {
panic(err)
}

os.Exit(m.Run())
}
25 changes: 25 additions & 0 deletions plugins/outputs/event_hubs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Azure Event Hubs output plugin

This plugin for [Azure Event Hubs](https://azure.microsoft.com/en-gb/services/event-hubs/) will send metrics to a single Event Hub within an Event Hubs namespace. Metrics are sent as message batches, each message payload containing one metric object. The messages do not specify a partition key, and will thus be automatically load-balanced (round-robin) across all the Event Hub partitions.

## Metrics

The plugin uses the Telegraf serializers to format the metric data sent in the message payloads. You can select any of the supported output formats, although JSON is probably the easiest to integrate with downstream components.

## Configuration

```toml
[[ outputs.event_hubs ]]
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"

## Client timeout (defaults to 30s)
# timeout = "30s"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
```
148 changes: 148 additions & 0 deletions plugins/outputs/event_hubs/event_hubs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package event_hubs

import (
"context"
"time"

eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/plugins/outputs"
"github.com/influxdata/telegraf/plugins/serializers"
)

/*
** Wrapper interface for eventhub.Hub
*/

type EventHubInterface interface {
GetHub(s string) error
Close(ctx context.Context) error
SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error
}

type eventHub struct {
hub *eventhub.Hub
}

func (eh *eventHub) GetHub(s string) error {
hub, err := eventhub.NewHubFromConnectionString(s)

if err != nil {
return err
}

eh.hub = hub

return nil
}

func (eh *eventHub) Close(ctx context.Context) error {
return eh.hub.Close(ctx)
}

func (eh *eventHub) SendBatch(ctx context.Context, iterator eventhub.BatchIterator, opts ...eventhub.BatchOption) error {
return eh.hub.SendBatch(ctx, iterator, opts...)
}

/* End wrapper interface */

type EventHubs struct {
Log telegraf.Logger `toml:"-"`
ConnectionString string `toml:"connection_string"`
Timeout config.Duration

Hub EventHubInterface
serializer serializers.Serializer
}

const (
defaultRequestTimeout = time.Second * 30
)

func (e *EventHubs) Description() string {
return "Configuration for Event Hubs output plugin"
}

func (e *EventHubs) SampleConfig() string {
return `
## The full connection string to the Event Hub (required)
## The shared access key must have "Send" permissions on the target Event Hub.
connection_string = "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=superSecret1234=;EntityPath=hubName"
## Client timeout (defaults to 30s)
# timeout = "30s"
## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "json"
`
}

func (e *EventHubs) Init() error {
err := e.Hub.GetHub(e.ConnectionString)

if err != nil {
return err
}

return nil
}

func (e *EventHubs) Connect() error {
return nil
}

func (e *EventHubs) Close() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()

err := e.Hub.Close(ctx)

if err != nil {
return err
}

return nil
}

func (e *EventHubs) SetSerializer(serializer serializers.Serializer) {
e.serializer = serializer
}

func (e *EventHubs) Write(metrics []telegraf.Metric) error {
var events []*eventhub.Event

for _, metric := range metrics {
payload, err := e.serializer.Serialize(metric)

if err != nil {
e.Log.Debugf("Could not serialize metric: %v", err)
continue
}

events = append(events, eventhub.NewEvent(payload))
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(e.Timeout))
defer cancel()

err := e.Hub.SendBatch(ctx, eventhub.NewEventBatchIterator(events...))

if err != nil {
return err
}

return nil
}

func init() {
outputs.Add("event_hubs", func() telegraf.Output {
return &EventHubs{
Hub: &eventHub{},
Timeout: config.Duration(defaultRequestTimeout),
}
})
}
Loading

0 comments on commit e324ef1

Please sign in to comment.