From 4782ad0f0d18f4ad9a8df384d0b65a72d95a27e7 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Mon, 2 Dec 2024 13:41:26 -0800 Subject: [PATCH] [exporter] Flip on queue batcher (#11637) #### Description This PR solves https://github.com/open-telemetry/opentelemetry-collector/issues/10368. Previously we used a pushing model between the queue and the batch sender, which constrained the batch size to `sending_queue.num_consumers`, because a batch cannot accumulate more requests than the `sending_queue.num_consumers` blocked goroutines provide. This PR changes it to a pulling model. We read from the queue until the threshold is met or the timeout elapses, then allocate a worker to asynchronously send out the request. #### Link to tracking issue Fixes https://github.com/open-telemetry/opentelemetry-collector/issues/10368 https://github.com/open-telemetry/opentelemetry-collector/issues/8122 --------- Co-authored-by: Dmitrii Anoshin --- .chloggen/11637-exporter-queue-batcher.yaml | 28 +++++++++++++++ .../exporterhelper/internal/base_exporter.go | 11 +++--- .../internal/batch_sender_test.go | 1 - .../exporterhelper/internal/queue_sender.go | 34 +++++++++++++++---- .../internal/queue_sender_test.go | 3 +- exporter/internal/queue/default_batcher.go | 5 +++ exporter/internal/queue/disabled_batcher.go | 5 +++ 7 files changed, 74 insertions(+), 13 deletions(-) create mode 100644 .chloggen/11637-exporter-queue-batcher.yaml diff --git a/.chloggen/11637-exporter-queue-batcher.yaml b/.chloggen/11637-exporter-queue-batcher.yaml new file mode 100644 index 00000000000..a52f261ccf7 --- /dev/null +++ b/.chloggen/11637-exporter-queue-batcher.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterqueue + +# A brief description of the change. 
Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Introduce a feature gate exporter.UsePullingBasedExporterQueueBatcher to use the new pulling model in exporter queue batching. + +# One or more tracking issues or pull requests related to the change +issues: [8122, 10368] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + If both queuing and batching are enabled for an exporter, we now use a pulling model instead of a + pushing model. num_consumers in the queue configuration now specifies the maximum number of + concurrent workers that send out requests. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 98b096ed9b7..a76a725981f 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -26,9 +26,9 @@ import ( ) var usePullingBasedExporterQueueBatcher = featuregate.GlobalRegistry().MustRegister( - "telemetry.UsePullingBasedExporterQueueBatcher", - featuregate.StageBeta, - featuregate.WithRegisterFromVersion("v0.114.0"), + "exporter.UsePullingBasedExporterQueueBatcher", + featuregate.StageAlpha, + featuregate.WithRegisterFromVersion("v0.115.0"), featuregate.WithRegisterDescription("if set to true, turns on the pulling-based exporter queue bathcer"), ) @@ -102,13 +102,14 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe ExporterSettings: be.Set, }, be.queueCfg) - be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep) + be.QueueSender = NewQueueSender(q, be.Set, be.queueCfg.NumConsumers, be.ExportFailureMessage, be.Obsrep, be.BatcherCfg) for _, op := range options { err = multierr.Append(err, op(be)) } } - if be.BatcherCfg.Enabled { + if !usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled || + usePullingBasedExporterQueueBatcher.IsEnabled() && be.BatcherCfg.Enabled && !be.queueCfg.Enabled { bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index 61edb5cf9fd..020b2a8d169 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -158,7 +158,6 @@ func TestBatchSender_BatchExportError(t *testing.T) { assert.Eventually(t, func() bool { return sink.requestsCount.Load() == tt.expectedRequests && sink.itemsCount.Load() == tt.expectedItems && - 
be.BatchSender.(*BatchSender).activeRequests.Load() == 0 && be.QueueSender.(*QueueSender).queue.Size() == 0 }, 100*time.Millisecond, 10*time.Millisecond) }) diff --git a/exporter/exporterhelper/internal/queue_sender.go b/exporter/exporterhelper/internal/queue_sender.go index 9b53a340146..bc4273cd620 100644 --- a/exporter/exporterhelper/internal/queue_sender.go +++ b/exporter/exporterhelper/internal/queue_sender.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/exporter/internal/queue" @@ -71,14 +72,20 @@ type QueueSender struct { queue exporterqueue.Queue[internal.Request] numConsumers int traceAttribute attribute.KeyValue + batcher queue.Batcher consumers *queue.Consumers[internal.Request] obsrep *ObsReport exporterID component.ID } -func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settings, numConsumers int, - exportFailureMessage string, obsrep *ObsReport) *QueueSender { +func NewQueueSender( + q exporterqueue.Queue[internal.Request], + set exporter.Settings, + numConsumers int, + exportFailureMessage string, + obsrep *ObsReport, + batcherCfg exporterbatcher.Config) *QueueSender { qs := &QueueSender{ queue: q, numConsumers: numConsumers, @@ -86,7 +93,8 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin obsrep: obsrep, exporterID: set.ID, } - consumeFunc := func(ctx context.Context, req internal.Request) error { + + exportFunc := func(ctx context.Context, req internal.Request) error { err := qs.NextSender.Send(ctx, req) if err != nil { set.Logger.Error("Exporting failed. 
Dropping data."+exportFailureMessage, @@ -94,7 +102,11 @@ func NewQueueSender(q exporterqueue.Queue[internal.Request], set exporter.Settin } return err } - qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, consumeFunc) + if usePullingBasedExporterQueueBatcher.IsEnabled() { + qs.batcher, _ = queue.NewBatcher(batcherCfg, q, exportFunc, numConsumers) + } else { + qs.consumers = queue.NewQueueConsumers[internal.Request](q, numConsumers, exportFunc) + } return qs } @@ -103,8 +115,15 @@ func (qs *QueueSender) Start(ctx context.Context, host component.Host) error { if err := qs.queue.Start(ctx, host); err != nil { return err } - if err := qs.consumers.Start(ctx, host); err != nil { - return err + + if usePullingBasedExporterQueueBatcher.IsEnabled() { + if err := qs.batcher.Start(ctx, host); err != nil { + return err + } + } else { + if err := qs.consumers.Start(ctx, host); err != nil { + return err + } } dataTypeAttr := attribute.String(DataTypeKey, qs.obsrep.Signal.String()) @@ -123,6 +142,9 @@ func (qs *QueueSender) Shutdown(ctx context.Context) error { if err := qs.queue.Shutdown(ctx); err != nil { return err } + if usePullingBasedExporterQueueBatcher.IsEnabled() { + return qs.batcher.Shutdown(ctx) + } return qs.consumers.Shutdown(ctx) } diff --git a/exporter/exporterhelper/internal/queue_sender_test.go b/exporter/exporterhelper/internal/queue_sender_test.go index 44735eaebfa..41dbbdbc38d 100644 --- a/exporter/exporterhelper/internal/queue_sender_test.go +++ b/exporter/exporterhelper/internal/queue_sender_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal" @@ -540,7 +541,7 @@ func 
TestQueueSenderNoStartShutdown(t *testing.T) { ExporterCreateSettings: exportertest.NewNopSettings(), }) require.NoError(t, err) - qs := NewQueueSender(queue, set, 1, "", obsrep) + qs := NewQueueSender(queue, set, 1, "", obsrep, exporterbatcher.NewDefaultConfig()) assert.NoError(t, qs.Shutdown(context.Background())) }) } diff --git a/exporter/internal/queue/default_batcher.go b/exporter/internal/queue/default_batcher.go index a815a83bba9..2884159341b 100644 --- a/exporter/internal/queue/default_batcher.go +++ b/exporter/internal/queue/default_batcher.go @@ -132,6 +132,11 @@ func (qb *DefaultBatcher) startTimeBasedFlushingGoroutine() { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DefaultBatcher) Start(_ context.Context, _ component.Host) error { + // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. + if qb.maxWorkers == -1 { + return nil + } + qb.startWorkerPool() qb.shutdownCh = make(chan bool, 1) diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go index 97a3fd32510..db08c546b4a 100644 --- a/exporter/internal/queue/disabled_batcher.go +++ b/exporter/internal/queue/disabled_batcher.go @@ -17,6 +17,11 @@ type DisabledBatcher struct { // Start starts the goroutine that reads from the queue and flushes asynchronously. func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { + // maxWorker being -1 means batcher is disabled. This is for testing queue sender metrics. + if qb.maxWorkers == -1 { + return nil + } + qb.startWorkerPool() // This goroutine reads and then flushes.