From dbeb9cda93620f8c28f24e4b97b3033db93a2f7f Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 10 Dec 2024 09:09:22 -0300 Subject: [PATCH] otelconsumer: remove temporary handling of entity too large (#41911) This code was initially added in #41523 because of a limitation from the elasticsearch exporter. The elasticsearch exporter has been updated to enforce flush::max_bytes for the batcher extension and will automatically split the batch if it exceeds the limit. This error is now fixed in the collector v0.115.0. See https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/36396. --- libbeat/outputs/otelconsumer/otelconsumer.go | 25 ----------------- .../outputs/otelconsumer/otelconsumer_test.go | 27 ------------------- 2 files changed, 52 deletions(-) diff --git a/libbeat/outputs/otelconsumer/otelconsumer.go b/libbeat/outputs/otelconsumer/otelconsumer.go index ca5da5308e5..7b2f6fe93e5 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer.go +++ b/libbeat/outputs/otelconsumer/otelconsumer.go @@ -20,7 +20,6 @@ package otelconsumer import ( "context" "fmt" - "strings" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -103,30 +102,6 @@ func (out *otelConsumer) logsPublish(ctx context.Context, batch publisher.Batch) err := out.logsConsumer.ConsumeLogs(ctx, pLogs) if err != nil { - // If the batch is too large, the elasticsearchexporter will - // return a 413 error. - // - // At the moment, the exporter does not support batch splitting - // on error so we do it here. - // - // See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/36163. - if strings.Contains(err.Error(), "Request Entity Too Large") { - // Try and split the batch into smaller batches and retry - if batch.SplitRetry() { - st.BatchSplit() - st.RetryableErrors(len(events)) - } else { - // If the batch could not be split, there is no option left but - // to drop it and log the error state. - batch.Drop() - st.PermanentErrors(len(events)) - out.log.Errorf("the batch is too large to be sent: %v", err) - } - - // Don't propagate the error, the batch was split and retried. - return nil - } - // Permanent errors shouldn't be retried. This tipically means // the data cannot be serialized by the exporter that is attached // to the pipeline or when the destination refuses the data because diff --git a/libbeat/outputs/otelconsumer/otelconsumer_test.go b/libbeat/outputs/otelconsumer/otelconsumer_test.go index 2751ce7f721..6a197d806c1 100644 --- a/libbeat/outputs/otelconsumer/otelconsumer_test.go +++ b/libbeat/outputs/otelconsumer/otelconsumer_test.go @@ -90,33 +90,6 @@ func TestPublish(t *testing.T) { assert.Equal(t, outest.BatchRetry, batch.Signals[0].Tag) }) - t.Run("split batch on entity too large error", func(t *testing.T) { - batch := outest.NewBatch(event1, event2, event3) - - otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { - return errors.New("Request Entity Too Large") - }) - - err := otelConsumer.Publish(ctx, batch) - assert.NoError(t, err) - assert.Len(t, batch.Signals, 1) - assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag) - }) - - t.Run("drop batch if can't split on entity too large error", func(t *testing.T) { - batch := outest.NewBatch(event1) - - otelConsumer := makeOtelConsumer(t, func(ctx context.Context, ld plog.Logs) error { - return errors.New("Request Entity Too Large") - }) - - err := otelConsumer.Publish(ctx, batch) - assert.NoError(t, err) - assert.Len(t, batch.Signals, 2) - assert.Equal(t, outest.BatchSplitRetry, batch.Signals[0].Tag) - assert.Equal(t, outest.BatchDrop, batch.Signals[1].Tag) - }) - t.Run("drop batch on permanent consumer error", func(t *testing.T) { batch := outest.NewBatch(event1, event2, event3)