[receiver/awsfirehose] Refactor to use pdata unmarshaler interfaces (#37361)

#### Description

Refactor unmarshallers to fit into the encoding framework.
    
The internal unmarshallers now implement `plog.Unmarshaler` and
`pmetric.Unmarshaler`. This will enable us to use encoding extensions in
a followup, and will enable us to extract the unmarshallers later as
encoding extensions.
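
As a rough illustration of that per-record shape, the sketch below defines a local interface with the same method signature as `plog.Unmarshaler` (`UnmarshalLogs(buf []byte)`). The `Logs` type, `LogsUnmarshaler`, and `lineUnmarshaler` are hypothetical stand-ins so that the example needs only the standard library; they are not the receiver's actual types.

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
)

// Logs is a hypothetical stand-in for plog.Logs.
type Logs struct {
	Messages []string
}

// LogsUnmarshaler mirrors the method shape of plog.Unmarshaler:
// one buffer (a single record) in, one Logs value out.
type LogsUnmarshaler interface {
	UnmarshalLogs(buf []byte) (Logs, error)
}

// lineUnmarshaler is a hypothetical implementation that treats a record
// as newline-delimited JSON objects with a "message" field.
type lineUnmarshaler struct{}

func (lineUnmarshaler) UnmarshalLogs(record []byte) (Logs, error) {
	var out Logs
	scanner := bufio.NewScanner(bytes.NewReader(record))
	for scanner.Scan() {
		line := bytes.TrimSpace(scanner.Bytes())
		if len(line) == 0 {
			continue // skip blank lines between events
		}
		var event struct {
			Message string `json:"message"`
		}
		if err := json.Unmarshal(line, &event); err != nil {
			return Logs{}, fmt.Errorf("decoding log event: %w", err)
		}
		out.Messages = append(out.Messages, event.Message)
	}
	return out, scanner.Err()
}

func main() {
	var u LogsUnmarshaler = lineUnmarshaler{}
	logs, err := u.UnmarshalLogs([]byte("{\"message\":\"a\"}\n{\"message\":\"b\"}\n"))
	if err != nil {
		panic(err)
	}
	fmt.Println(len(logs.Messages), logs.Messages)
}
```

Because the caller hands over one record per call, any merging across records has to happen outside the unmarshaler.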
    
As a result of the interface change, the unmarshallers now unmarshal a
single record at a time, which means we can no longer merge
resources/metrics across records as we go, only within each record. This
impacts performance, so to offset that we implement various optimisations:

  - Use json-iterator for decoding JSON
  - Use klauspost/compress for decompressing gzip
  - Pool gzip readers
  - Remove pointer type from cwMetricValue to avoid allocation
  - Don't read the whole request body into memory
  - Reuse buffer for decoding base64; decode as we go
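
Two of these ideas, pooling gzip readers and decoding base64 as we go, can be sketched with the standard library alone. This is a minimal sketch under stated assumptions, not the receiver's code: it uses `compress/gzip` rather than klauspost/compress (assumed here to offer a compatible `Reset` method), it streams base64 through `base64.NewDecoder` instead of reusing an explicit buffer, and `decodeRecord`, `encodeRecord`, and `gzipReaders` are hypothetical names.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io"
	"strings"
	"sync"
)

// gzipReaders pools *gzip.Reader values so that decompression state
// is reused across records instead of being allocated per record.
var gzipReaders sync.Pool

// decodeRecord base64-decodes and gunzips a record as a stream.
func decodeRecord(encoded string) ([]byte, error) {
	b64 := base64.NewDecoder(base64.StdEncoding, strings.NewReader(encoded))

	zr, _ := gzipReaders.Get().(*gzip.Reader)
	if zr == nil {
		// Pool was empty: allocate a new reader.
		newReader, err := gzip.NewReader(b64)
		if err != nil {
			return nil, err
		}
		zr = newReader
	} else if err := zr.Reset(b64); err != nil {
		// A reader whose Reset failed can still be reset again later.
		gzipReaders.Put(zr)
		return nil, err
	}
	defer gzipReaders.Put(zr)

	// The sketch collects the decompressed bytes for simplicity; a real
	// consumer could hand zr to a streaming decoder instead.
	return io.ReadAll(zr)
}

// encodeRecord is a test helper that gzips and base64-encodes a payload.
func encodeRecord(payload []byte) (string, error) {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		return "", err
	}
	if err := w.Close(); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buf.Bytes()), nil
}

func main() {
	encoded, err := encodeRecord([]byte(`{"messageType":"DATA_MESSAGE"}`))
	if err != nil {
		panic(err)
	}
	for i := 0; i < 2; i++ { // the second call can reuse the pooled reader
		decoded, err := decodeRecord(encoded)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(decoded))
	}
}
```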

There are more optimisations we can make to reduce memory allocations,
e.g. avoid reflection when decoding JSON.

There's a fix for a subtle bug in the cwmetrics unmarshaller where the
unit of a metric was not considered part of its identity, and so two
metrics that differed only by unit would be merged.
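
The effect of that fix can be illustrated with a map keyed by metric identity. The `metricKey` fields and `groupByIdentity` below are hypothetical and only show the principle: once the unit is part of the key, two metrics that differ only by unit land in separate groups.

```go
package main

import "fmt"

// metricKey is a hypothetical metric identity. Including unit means
// metrics that differ only by unit are not merged.
type metricKey struct {
	namespace string
	name      string
	unit      string
}

// dataPoint is a hypothetical value tagged with its metric identity.
type dataPoint struct {
	key   metricKey
	value float64
}

// groupByIdentity collects values under their metric identity.
func groupByIdentity(points []dataPoint) map[metricKey][]float64 {
	groups := make(map[metricKey][]float64)
	for _, p := range points {
		groups[p.key] = append(groups[p.key], p.value)
	}
	return groups
}

func main() {
	points := []dataPoint{
		{metricKey{"AWS/NATGateway", "metric_0", "Count"}, 10},
		{metricKey{"AWS/NATGateway", "metric_0", "Bytes"}, 1},
		{metricKey{"AWS/NATGateway", "metric_0", "Count"}, 20},
	}
	groups := groupByIdentity(points)
	fmt.Println(len(groups)) // prints 2: "Count" and "Bytes" stay separate
}
```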

#### Link to tracking issue

Preparation for #37113

#### Testing

- Added tests for consuming requests containing multiple records.
- Added benchmarks for the full request/response flow, for cwlogs and
cwmetrics record types. Memory usage increases for cwmetrics
unmarshalling and decreases for cwlogs unmarshalling. CPU usage is
better across the board.

```
goos: linux
goarch: amd64
pkg: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver
cpu: AMD Ryzen 7 PRO 5850U with Radeon Graphics     
                                                              │ /tmp/old.txt  │            /tmp/new.txt             │
                                                              │    sec/op     │    sec/op     vs base               │
LogsConsumer_cwlogs/10resources_10records_1logs-16              142.03µ ±  8%   75.85µ ± 16%  -46.60% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_10records_10logs-16              413.1µ ±  5%   208.0µ ± 23%  -49.65% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_100records_1logs-16             1435.8µ ±  8%   728.0µ ± 11%  -49.30% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_100records_10logs-16             4.200m ±  2%   1.875m ±  4%  -55.34% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_10records_1metrics-16     104.30µ ± 16%   70.26µ ± 12%  -32.64% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_10records_10metrics-16     821.7µ ±  7%   505.5µ ±  5%  -38.48% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_100records_1metrics-16     780.2µ ±  5%   647.0µ ±  8%  -17.07% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_100records_10metrics-16    7.823m ±  5%   5.297m ± 11%  -32.29% (p=0.002 n=6)
geomean                                                          809.9µ         475.7µ        -41.26%

                                                              │ /tmp/old.txt  │             /tmp/new.txt             │
                                                              │     B/op      │     B/op      vs base                │
LogsConsumer_cwlogs/10resources_10records_1logs-16              437.78Ki ± 0%   61.70Ki ± 1%   -85.91% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_10records_10logs-16              533.7Ki ± 0%   124.9Ki ± 2%   -76.60% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_100records_1logs-16             4319.1Ki ± 0%   550.1Ki ± 0%   -87.26% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_100records_10logs-16             5.167Mi ± 0%   1.172Mi ± 1%   -77.31% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_10records_1metrics-16      47.88Ki ± 5%   83.71Ki ± 0%   +74.84% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_10records_10metrics-16     390.0Ki ± 1%   415.4Ki ± 1%    +6.51% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_100records_1metrics-16     358.9Ki ± 0%   772.9Ki ± 0%  +115.37% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_100records_10metrics-16    3.730Mi ± 0%   3.962Mi ± 0%    +6.23% (p=0.002 n=6)
geomean                                                          779.7Ki        391.8Ki        -49.76%

                                                              │ /tmp/old.txt │            /tmp/new.txt            │
                                                              │  allocs/op   │  allocs/op   vs base               │
LogsConsumer_cwlogs/10resources_10records_1logs-16                406.0 ± 2%    348.0 ± 2%  -14.29% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_10records_10logs-16              2.025k ± 0%   1.971k ± 1%   -2.64% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_100records_1logs-16              2.922k ± 0%   3.094k ± 0%   +5.89% (p=0.002 n=6)
LogsConsumer_cwlogs/10resources_100records_10logs-16             18.46k ± 0%   19.46k ± 1%   +5.45% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_10records_1metrics-16       548.5 ± 8%    653.0 ± 0%  +19.05% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_10records_10metrics-16     3.795k ± 3%   4.729k ± 1%  +24.63% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_100records_1metrics-16     3.281k ± 0%   6.161k ± 0%  +87.78% (p=0.002 n=6)
MetricsConsumer_cwmetrics/10resources_100records_10metrics-16    28.02k ± 0%   46.90k ± 0%  +67.39% (p=0.002 n=6)
geomean                                                          3.098k        3.722k       +20.16%
```
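
For reading the `vs base` column, the relative change is the difference between the new and old value divided by the old value. A minimal sketch, using the first `sec/op` row as input (`percentChange` is a hypothetical helper; the table shows rounded values, so recomputing other rows from it may differ in the last digit):

```go
package main

import "fmt"

// percentChange returns the relative change from base to current, in percent.
func percentChange(base, current float64) float64 {
	return (current - base) / base * 100
}

func main() {
	// LogsConsumer_cwlogs/10resources_10records_1logs: 142.03µs -> 75.85µs
	fmt.Printf("%+.2f%%\n", percentChange(142.03, 75.85)) // prints -46.60%
}
```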

#### Documentation

N/A

---------

Co-authored-by: Anthony Mirabella <[email protected]>
axw and Aneurysm9 authored Feb 10, 2025
1 parent 1f8c1ee commit 1af5afa
Showing 27 changed files with 994 additions and 862 deletions.
27 changes: 27 additions & 0 deletions .chloggen/firehose-unmarshal-record.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awsfirehosereceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Refactor unmarshallers to implement pdata unmarshaler interfaces

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37361]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
164 changes: 164 additions & 0 deletions receiver/awsfirehosereceiver/benchmark_test.go
@@ -0,0 +1,164 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awsfirehosereceiver

import (
	"bytes"
	"compress/gzip"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"math/rand/v2"
	"net/http"
	"net/http/httptest"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/receiver/receivertest"
)

func BenchmarkLogsConsumer_cwlogs(b *testing.B) {
	// numLogGroups is the maximum number of unique log groups
	// to use across the generated logs, using a random generator
	// with fixed seeds for repeatability.
	const numLogGroups = 10
	rng := rand.New(rand.NewPCG(1, 2))

	// numRecords is the number of records in the Firehose envelope.
	for _, numRecords := range []int{10, 100} {
		// numLogs is the number of CloudWatch log records within a Firehose record.
		for _, numLogs := range []int{1, 10} {
			b.Run(fmt.Sprintf("%dresources_%drecords_%dlogs", numLogGroups, numRecords, numLogs), func(b *testing.B) {
				config := createDefaultConfig().(*Config)
				config.Endpoint = "localhost:0"
				r, err := createLogsReceiver(
					context.Background(),
					receivertest.NewNopSettings(),
					config,
					consumertest.NewNop(),
				)
				require.NoError(b, err)

				err = r.Start(context.Background(), componenttest.NewNopHost())
				require.NoError(b, err)
				b.Cleanup(func() {
					err = r.Shutdown(context.Background())
					assert.NoError(b, err)
				})

				records := make([]firehoseRecord, numRecords)
				for i := range records {
					records[i] = firehoseRecord{
						Data: base64.StdEncoding.EncodeToString(
							makeCloudWatchLogRecord(rng, numLogs, numLogGroups),
						),
					}
				}
				fr := testFirehoseRequest(testFirehoseRequestID, records)
				body, err := json.Marshal(fr)
				require.NoError(b, err)

				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					req := newTestRequest(body)
					recorder := httptest.NewRecorder()
					r.(http.Handler).ServeHTTP(recorder, req)
					if recorder.Code != http.StatusOK {
						b.Fatalf("expected status code 200, got %d", recorder.Code)
					}
				}
			})
		}
	}
}

func BenchmarkMetricsConsumer_cwmetrics(b *testing.B) {
	// numStreams is the maximum number of unique metric streams
	// to use across the generated metrics, using a random generator
	// with fixed seeds for repeatability.
	const numStreams = 10
	rng := rand.New(rand.NewPCG(1, 2))

	// numRecords is the number of records in the Firehose envelope.
	for _, numRecords := range []int{10, 100} {
		// numMetrics is the number of CloudWatch metrics within a Firehose record.
		for _, numMetrics := range []int{1, 10} {
			b.Run(fmt.Sprintf("%dresources_%drecords_%dmetrics", numStreams, numRecords, numMetrics), func(b *testing.B) {
				config := createDefaultConfig().(*Config)
				config.Endpoint = "localhost:0"
				r, err := createMetricsReceiver(
					context.Background(),
					receivertest.NewNopSettings(),
					config,
					consumertest.NewNop(),
				)
				require.NoError(b, err)

				err = r.Start(context.Background(), componenttest.NewNopHost())
				require.NoError(b, err)
				b.Cleanup(func() {
					err = r.Shutdown(context.Background())
					assert.NoError(b, err)
				})

				records := make([]firehoseRecord, numRecords)
				for i := range records {
					records[i] = firehoseRecord{
						Data: base64.StdEncoding.EncodeToString(
							makeCloudWatchMetricRecord(rng, numMetrics, numStreams),
						),
					}
				}

				fr := testFirehoseRequest(testFirehoseRequestID, records)
				body, err := json.Marshal(fr)
				require.NoError(b, err)

				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					req := newTestRequest(body)
					recorder := httptest.NewRecorder()
					r.(http.Handler).ServeHTTP(recorder, req)
					if recorder.Code != http.StatusOK {
						b.Fatalf("expected status code 200, got %d", recorder.Code)
					}
				}
			})
		}
	}
}

func makeCloudWatchLogRecord(rng *rand.Rand, numLogs, numLogGroups int) []byte {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	for i := 0; i < numLogs; i++ {
		group := rng.IntN(numLogGroups)
		fmt.Fprintf(w,
			`{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"group_%d","logStream":"stream","logEvents":[{"id":"the_id","timestamp":1725594035523,"message":"message %d"}]}`,
			group, i,
		)
		fmt.Fprintln(w)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
	return buf.Bytes()
}

func makeCloudWatchMetricRecord(rng *rand.Rand, numMetrics, numStreams int) []byte {
	var buf bytes.Buffer
	for i := 0; i < numMetrics; i++ {
		stream := rng.IntN(numStreams)
		fmt.Fprintf(&buf,
			`{"metric_stream_name":"stream_%d","account_id":"1234567890","region":"us-east-1","namespace":"AWS/NATGateway","metric_name":"metric_%d","dimensions":{"NatGatewayId":"nat-01a4160dfb995b990"},"timestamp":1643916720000,"value":{"max":0.0,"min":0.0,"sum":0.0,"count":2.0},"unit":"Count"}`,
			stream, i,
		)
		fmt.Fprintln(&buf)
	}
	return buf.Bytes()
}
11 changes: 6 additions & 5 deletions receiver/awsfirehosereceiver/factory.go
@@ -10,11 +10,12 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/config/confighttp"
 	"go.opentelemetry.io/collector/consumer"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/receiver"
 	"go.uber.org/zap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream"
@@ -54,19 +55,19 @@ func validateRecordType(recordType string) error {

 // defaultMetricsUnmarshalers creates a map of the available metrics
 // unmarshalers.
-func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler {
+func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]pmetric.Unmarshaler {
 	cwmsu := cwmetricstream.NewUnmarshaler(logger)
 	otlpv1msu := otlpmetricstream.NewUnmarshaler(logger)
-	return map[string]unmarshaler.MetricsUnmarshaler{
+	return map[string]pmetric.Unmarshaler{
 		cwmsu.Type():     cwmsu,
 		otlpv1msu.Type(): otlpv1msu,
 	}
 }

 // defaultLogsUnmarshalers creates a map of the available logs unmarshalers.
-func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler {
+func defaultLogsUnmarshalers(logger *zap.Logger) map[string]plog.Unmarshaler {
 	u := cwlog.NewUnmarshaler(logger)
-	return map[string]unmarshaler.LogsUnmarshaler{
+	return map[string]plog.Unmarshaler{
 		u.Type(): u,
 	}
 }
13 changes: 11 additions & 2 deletions receiver/awsfirehosereceiver/go.mod
@@ -4,6 +4,9 @@ go 1.22.0

 require (
 	github.com/gogo/protobuf v1.3.2
+	github.com/json-iterator/go v1.1.12
+	github.com/klauspost/compress v1.17.11
+	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.119.0
 	github.com/stretchr/testify v1.10.0
 	go.opentelemetry.io/collector/component v0.119.0
 	go.opentelemetry.io/collector/component/componentstatus v0.119.0
@@ -24,6 +27,7 @@ require (
 )

 require (
+	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/fsnotify/fsnotify v1.8.0 // indirect
@@ -32,15 +36,14 @@ require (
 	github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
 	github.com/golang/snappy v0.0.4 // indirect
 	github.com/google/uuid v1.6.0 // indirect
-	github.com/json-iterator/go v1.1.12 // indirect
-	github.com/klauspost/compress v1.17.11 // indirect
 	github.com/knadh/koanf/maps v0.1.1 // indirect
 	github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
 	github.com/knadh/koanf/v2 v2.1.2 // indirect
 	github.com/mitchellh/copystructure v1.2.0 // indirect
 	github.com/mitchellh/reflectwalk v1.0.2 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.119.0 // indirect
 	github.com/pierrec/lz4/v4 v4.1.22 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/rs/cors v1.11.1 // indirect
@@ -76,3 +79,9 @@ retract (
 	v0.76.1
 	v0.65.0
 )
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
2 changes: 2 additions & 0 deletions receiver/awsfirehosereceiver/go.sum
