Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelconsumer: handle entity too large errors #41523

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package otelconsumer
import (
"context"
"fmt"
"strings"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"

"go.opentelemetry.io/collector/consumer"
Expand All @@ -42,6 +44,7 @@ type otelConsumer struct {
observer outputs.Observer
logsConsumer consumer.Logs
beatInfo beat.Info
log *logp.Logger
}

func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *config.C) (outputs.Group, error) {
Expand All @@ -50,6 +53,7 @@ func makeOtelConsumer(_ outputs.IndexManager, beat beat.Info, observer outputs.O
observer: observer,
logsConsumer: beat.LogConsumer,
beatInfo: beat,
log: logp.NewLogger("otelconsumer"),
}

ocConfig := defaultConfig()
Expand Down Expand Up @@ -99,6 +103,30 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch)

err := out.logsConsumer.ConsumeLogs(ctx, pLogs)
if err != nil {
// If the batch is too large, the elasticsearchexporter will
// return a 413 error.
//
// At the moment, the exporter does not support batch splitting
// on error so we do it here.
//
// See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163.
if strings.Contains(err.Error(), "Request Entity Too Large") {
rdner marked this conversation as resolved.
Show resolved Hide resolved
// Try and split the batch into smaller batches and retry
if batch.SplitRetry() {
st.BatchSplit()
st.RetryableErrors(len(events))
} else {
// If the batch could not be split, there is no option left but
// to drop it and log the error state.
batch.Drop()
st.PermanentErrors(len(events))
out.log.Errorf("the batch is too large to be sent: %v", err)
}

// Don't propagate the error, the batch was split and retried.
return nil
}

// Permanent errors shouldn't be retried. This tipically means
// the data cannot be serialized by the exporter that is attached
// to the pipeline or when the destination refuses the data because
Expand Down
31 changes: 31 additions & 0 deletions libbeat/outputs/otelconsumer/otelconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/outputs/outest"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)

Expand All @@ -46,12 +47,15 @@ func TestPublish(t *testing.T) {
makeOtelConsumer := func(t *testing.T, consumeFn func(ctx context.Context, ld plog.Logs) error) *otelConsumer {
t.Helper()

assert.NoError(t, logp.TestingSetup(logp.WithSelectors("otelconsumer")))

logConsumer, err := consumer.NewLogs(consumeFn)
assert.NoError(t, err)
consumer := &otelConsumer{
observer: outputs.NewNilObserver(),
logsConsumer: logConsumer,
beatInfo: beat.Info{},
log: logp.NewLogger("otelconsumer"),
}
return consumer
}
Expand Down Expand Up @@ -86,6 +90,33 @@ func TestPublish(t *testing.T) {
assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag)
})

t.Run("split batch on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 1)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
})

t.Run("drop batch if can't split on entity too large error", func(t *testing.T) {
batch := outest.NewBatch(event1)

otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error {
return errors.New("Request Entity Too Large")
})

err := otelConsumer.Publish(ctx, batch)
assert.NoError(t, err)
assert.Len(t, batch.Signals, 2)
assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag)
assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag)
})

t.Run("drop batch on permanent consumer error", func(t *testing.T) {
batch := outest.NewBatch(event1, event2, event3)

Expand Down
Loading