From da4053193fced4db5ec70677c3e93bd7f15392a2 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Fri, 22 Dec 2023 16:35:46 -0800 Subject: [PATCH] [exporterhelper] Add WithRequestQueue option to the exporter The new configuration interface for the end users provides a new `queue_size_items` option to limit the queue by a number of spans, log records, or metric data points. The previous way to limit the queue by number of requests is preserved under the same field, `queue_size,` which will later be deprecated through a longer transition process. --- .chloggen/exporter-helper-v2.yaml | 32 +++ exporter/exporterhelper/common.go | 43 +++- exporter/exporterhelper/logs.go | 2 +- exporter/exporterhelper/logs_test.go | 2 +- exporter/exporterhelper/metrics.go | 2 +- exporter/exporterhelper/metrics_test.go | 2 +- exporter/exporterhelper/queue_sender.go | 34 +--- exporter/exporterhelper/queue_sender_test.go | 183 ++++++++++++------ exporter/exporterhelper/retry_sender.go | 2 +- exporter/exporterhelper/traces.go | 2 +- exporter/exporterhelper/traces_test.go | 2 +- exporter/exporterqueue/config.go | 69 +++++++ exporter/exporterqueue/config_test.go | 30 +++ exporter/exporterqueue/queue.go | 69 +++++++ .../internal/bounded_memory_queue.go | 2 +- .../internal/bounded_memory_queue_test.go | 0 .../internal/consumers.go | 2 +- exporter/{exporterhelper => }/internal/err.go | 2 +- .../internal/mock_storage.go | 2 +- .../internal/package_test.go | 0 .../internal/persistent_queue.go | 2 +- .../internal/persistent_queue_test.go | 0 .../{exporterhelper => }/internal/queue.go | 2 +- .../internal/queue_capacity.go | 2 +- .../internal/queue_capacity_test.go | 0 25 files changed, 387 insertions(+), 101 deletions(-) create mode 100644 .chloggen/exporter-helper-v2.yaml create mode 100644 exporter/exporterqueue/config.go create mode 100644 exporter/exporterqueue/config_test.go create mode 100644 exporter/exporterqueue/queue.go rename exporter/{exporterhelper => }/internal/bounded_memory_queue.go (98%) rename exporter/{exporterhelper => }/internal/bounded_memory_queue_test.go (100%) rename exporter/{exporterhelper => }/internal/consumers.go (97%) rename exporter/{exporterhelper => }/internal/err.go (93%) rename exporter/{exporterhelper => }/internal/mock_storage.go (99%) rename exporter/{exporterhelper => }/internal/package_test.go (100%) rename exporter/{exporterhelper => }/internal/persistent_queue.go (99%) rename exporter/{exporterhelper => }/internal/persistent_queue_test.go (100%) rename exporter/{exporterhelper => }/internal/queue.go (98%) rename exporter/{exporterhelper => }/internal/queue_capacity.go (98%) rename exporter/{exporterhelper => }/internal/queue_capacity_test.go (100%) diff --git a/.chloggen/exporter-helper-v2.yaml b/.chloggen/exporter-helper-v2.yaml new file mode 100644 index 00000000000..368da4ba2db --- /dev/null +++ b/.chloggen/exporter-helper-v2.yaml @@ -0,0 +1,32 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add API for enabling queue in the new exporter helpers. + +# One or more tracking issues or pull requests related to the change +issues: [7874] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The following experimental API is introduced in exporter package: + - `exporterhelper.WithRequestQueue`: a new exporter helper option for using a queue. + - `exporterqueue.Queue`: an interface for queue implementations. + - `exporterqueue.Factory`: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option. + - `exporterqueue.Settings`: queue factory settings. + - `exporterqueue.Config`: common configuration for queue implementations. + - `exporterqueue.NewDefaultConfig`: a function for creating a default queue configuration. + - `exporterqueue.NewMemoryQueueFactory`: a new factory for creating a memory queue. + - `exporterqueue.NewPersistentQueueFactory: a factory for creating a persistent queue. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index b5e7aa39a33..b70a9552101 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -13,6 +13,8 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal" ) // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). @@ -85,17 +87,48 @@ func WithRetry(config configretry.BackOffConfig) Option { func WithQueue(config QueueSettings) Option { return func(o *baseExporter) { if o.requestExporter { - panic("queueing is not available for the new request exporters yet") + panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead") } if !config.Enabled { o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return } - consumeErrHandler := func(err error, req Request) { - o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage, - zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) + qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{ + Marshaler: o.marshaler, + Unmarshaler: o.unmarshaler, + }) + queue := qf(exporterqueue.Settings[Request]{ + Sizer: &internal.RequestSizer[Request]{}, + Capacity: config.QueueSize, + DataType: o.signal, + ExporterSettings: o.set, + }) + o.queueSender = newQueueSender(queue, o.set, config.NumConsumers, o.exportFailureMessage) + } +} + +// WithRequestQueue enables queueing for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option { + return func(o *baseExporter) { + if !cfg.Enabled { + o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." + return + } + set := exporterqueue.Settings[Request]{ + DataType: o.signal, + ExporterSettings: o.set, + } + if cfg.QueueSizeItems > 0 { + set.Sizer = &internal.ItemsSizer[Request]{} + set.Capacity = cfg.QueueSizeItems + } else { + set.Sizer = &internal.RequestSizer[Request]{} + set.Capacity = cfg.QueueSize } - o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler) + o.queueSender = newQueueSender(queueFactory(set), o.set, cfg.NumConsumers, o.exportFailureMessage) } } diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index f08bf0e6da6..e49533bf53d 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/plog" ) diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 92bbd48edac..e5c57db8fee 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -25,8 +25,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/plog" diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 7360d548962..7c19ca19666 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/pmetric" ) diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 770ea801e48..76cd3e1a727 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -25,8 +25,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/pmetric" diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index 8a2e71e0b05..d2432cfdb2b 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -17,7 +17,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) @@ -72,7 +73,7 @@ func (qCfg *QueueSettings) Validate() error { type queueSender struct { baseRequestSender fullName string - queue internal.Queue[Request] + queue exporterqueue.Queue[Request] traceAttribute attribute.KeyValue logger *zap.Logger meter otelmetric.Meter @@ -82,28 +83,8 @@ type queueSender struct { metricSize otelmetric.Int64ObservableGauge } -func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType, - marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender { - - isPersistent := config.StorageID != nil - var queue internal.Queue[Request] - queueSizer := &internal.RequestSizer[Request]{} - if isPersistent { - queue = internal.NewPersistentQueue[Request](internal.PersistentQueueSettings[Request]{ - Sizer: queueSizer, - Capacity: config.QueueSize, - DataType: signal, - StorageID: *config.StorageID, - Marshaler: marshaler, - Unmarshaler: unmarshaler, - ExporterSettings: set, - }) - } else { - queue = internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{ - Sizer: queueSizer, - Capacity: config.QueueSize, - }) - } +func newQueueSender(queue exporterqueue.Queue[Request], set exporter.CreateSettings, numConsumers int, + exportFailureMessage string) *queueSender { qs := &queueSender{ fullName: set.ID.String(), queue: queue, @@ -114,11 +95,12 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co consumeFunc := func(ctx context.Context, req Request) error { err := qs.nextSender.send(ctx, req) if err != nil { - consumeErrHandler(err, req) + set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage, + zap.Error(err), zap.Int("dropped_items", req.ItemsCount())) } return err } - qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc) + qs.consumers = internal.NewQueueConsumers[Request](queue, numConsumers, consumeFunc) return qs } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index d56ec6d78b1..65df1ab8a4e 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -18,8 +18,9 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal" ) func TestQueuedRetry_StopWhileWaiting(t *testing.T) { @@ -101,41 +102,85 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { } func TestQueuedRetryHappyPath(t *testing.T) { - tt, err := componenttest.SetupTelemetry(defaultID) - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - - qCfg := NewDefaultQueueSettings() - rCfg := configretry.NewDefaultBackOffConfig() - set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) - require.NoError(t, err) - ocs := be.obsrepSender.(*observabilityConsumerSender) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - assert.NoError(t, be.Shutdown(context.Background())) - }) - - wantRequests := 10 - reqs := make([]*mockRequest, 0, 10) - for i := 0; i < wantRequests; i++ { - ocs.run(func() { - req := newMockRequest(2, nil) - reqs = append(reqs, req) - require.NoError(t, be.send(context.Background(), req)) - }) + tests := []struct { + name string + queueOption Option + }{ + { + name: "WithQueue", + queueOption: WithQueue(QueueSettings{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }), + }, + { + name: "WithRequestQueue/MemoryQueueFactory", + queueOption: WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSizeItems: 21, + NumConsumers: 1, + }, exporterqueue.NewMemoryQueueFactory[Request]()), + }, + { + name: "WithRequestQueue/PersistentQueueFactory", + queueOption: WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSizeItems: 21, + NumConsumers: 1, + }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + }, + { + name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit", + queueOption: WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + }, } - - // Wait until all batches received - ocs.awaitAsyncProcessing() - - require.Len(t, reqs, wantRequests) - for _, req := range reqs { - req.checkNumRequests(t, 1) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tel, err := componenttest.SetupTelemetry(defaultID) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) + + rCfg := configretry.NewDefaultBackOffConfig() + set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), tt.queueOption) + require.NoError(t, err) + ocs := be.obsrepSender.(*observabilityConsumerSender) + + wantRequests := 10 + reqs := make([]*mockRequest, 0, 10) + for i := 0; i < wantRequests; i++ { + ocs.run(func() { + req := newMockRequest(2, nil) + reqs = append(reqs, req) + require.NoError(t, be.send(context.Background(), req)) + }) + } + + // expect queue to be full + require.Error(t, be.send(context.Background(), newMockRequest(2, nil))) + + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + assert.NoError(t, be.Shutdown(context.Background())) + }) + + // Wait until all batches received + ocs.awaitAsyncProcessing() + + require.Len(t, reqs, wantRequests) + for _, req := range reqs { + req.checkNumRequests(t, 1) + } + + ocs.checkSendItemsCount(t, 2*wantRequests) + ocs.checkDroppedItemsCount(t, 0) + }) } - - ocs.checkSendItemsCount(t, 2*wantRequests) - ocs.checkDroppedItemsCount(t, 0) } func TestQueuedRetry_QueueMetricsReported(t *testing.T) { tt, err := componenttest.SetupTelemetry(defaultID) @@ -193,27 +238,52 @@ func TestQueueSettings_Validate(t *testing.T) { } func TestQueueRetryWithDisabledQueue(t *testing.T) { - qs := NewDefaultQueueSettings() - qs.Enabled = false - set := exportertest.NewNopCreateSettings() - logger, observed := observer.New(zap.ErrorLevel) - set.Logger = zap.New(logger) - be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, - WithQueue(qs)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - ocs := be.obsrepSender.(*observabilityConsumerSender) - mockR := newMockRequest(2, errors.New("some error")) - ocs.run(func() { - require.Error(t, be.send(context.Background(), mockR)) - }) - assert.Len(t, observed.All(), 1) - assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message) - ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) - ocs.checkSendItemsCount(t, 0) - ocs.checkDroppedItemsCount(t, 2) - require.NoError(t, be.Shutdown(context.Background())) + tests := []struct { + name string + queueOption Option + }{ + { + name: "WithQueue", + queueOption: func() Option { + qs := NewDefaultQueueSettings() + qs.Enabled = false + return WithQueue(qs) + }(), + }, + { + name: "WithRequestQueue", + queueOption: func() Option { + qs := exporterqueue.NewDefaultConfig() + qs.Enabled = false + return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[Request]()) + }(), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + set := exportertest.NewNopCreateSettings() + logger, observed := observer.New(zap.ErrorLevel) + set.Logger = zap.New(logger) + be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, + tt.queueOption) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(2, errors.New("some error")) + ocs.run(func() { + require.Error(t, be.send(context.Background(), mockR)) + }) + assert.Len(t, observed.All(), 1) + assert.Equal(t, "Exporting failed. Rejecting data. Try enabling sending_queue to survive temporary failures.", observed.All()[0].Message) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) + }) + } + } func TestQueueFailedRequestDropped(t *testing.T) { @@ -323,7 +393,8 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { } func TestQueueSenderNoStartShutdown(t *testing.T) { - qs := newQueueSender(NewDefaultQueueSettings(), exportertest.NewNopCreateSettings(), "", nil, nil, nil) + queue := internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{}) + qs := newQueueSender(queue, exportertest.NewNopCreateSettings(), 1, "") assert.NoError(t, qs.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 1bf24157898..b5f6164e81a 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -17,7 +17,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" ) diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 53c6533cc2d..005ffad0d1d 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -13,7 +13,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/ptrace" ) diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index d99b9025c91..3df47b25f29 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -25,8 +25,8 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/pdata/ptrace" diff --git a/exporter/exporterqueue/config.go b/exporter/exporterqueue/config.go new file mode 100644 index 00000000000..4da5ebc3869 --- /dev/null +++ b/exporter/exporterqueue/config.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "errors" + + "go.opentelemetry.io/collector/component" +) + +// Config defines configuration for queueing requests before exporting. +// It's supposed to be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Config struct { + // Enabled indicates whether to not enqueue batches before exporting. + Enabled bool `mapstructure:"enabled"` + // NumConsumers is the number of consumers from the queue. + NumConsumers int `mapstructure:"num_consumers"` + // QueueSizeItems is the maximum number of items (spans, metric data points or log records) + // allowed in queue at any given time. + QueueSizeItems int `mapstructure:"queue_size_items"` + // QueueSize is the maximum number of requests allowed in queue at any given time. + // This option is left for backward compatibility and will be deprecated in the future. + // It's recommended to use QueueSizeItems instead. + QueueSize int `mapstructure:"queue_size"` +} + +// NewDefaultConfig returns the default Config. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewDefaultConfig() Config { + return Config{ + Enabled: true, + NumConsumers: 10, + QueueSizeItems: 100_000, + } +} + +// Validate checks if the QueueSettings configuration is valid +func (qCfg *Config) Validate() error { + if !qCfg.Enabled { + return nil + } + if qCfg.NumConsumers <= 0 { + return errors.New("number of consumers must be positive") + } + if qCfg.QueueSizeItems > 0 && qCfg.QueueSize > 0 { + return errors.New("only one of 'queue_size' and 'queue_size_items' can be specified") + } + if qCfg.QueueSizeItems <= 0 && qCfg.QueueSize <= 0 { + return errors.New("queue size must be positive") + } + return nil +} + +// PersistentQueueConfig defines configuration for queueing requests in a persistent storage. +// The struct is provided to be added in the exporter configuration as one struct under the "sending_queue" key. +// The exporter helper Go interface requires the fields to be provided separately to WithRequestQueue and +// NewPersistentQueueFactory. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueConfig struct { + Config `mapstructure:",squash"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *component.ID `mapstructure:"storage"` +} diff --git a/exporter/exporterqueue/config_test.go b/exporter/exporterqueue/config_test.go new file mode 100644 index 00000000000..bf85ef7dd36 --- /dev/null +++ b/exporter/exporterqueue/config_test.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueueConfig_Validate(t *testing.T) { + qCfg := NewDefaultConfig() + assert.NoError(t, qCfg.Validate()) + + qCfg.NumConsumers = 0 + assert.EqualError(t, qCfg.Validate(), "number of consumers must be positive") + + qCfg = NewDefaultConfig() + qCfg.QueueSizeItems = 0 + assert.EqualError(t, qCfg.Validate(), "queue size must be positive") + + qCfg.QueueSize = 1 + qCfg.QueueSizeItems = 1 + assert.EqualError(t, qCfg.Validate(), "only one of 'queue_size' and 'queue_size_items' can be specified") + + // Confirm Validate doesn't return error with invalid config when feature is disabled + qCfg.Enabled = false + assert.NoError(t, qCfg.Validate()) +} diff --git a/exporter/exporterqueue/queue.go b/exporter/exporterqueue/queue.go new file mode 100644 index 00000000000..9befb623eee --- /dev/null +++ b/exporter/exporterqueue/queue.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterqueue // import "go.opentelemetry.io/collector/exporter/exporterqueue" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/internal" +) + +// Queue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue +// (boundedMemoryQueue) or via a disk-based queue (persistentQueue) +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Queue[T any] internal.Queue[T] + +// Settings defines settings for creating a queue. +type Settings[T any] struct { + Sizer internal.Sizer[T] + Capacity int + DataType component.DataType + ExporterSettings exporter.CreateSettings +} + +type Factory[T any] func(Settings[T]) Queue[T] + +// NewMemoryQueueFactory returns a factory to create a new memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMemoryQueueFactory[T any]() Factory[T] { + return func(set Settings[T]) Queue[T] { + return internal.NewBoundedMemoryQueue[T](internal.MemoryQueueSettings[T]{ + Sizer: set.Sizer, + Capacity: set.Capacity, + }) + } +} + +// PersistentQueueSettings defines developer settings for the persistent queue factory. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type PersistentQueueSettings[T any] struct { + // Marshaler is used to serialize queue elements before storing them in the persistent storage. + Marshaler func(req T) ([]byte, error) + // Unmarshaler is used to deserialize requests after reading them from the persistent storage. + Unmarshaler func(data []byte) (T, error) +} + +// NewPersistentQueueFactory returns a factory to create a new persistent queue. +// If cfg.StorageID is nil then it falls back to memory queue. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewPersistentQueueFactory[T any](storageID *component.ID, factorySettings PersistentQueueSettings[T]) Factory[T] { + if storageID == nil { + return NewMemoryQueueFactory[T]() + } + return func(set Settings[T]) Queue[T] { + return internal.NewPersistentQueue[T](internal.PersistentQueueSettings[T]{ + Sizer: set.Sizer, + Capacity: set.Capacity, + DataType: set.DataType, + StorageID: *storageID, + Marshaler: factorySettings.Marshaler, + Unmarshaler: factorySettings.Unmarshaler, + ExporterSettings: set.ExporterSettings, + }) + } +} diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/internal/bounded_memory_queue.go similarity index 98% rename from exporter/exporterhelper/internal/bounded_memory_queue.go rename to exporter/internal/bounded_memory_queue.go index 85435d2aa61..e6f6679959d 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/internal/bounded_memory_queue.go @@ -3,7 +3,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/internal/bounded_memory_queue_test.go similarity index 100% rename from exporter/exporterhelper/internal/bounded_memory_queue_test.go rename to exporter/internal/bounded_memory_queue_test.go diff --git a/exporter/exporterhelper/internal/consumers.go b/exporter/internal/consumers.go similarity index 97% rename from exporter/exporterhelper/internal/consumers.go rename to exporter/internal/consumers.go index 88b729ebfed..32431ea7e72 100644 --- a/exporter/exporterhelper/internal/consumers.go +++ b/exporter/internal/consumers.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" diff --git a/exporter/exporterhelper/internal/err.go b/exporter/internal/err.go similarity index 93% rename from exporter/exporterhelper/internal/err.go rename to exporter/internal/err.go index c93bd92f556..7f542767359 100644 --- a/exporter/exporterhelper/internal/err.go +++ b/exporter/internal/err.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" type shutdownErr struct { err error diff --git a/exporter/exporterhelper/internal/mock_storage.go b/exporter/internal/mock_storage.go similarity index 99% rename from exporter/exporterhelper/internal/mock_storage.go rename to exporter/internal/mock_storage.go index 507e1e6946e..12c54249e92 100644 --- a/exporter/exporterhelper/internal/mock_storage.go +++ b/exporter/internal/mock_storage.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" diff --git a/exporter/exporterhelper/internal/package_test.go b/exporter/internal/package_test.go similarity index 100% rename from exporter/exporterhelper/internal/package_test.go rename to exporter/internal/package_test.go diff --git a/exporter/exporterhelper/internal/persistent_queue.go b/exporter/internal/persistent_queue.go similarity index 99% rename from exporter/exporterhelper/internal/persistent_queue.go rename to exporter/internal/persistent_queue.go index 0a6629cb87e..a1352217bfe 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/internal/persistent_queue.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/internal/persistent_queue_test.go similarity index 100% rename from exporter/exporterhelper/internal/persistent_queue_test.go rename to exporter/internal/persistent_queue_test.go diff --git a/exporter/exporterhelper/internal/queue.go b/exporter/internal/queue.go similarity index 98% rename from exporter/exporterhelper/internal/queue.go rename to exporter/internal/queue.go index 8bd8879a940..1cee90f00ff 100644 --- a/exporter/exporterhelper/internal/queue.go +++ b/exporter/internal/queue.go @@ -3,7 +3,7 @@ // Copyright (c) 2017 Uber Technologies, Inc. // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" diff --git a/exporter/exporterhelper/internal/queue_capacity.go b/exporter/internal/queue_capacity.go similarity index 98% rename from exporter/exporterhelper/internal/queue_capacity.go rename to exporter/internal/queue_capacity.go index 0466de0d8b2..a84682e4845 100644 --- a/exporter/exporterhelper/internal/queue_capacity.go +++ b/exporter/internal/queue_capacity.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" +package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "sync/atomic" diff --git a/exporter/exporterhelper/internal/queue_capacity_test.go b/exporter/internal/queue_capacity_test.go similarity index 100% rename from exporter/exporterhelper/internal/queue_capacity_test.go rename to exporter/internal/queue_capacity_test.go