Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[exporterhelper] Rename Timeout/QueueSettings to Config #11132

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .chloggen/6767-exporterhelper-rename-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'deprecation'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Deprecate TimeoutSettings/QueueSettings in favor of TimeoutConfig/QueueConfig."

# One or more tracking issues or pull requests related to the change
issues: [6767]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 3 additions & 3 deletions exporter/debugexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func createTracesExporter(ctx context.Context, set exporter.Settings, config com
return exporterhelper.NewTracesExporter(ctx, set, config,
debugExporter.pushTraces,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
exporterhelper.WithShutdown(otlptext.LoggerSync(exporterLogger)),
)
}
Expand All @@ -66,7 +66,7 @@ func createMetricsExporter(ctx context.Context, set exporter.Settings, config co
return exporterhelper.NewMetricsExporter(ctx, set, config,
debugExporter.pushMetrics,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
exporterhelper.WithShutdown(otlptext.LoggerSync(exporterLogger)),
)
}
Expand All @@ -78,7 +78,7 @@ func createLogsExporter(ctx context.Context, set exporter.Settings, config compo
return exporterhelper.NewLogsExporter(ctx, set, config,
debugExporter.pushLogs,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithTimeout(exporterhelper.TimeoutConfig{Timeout: 0}),
exporterhelper.WithShutdown(otlptext.LoggerSync(exporterLogger)),
)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/batch_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func TestBatchSender_ShutdownDeadlock(t *testing.T) {
func TestBatchSenderWithTimeout(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 10
tCfg := NewDefaultTimeoutSettings()
tCfg := NewDefaultTimeoutConfig()
tCfg.Timeout = 50 * time.Millisecond
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)),
Expand Down
16 changes: 8 additions & 8 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func WithShutdown(shutdown component.ShutdownFunc) Option {
}
}

// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) Option {
// WithTimeout overrides the default TimeoutConfig for an exporter.
// The default TimeoutConfig is 5 seconds.
func WithTimeout(timeoutConfig TimeoutConfig) Option {
return func(o *baseExporter) error {
o.timeoutSender.cfg = timeoutSettings
o.timeoutSender.cfg = timeoutConfig
return nil
}
}
Expand All @@ -86,10 +86,10 @@ func WithRetry(config configretry.BackOffConfig) Option {
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
func WithQueue(config QueueConfig) Option {
return func(o *baseExporter) error {
if o.marshaler == nil || o.unmarshaler == nil {
return fmt.Errorf("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
Expand Down Expand Up @@ -252,7 +252,7 @@ func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsre
queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutConfig()},

set: set,
obsrep: obsReport,
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestBaseExporterWithOptions(t *testing.T) {
defaultSettings, defaultDataType, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings()),
WithTimeout(NewDefaultTimeoutConfig()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -73,7 +73,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
require.Nil(t, bs.marshaler)
require.Nil(t, bs.unmarshaler)
_, err = newBaseExporter(exportertest.NewNopSettings(), defaultDataType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings()))
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
require.Error(t, err)

_, err = newBaseExporter(exportertest.NewNopSettings(), defaultDataType, newNoopObsrepSender,
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func TestLogsRequestExporter_Default_ExportError(t *testing.T) {
}

func TestLogsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

rCfg := configretry.NewDefaultBackOffConfig()
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func TestMetricsRequestExporter_Default_ExportError(t *testing.T) {
}

func TestMetricsExporter_WithPersistentQueue(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

rCfg := configretry.NewDefaultBackOffConfig()
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 2
wantErr := errors.New("some-error")
Expand Down
20 changes: 14 additions & 6 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@

const defaultQueueSize = 1000

// QueueSettings defines configuration for queueing batches before sending to the consumerSender.
type QueueSettings struct {
// Deprecated: [v0.110.0] Use QueueConfig instead.
type QueueSettings = QueueConfig

// QueueConfig defines configuration for queueing batches before sending to the consumerSender.
type QueueConfig struct {
// Enabled indicates whether to not enqueue batches before sending to the consumerSender.
Enabled bool `mapstructure:"enabled"`
// NumConsumers is the number of consumers from the queue. Defaults to 10.
Expand All @@ -37,9 +40,14 @@
StorageID *component.ID `mapstructure:"storage"`
}

// NewDefaultQueueSettings returns the default settings for QueueSettings.
// Deprecated: [v0.110.0] Use NewDefaultQueueConfig instead.
func NewDefaultQueueSettings() QueueSettings {
return QueueSettings{
return NewDefaultQueueConfig()

Check warning on line 45 in exporter/exporterhelper/queue_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/queue_sender.go#L45

Added line #L45 was not covered by tests
}

// NewDefaultQueueConfig returns the default config for QueueConfig.
func NewDefaultQueueConfig() QueueConfig {
return QueueConfig{
Enabled: true,
NumConsumers: 10,
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
Expand All @@ -49,8 +57,8 @@
}
}

// Validate checks if the QueueSettings configuration is valid
func (qCfg *QueueSettings) Validate() error {
// Validate checks if the QueueConfig configuration is valid
func (qCfg *QueueConfig) Validate() error {
if !qCfg.Enabled {
return nil
}
Expand Down
24 changes: 12 additions & 12 deletions exporter/exporterhelper/queue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender,
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) {
}

func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender,
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) {
}

func TestQueuedRetry_RejectOnFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.QueueSize = 0
qCfg.NumConsumers = 0
set := exportertest.NewNopSettings()
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestQueuedRetryHappyPath(t *testing.T) {
queueOptions: []Option{
withMarshaler(mockRequestMarshaler),
withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
WithQueue(QueueSettings{
WithQueue(QueueConfig{
Enabled: true,
QueueSize: 10,
NumConsumers: 1,
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) {
tt, err := componenttest.SetupTelemetry(defaultID)
require.NoError(t, err)

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 0 // to make every request go straight to the queue
rCfg := configretry.NewDefaultBackOffConfig()
set := exporter.Settings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}
Expand Down Expand Up @@ -248,14 +248,14 @@ func TestNoCancellationContext(t *testing.T) {
assert.True(t, d.IsZero())
}

func TestQueueSettings_Validate(t *testing.T) {
qCfg := NewDefaultQueueSettings()
func TestQueueConfig_Validate(t *testing.T) {
qCfg := NewDefaultQueueConfig()
assert.NoError(t, qCfg.Validate())

qCfg.QueueSize = 0
assert.EqualError(t, qCfg.Validate(), "queue size must be positive")

qCfg = NewDefaultQueueSettings()
qCfg = NewDefaultQueueConfig()
qCfg.NumConsumers = 0

assert.EqualError(t, qCfg.Validate(), "number of queue consumers must be positive")
Expand All @@ -276,7 +276,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
withMarshaler(mockRequestMarshaler),
withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})),
func() Option {
qs := NewDefaultQueueSettings()
qs := NewDefaultQueueConfig()
qs.Enabled = false
return WithQueue(qs)
}(),
Expand Down Expand Up @@ -340,7 +340,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := configretry.NewDefaultBackOffConfig()
Expand All @@ -366,7 +366,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence
rCfg := configretry.NewDefaultBackOffConfig()
Expand All @@ -385,7 +385,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
}

func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = &storageID // enable persistence to ensure data is re-queued on shutdown
Expand Down
12 changes: 6 additions & 6 deletions exporter/exporterhelper/retry_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func mockRequestMarshaler(Request) ([]byte, error) {
}

func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data")))
be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender,
Expand All @@ -60,7 +60,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) {
}

func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
be, err := newBaseExporter(defaultSettings, defaultDataType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler),
Expand All @@ -86,7 +86,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) {
}

func TestQueuedRetry_OnError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 0
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestQueuedRetry_OnError(t *testing.T) {
}

func TestQueuedRetry_MaxElapsedTime(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = time.Millisecond
Expand Down Expand Up @@ -164,7 +164,7 @@ func (e wrappedError) Unwrap() error {
}

func TestQueuedRetry_ThrottleError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.InitialInterval = 10 * time.Millisecond
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) {
}

func TestQueuedRetry_RetryOnError(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg := NewDefaultQueueConfig()
qCfg.NumConsumers = 1
qCfg.QueueSize = 1
rCfg := configretry.NewDefaultBackOffConfig()
Expand Down
20 changes: 14 additions & 6 deletions exporter/exporterhelper/timeout_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,40 @@
"time"
)

// TimeoutSettings for timeout. The timeout applies to individual attempts to send data to the backend.
type TimeoutSettings struct {
// Deprecated: [v0.110.0] Use TimeoutConfig instead.
type TimeoutSettings = TimeoutConfig

// TimeoutConfig for timeout. The timeout applies to individual attempts to send data to the backend.
type TimeoutConfig struct {
// Timeout is the timeout for every attempt to send data to the backend.
// A zero timeout means no timeout.
Timeout time.Duration `mapstructure:"timeout"`
}

func (ts *TimeoutSettings) Validate() error {
func (ts *TimeoutConfig) Validate() error {
// Negative timeouts are not acceptable, since all sends will fail.
if ts.Timeout < 0 {
return errors.New("'timeout' must be non-negative")
}
return nil
}

// NewDefaultTimeoutSettings returns the default settings for TimeoutSettings.
// Deprecated: [v0.110.0] Use NewDefaultTimeoutConfig instead.
func NewDefaultTimeoutSettings() TimeoutSettings {
return TimeoutSettings{
return NewDefaultTimeoutConfig()

Check warning on line 32 in exporter/exporterhelper/timeout_sender.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/timeout_sender.go#L32

Added line #L32 was not covered by tests
}

// NewDefaultTimeoutConfig returns the default config for TimeoutConfig.
func NewDefaultTimeoutConfig() TimeoutConfig {
return TimeoutConfig{
Timeout: 5 * time.Second,
}
}

// timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender.
type timeoutSender struct {
baseRequestSender
cfg TimeoutSettings
cfg TimeoutConfig
}

func (ts *timeoutSender) send(ctx context.Context, req Request) error {
Expand Down
Loading
Loading