Skip to content

Commit

Permalink
feat(inputs.amqp_consumer): Determine content encoding automatically (i…
Browse files Browse the repository at this point in the history
  • Loading branch information
sspaink authored Sep 26, 2022
1 parent a2baab3 commit d982ed9
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 4 deletions.
35 changes: 35 additions & 0 deletions internal/content_coding.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,32 @@ func NewContentEncoder(encoding string) (ContentEncoder, error) {
}
}

type AutoDecoder struct {
encoding string
gzip *GzipDecoder
identity *IdentityDecoder
}

func (a *AutoDecoder) SetEnconding(encoding string) {
a.encoding = encoding
}

func (a *AutoDecoder) Decode(data []byte) ([]byte, error) {
if a.encoding == "gzip" {
return a.gzip.Decode(data)
}
return a.identity.Decode(data)
}

func NewAutoContentDecoder() (*AutoDecoder, error) {
var a AutoDecoder
var err error

a.identity = NewIdentityDecoder()
a.gzip, err = NewGzipDecoder()
return &a, err
}

// NewContentDecoder returns a ContentDecoder for the encoding type.
func NewContentDecoder(encoding string) (ContentDecoder, error) {
switch encoding {
Expand All @@ -91,6 +117,8 @@ func NewContentDecoder(encoding string) (ContentDecoder, error) {
return NewZlibDecoder()
case "identity", "":
return NewIdentityDecoder(), nil
case "auto":
return NewAutoContentDecoder()
default:
return nil, errors.New("invalid value for content_encoding")
}
Expand Down Expand Up @@ -171,6 +199,7 @@ func (*IdentityEncoder) Encode(data []byte) ([]byte, error) {

// ContentDecoder removes a wrapper encoding from byte buffers.
type ContentDecoder interface {
SetEnconding(string)
Decode([]byte) ([]byte, error)
}

Expand All @@ -187,6 +216,8 @@ func NewGzipDecoder() (*GzipDecoder, error) {
}, nil
}

func (*GzipDecoder) SetEnconding(string) {}

func (d *GzipDecoder) Decode(data []byte) ([]byte, error) {
d.reader.Reset(bytes.NewBuffer(data))
d.buf.Reset()
Expand All @@ -212,6 +243,8 @@ func NewZlibDecoder() (*ZlibDecoder, error) {
}, nil
}

func (*ZlibDecoder) SetEnconding(string) {}

func (d *ZlibDecoder) Decode(data []byte) ([]byte, error) {
d.buf.Reset()

Expand All @@ -238,6 +271,8 @@ func NewIdentityDecoder() *IdentityDecoder {
return &IdentityDecoder{}
}

func (*IdentityDecoder) SetEnconding(string) {}

func (*IdentityDecoder) Decode(data []byte) ([]byte, error) {
return data, nil
}
15 changes: 13 additions & 2 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ For an introduction to AMQP see:
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
## Content encoding for message payloads, can be set to
## "gzip", "identity" or "auto"
## - Use "gzip" to decode gzip
## - Use "identity" to apply no encoding
## - Use "auto" determine the encoding using the ContentEncoding header
# content_encoding = "identity"

## Data format to consume.
Expand All @@ -92,3 +95,11 @@ For an introduction to AMQP see:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
```

## Metrics

TODO

## Example Output

TODO
1 change: 1 addition & 0 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ func (a *AMQPConsumer) onMessage(acc telegraf.TrackingAccumulator, d amqp.Delive
}
}

a.decoder.SetEnconding(d.ContentEncoding)
body, err := a.decoder.Decode(d.Body)
if err != nil {
onError()
Expand Down
51 changes: 51 additions & 0 deletions plugins/inputs/amqp_consumer/amqp_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package amqp_consumer

import (
"testing"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/parsers/influx"
"github.com/influxdata/telegraf/testutil"
"github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
)

func TestAutoEncoding(t *testing.T) {
enc, err := internal.NewGzipEncoder()
require.NoError(t, err)
payload, err := enc.Encode([]byte(`measurementName fieldKey="gzip" 1556813561098000000`))
require.NoError(t, err)

var a AMQPConsumer
parser := &influx.Parser{}
require.NoError(t, parser.Init())
a.deliveries = make(map[telegraf.TrackingID]amqp091.Delivery)
a.parser = parser
a.decoder, err = internal.NewContentDecoder("auto")
require.NoError(t, err)

acc := &testutil.Accumulator{}

d := amqp091.Delivery{
ContentEncoding: "gzip",
Body: payload,
}
err = a.onMessage(acc, d)
require.NoError(t, err)
acc.AssertContainsFields(t, "measurementName", map[string]interface{}{"fieldKey": "gzip"})

encIdentity := internal.NewIdentityEncoder()
require.NoError(t, err)
payload, err = encIdentity.Encode([]byte(`measurementName2 fieldKey="identity" 1556813561098000000`))
require.NoError(t, err)

d = amqp091.Delivery{
ContentEncoding: "not_gzip",
Body: payload,
}

err = a.onMessage(acc, d)
require.NoError(t, err)
acc.AssertContainsFields(t, "measurementName2", map[string]interface{}{"fieldKey": "identity"})
}
7 changes: 5 additions & 2 deletions plugins/inputs/amqp_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## Content encoding for message payloads, can be set to "gzip" to or
## "identity" to apply no encoding.
## Content encoding for message payloads, can be set to
## "gzip", "identity" or "auto"
## - Use "gzip" to decode gzip
## - Use "identity" to apply no encoding
## - Use "auto" determine the encoding using the ContentEncoding header
# content_encoding = "identity"

## Data format to consume.
Expand Down

0 comments on commit d982ed9

Please sign in to comment.