diff --git a/.chloggen/exporterhelper-avoid-messages.yaml b/.chloggen/exporterhelper-avoid-messages.yaml new file mode 100644 index 00000000000..beec973dce2 --- /dev/null +++ b/.chloggen/exporterhelper-avoid-messages.yaml @@ -0,0 +1,20 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Stop logging error messages that suggest enabling `retry_on_failure` or `sending_queue` when they are not available. + +# One or more tracking issues or pull requests related to the change +issues: [8369] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 3dfe4c41311..443fc477865 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -7,6 +7,9 @@ import ( "context" "time" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -29,9 +32,34 @@ func NewDefaultTimeoutSettings() TimeoutSettings { // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs). type requestSender interface { + start(ctx context.Context, host component.Host, set exporter.CreateSettings) error + shutdown() send(req internal.Request) error + setNextSender(nextSender requestSender) +} + +type baseRequestSender struct { + nextSender requestSender +} + +var _ requestSender = (*baseRequestSender)(nil) + +func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error { + return nil +} + +func (b *baseRequestSender) shutdown() {} + +func (b *baseRequestSender) send(req internal.Request) error { + return b.nextSender.send(req) +} + +func (b *baseRequestSender) setNextSender(nextSender requestSender) { + b.nextSender = nextSender } +type obsrepSenderFactory func(obsrep *obsExporter) requestSender + // baseRequest is a base implementation for the internal.Request. type baseRequest struct { ctx context.Context @@ -56,47 +84,13 @@ func (req *baseRequest) OnProcessingFinished() { } } -// baseSettings represents all the options that users can configure. -type baseSettings struct { - component.StartFunc - component.ShutdownFunc - consumerOptions []consumer.Option - TimeoutSettings - queue internal.ProducerConsumerQueue - RetrySettings - requestExporter bool - marshaler internal.RequestMarshaler - unmarshaler internal.RequestUnmarshaler -} - -// newBaseSettings returns the baseSettings starting from the default and applying all configured options. -// requestExporter indicates whether the base settings are for a new request exporter or not. -// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
-func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler, - unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings { - bs := &baseSettings{ - requestExporter: requestExporter, - TimeoutSettings: NewDefaultTimeoutSettings(), - // TODO: Enable retry by default (call DefaultRetrySettings) - RetrySettings: RetrySettings{Enabled: false}, - marshaler: marshaler, - unmarshaler: unmarshaler, - } - - for _, op := range options { - op(bs) - } - - return bs -} - -// Option apply changes to baseSettings. -type Option func(*baseSettings) +// Option applies changes to baseExporter. +type Option func(*baseExporter) // WithStart overrides the default Start function for an exporter. // The default start function does nothing and always returns nil. func WithStart(start component.StartFunc) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.StartFunc = start } } @@ -104,7 +98,7 @@ func WithStart(start component.StartFunc) Option { // WithShutdown overrides the default Shutdown function for an exporter. // The default shutdown function does nothing and always returns nil. func WithShutdown(shutdown component.ShutdownFunc) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.ShutdownFunc = shutdown } } @@ -112,16 +106,16 @@ func WithShutdown(shutdown component.ShutdownFunc) Option { // WithTimeout overrides the default TimeoutSettings for an exporter. // The default TimeoutSettings is 5 seconds. func WithTimeout(timeoutSettings TimeoutSettings) Option { - return func(o *baseSettings) { - o.TimeoutSettings = timeoutSettings + return func(o *baseExporter) { + o.timeoutSender.cfg = timeoutSettings } } // WithRetry overrides the default RetrySettings for an exporter. // The default RetrySettings is to disable retries. func WithRetry(retrySettings RetrySettings) Option { - return func(o *baseSettings) { - o.RetrySettings = retrySettings + return func(o *baseExporter) { + o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure) } } @@ -129,18 +123,21 @@ func WithRetry(retrySettings RetrySettings) Option { // The default QueueSettings is to disable queueing. // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. func WithQueue(config QueueSettings) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { if o.requestExporter { panic("queueing is not available for the new request exporters yet") } - if !config.Enabled { - return + var queue internal.ProducerConsumerQueue + if config.Enabled { + if config.StorageID == nil { + queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) + } else { + queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + } } - if config.StorageID == nil { - o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers) - return - } - o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler) + qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger) + o.queueSender = qs + o.setOnTemporaryFailure(qs.onTemporaryFailure) } } @@ -148,7 +145,7 @@ func WithQueue(config QueueSettings) Option { // The default is non-mutable data. // TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option { - return func(o *baseSettings) { + return func(o *baseExporter) { o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities)) } } @@ -157,48 +154,106 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { type baseExporter struct { component.StartFunc component.ShutdownFunc - obsrep *obsExporter - sender requestSender - qrSender *queuedRetrySender + + requestExporter bool + marshaler internal.RequestMarshaler + unmarshaler internal.RequestUnmarshaler + signal component.DataType + + set exporter.CreateSettings + obsrep *obsExporter + sampledLogger *zap.Logger + + // Chain of senders that the exporter helper applies before passing the data to the actual exporter. + // The data is handled by each sender in that order, starting from the queueSender. + // Most of the senders are optional, and initialized with a no-op pass-through sender. + queueSender requestSender + obsrepSender requestSender + retrySender requestSender + timeoutSender *timeoutSender // timeoutSender is always initialized. + + // onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer. + onTemporaryFailure onRequestHandlingFinishedFunc + + consumerOptions []consumer.Option } -func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) { - be := &baseExporter{} +// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers are updated to call the new ones. +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler, + unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { - var err error - be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) + obsrep, err := newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments) if err != nil { return nil, err } - be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger) - be.sender = be.qrSender - be.StartFunc = func(ctx context.Context, host component.Host) error { - // First start the wrapped exporter. - if err := bs.StartFunc.Start(ctx, host); err != nil { - return err - } + be := &baseExporter{ + requestExporter: requestExporter, + marshaler: marshaler, + unmarshaler: unmarshaler, + signal: signal, + + queueSender: &baseRequestSender{}, + obsrepSender: osf(obsrep), + retrySender: &baseRequestSender{}, + timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}, - // If no error then start the queuedRetrySender. - return be.qrSender.start(ctx, host, set) + set: set, + obsrep: obsrep, + sampledLogger: createSampledLogger(set.Logger), } - be.ShutdownFunc = func(ctx context.Context) error { - // First shutdown the queued retry sender - be.qrSender.shutdown() - // Last shutdown the wrapped exporter itself. - return bs.ShutdownFunc.Shutdown(ctx) + + for _, op := range options { + op(be) } + be.connectSenders() + return be, nil } -// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper. -// This can be used to wrap with observability (create spans, record metrics) the consumer sender.
-func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) { - be.qrSender.consumerSender = f(be.qrSender.consumerSender) +// send sends the request using the first sender in the chain. +func (be *baseExporter) send(req internal.Request) error { + return be.queueSender.send(req) +} + +// connectSenders connects the senders in the predefined order. +func (be *baseExporter) connectSenders() { + be.queueSender.setNextSender(be.obsrepSender) + be.obsrepSender.setNextSender(be.retrySender) + be.retrySender.setNextSender(be.timeoutSender) +} + +func (be *baseExporter) Start(ctx context.Context, host component.Host) error { + // First start the wrapped exporter. + if err := be.StartFunc.Start(ctx, host); err != nil { + return err + } + + // If no error then start the queueSender. + return be.queueSender.start(ctx, host, be.set) +} + +func (be *baseExporter) Shutdown(ctx context.Context) error { + // First shutdown the retry sender, so it can push any pending requests back to the queue. + be.retrySender.shutdown() + + // Then shutdown the queue sender. + be.queueSender.shutdown() + + // Last shutdown the wrapped exporter itself. + return be.ShutdownFunc.Shutdown(ctx) +} + +func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) { + be.onTemporaryFailure = onTemporaryFailure + if rs, ok := be.retrySender.(*retrySender); ok { + rs.onTemporaryFailure = onTemporaryFailure + } } // timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender. type timeoutSender struct { + baseRequestSender cfg TimeoutSettings } @@ -213,3 +268,22 @@ func (ts *timeoutSender) send(req internal.Request) error { } return req.Export(ctx) } + +func createSampledLogger(logger *zap.Logger) *zap.Logger { + if logger.Core().Enabled(zapcore.DebugLevel) { + // Debugging is enabled. Don't do any sampling. + return logger + } + + // Create a logger that samples all messages to 1 per 10 seconds initially, + // and 1/100 of messages after that.
+ opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { + return zapcore.NewSamplerWithOptions( + core, + 10*time.Second, + 1, + 100, + ) + }) + return logger.WithOptions(opts) +} diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 076cd55b00e..d505a3a35be 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -1,5 +1,6 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 + package exporterhelper import ( @@ -31,12 +32,16 @@ var ( } ) +func newNoopObsrepSender(_ *obsExporter) requestSender { + return &baseRequestSender{} +} + func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) - be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "") + be, err = newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -45,13 +50,10 @@ func TestBaseExporter(t *testing.T) { func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( - defaultSettings, - newBaseSettings( - false, nil, nil, - WithStart(func(ctx context.Context, host component.Host) error { return want }), - WithShutdown(func(ctx context.Context) error { return want }), - WithTimeout(NewDefaultTimeoutSettings())), - "", + defaultSettings, "", false, nil, nil, newNoopObsrepSender, + WithStart(func(ctx context.Context, host component.Host) error { return want }), + WithShutdown(func(ctx context.Context) error { return want }), + WithTimeout(NewDefaultTimeoutSettings()), ) require.NoError(t, err) require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost())) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index ef22d50dd02..a6a6e162710 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -89,26 +89,20 @@ func NewLogsExporter( return nil, errNilPushLogsData } - bs := newBaseSettings(false, logsRequestMarshaler, newLogsRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeLogs) + be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler, + newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &logsExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req := newLogsRequest(ctx, ld, pusher) - serr := be.sender.send(req) + serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewLogsRequestExporter( return nil, errNilLogsConverter } - bs := newBaseSettings(true, nil, nil, options...) 
- - be, err := newBaseExporter(set, bs, component.DataTypeLogs) + be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &logsExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { req, cErr := converter.RequestFromLogs(ctx, ld) @@ -166,12 +152,12 @@ func NewLogsRequestExporter( baseRequest: baseRequest{ctx: ctx}, Request: req, } - sErr := be.sender.send(r) + sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &logsExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewLogsRequestExporter( } type logsExporterWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newLogsExporterWithObservability(obsrep *obsExporter) requestSender { + return &logsExporterWithObservability{obsrep: obsrep} } func (lewo *logsExporterWithObservability) send(req internal.Request) error { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index a13b010e955..a678ebeebb5 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -89,26 +89,20 @@ func NewMetricsExporter( return nil, errNilPushMetricsData } - bs := newBaseSettings(false, metricsRequestMarshaler, newMetricsRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler, + newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &metricsSenderWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req := newMetricsRequest(ctx, md, pusher) - serr := be.sender.send(req) + serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &metricsExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewMetricsRequestExporter( return nil, errNilMetricsConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeMetrics) + be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &metricsSenderWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { req, cErr := converter.RequestFromMetrics(ctx, md) @@ -166,12 +152,12 @@ func NewMetricsRequestExporter( Request: req, baseRequest: baseRequest{ctx: ctx}, } - sErr := be.sender.send(r) + sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) 
return &metricsExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewMetricsRequestExporter( } type metricsSenderWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender { + return &metricsSenderWithObservability{obsrep: obsrep} } func (mewo *metricsSenderWithObservability) send(req internal.Request) error { diff --git a/exporter/exporterhelper/queued_retry.go b/exporter/exporterhelper/queued_retry.go index 2e2bc1f9a82..3aceb411745 100644 --- a/exporter/exporterhelper/queued_retry.go +++ b/exporter/exporterhelper/queued_retry.go @@ -14,7 +14,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" @@ -65,51 +64,32 @@ func (qCfg *QueueSettings) Validate() error { return nil } -type queuedRetrySender struct { +type queueSender struct { + baseRequestSender fullName string id component.ID signal component.DataType - consumerSender requestSender queue internal.ProducerConsumerQueue - retryStopCh chan struct{} traceAttribute attribute.KeyValue logger *zap.Logger requeuingEnabled bool } -func newQueuedRetrySender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, - rCfg RetrySettings, nextSender requestSender, logger *zap.Logger) *queuedRetrySender { - retryStopCh := make(chan struct{}) - sampledLogger := createSampledLogger(logger) - traceAttr := attribute.String(obsmetrics.ExporterKey, id.String()) - - qrs := &queuedRetrySender{ +func newQueueSender(id component.ID, signal component.DataType, queue internal.ProducerConsumerQueue, logger *zap.Logger) *queueSender { + return &queueSender{ fullName: id.String(), id: id, signal: signal, queue: queue, - retryStopCh: retryStopCh, - traceAttribute: traceAttr, - logger: sampledLogger, + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + logger: logger, // TODO: this can be further exposed as a config param rather than relying on a type of queue requeuingEnabled: queue != nil && queue.IsPersistent(), } - - qrs.consumerSender = &retrySender{ - traceAttribute: traceAttr, - cfg: rCfg, - nextSender: nextSender, - stopCh: retryStopCh, - logger: sampledLogger, - // Following three functions actually depend on queuedRetrySender - onTemporaryFailure: qrs.onTemporaryFailure, - } - - return qrs } -func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { - if !qrs.requeuingEnabled || qrs.queue == nil { +func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { + if !qs.requeuingEnabled || qs.queue == nil { logger.Error( "Exporting failed. No more retries left. Dropping data.", zap.Error(err), @@ -118,7 +98,7 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna return err } - if qrs.queue.Produce(req) { + if qs.queue.Produce(req) { logger.Error( "Exporting failed. Putting back to the end of the queue.", zap.Error(err), @@ -134,16 +114,16 @@ func (qrs *queuedRetrySender) onTemporaryFailure(logger *zap.Logger, req interna } // start is invoked during service startup. 
-func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { - if qrs.queue == nil { +func (qs *queueSender) start(ctx context.Context, host component.Host, set exporter.CreateSettings) error { + if qs.queue == nil { return nil } - err := qrs.queue.Start(ctx, host, internal.QueueSettings{ + err := qs.queue.Start(ctx, host, internal.QueueSettings{ CreateSettings: set, - DataType: qrs.signal, + DataType: qs.signal, Callback: func(item internal.Request) { - _ = qrs.consumerSender.send(item) + _ = qs.nextSender.send(item) item.OnProcessingFinished() }, }) @@ -153,14 +133,14 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, se // Start reporting queue length metric err = globalInstruments.queueSize.UpsertEntry(func() int64 { - return int64(qrs.queue.Size()) - }, metricdata.NewLabelValue(qrs.fullName)) + return int64(qs.queue.Size()) + }, metricdata.NewLabelValue(qs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue size metric: %w", err) } err = globalInstruments.queueCapacity.UpsertEntry(func() int64 { - return int64(qrs.queue.Capacity()) - }, metricdata.NewLabelValue(qrs.fullName)) + return int64(qs.queue.Capacity()) + }, metricdata.NewLabelValue(qs.fullName)) if err != nil { return fmt.Errorf("failed to create retry queue capacity metric: %w", err) } @@ -169,19 +149,16 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host, se } // shutdown is invoked during service shutdown. -func (qrs *queuedRetrySender) shutdown() { - // First Stop the retry goroutines, so that unblocks the queue numWorkers. - close(qrs.retryStopCh) - - if qrs.queue != nil { +func (qs *queueSender) shutdown() { + if qs.queue != nil { // Cleanup queue metrics reporting _ = globalInstruments.queueSize.UpsertEntry(func() int64 { return int64(0) - }, metricdata.NewLabelValue(qrs.fullName)) + }, metricdata.NewLabelValue(qs.fullName)) // Stop the queued sender, this will drain the queue and will call the retry (which is stopped) that will only // try once every request. - qrs.queue.Stop() + qs.queue.Stop() } } @@ -217,31 +194,12 @@ func NewDefaultRetrySettings() RetrySettings { } } -func createSampledLogger(logger *zap.Logger) *zap.Logger { - if logger.Core().Enabled(zapcore.DebugLevel) { - // Debugging is enabled. Don't do any sampling. - return logger - } - - // Create a logger that samples all messages to 1 per 10 seconds initially, - // and 1/100 of messages after that. - opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSamplerWithOptions( - core, - 10*time.Second, - 1, - 100, - ) - }) - return logger.WithOptions(opts) -} - // send implements the requestSender interface -func (qrs *queuedRetrySender) send(req internal.Request) error { - if qrs.queue == nil { - err := qrs.consumerSender.send(req) +func (qs *queueSender) send(req internal.Request) error { + if qs.queue == nil { + err := qs.nextSender.send(req) if err != nil { - qrs.logger.Error( + qs.logger.Error( "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.", zap.Int("dropped_items", req.Count()), ) @@ -254,16 +212,16 @@ func (qrs *queuedRetrySender) send(req internal.Request) error { req.SetContext(noCancellationContext{Context: req.Context()}) span := trace.SpanFromContext(req.Context()) - if !qrs.queue.Produce(req) { - qrs.logger.Error( + if !qs.queue.Produce(req) { + qs.logger.Error( "Dropping data because sending_queue is full. 
Try increasing queue_size.", zap.Int("dropped_items", req.Count()), ) - span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qrs.traceAttribute)) + span.AddEvent("Dropped item, sending_queue is full.", trace.WithAttributes(qs.traceAttribute)) return errSendingQueueIsFull } - span.AddEvent("Enqueued item.", trace.WithAttributes(qrs.traceAttribute)) + span.AddEvent("Enqueued item.", trace.WithAttributes(qs.traceAttribute)) return nil } @@ -292,14 +250,33 @@ func NewThrottleRetry(err error, delay time.Duration) error { type onRequestHandlingFinishedFunc func(*zap.Logger, internal.Request, error) error type retrySender struct { + baseRequestSender traceAttribute attribute.KeyValue cfg RetrySettings - nextSender requestSender stopCh chan struct{} logger *zap.Logger onTemporaryFailure onRequestHandlingFinishedFunc } +func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender { + if onTemporaryFailure == nil { + onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error { + return err + } + } + return &retrySender{ + traceAttribute: attribute.String(obsmetrics.ExporterKey, id.String()), + cfg: rCfg, + stopCh: make(chan struct{}), + logger: logger, + onTemporaryFailure: onTemporaryFailure, + } +} + +func (rs *retrySender) shutdown() { + close(rs.stopCh) +} + // send implements the requestSender interface func (rs *retrySender) send(req internal.Request) error { if !rs.cfg.Enabled { diff --git a/exporter/exporterhelper/queued_retry_test.go b/exporter/exporterhelper/queued_retry_test.go index cb0885751b3..7472cdeff49 100644 --- a/exporter/exporterhelper/queued_retry_test.go +++ b/exporter/exporterhelper/queued_retry_test.go @@ -41,11 +41,9 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() mockR := newMockRequest(context.Background(), 2, consumererror.NewPermanent(errors.New("bad data"))) - bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)) - be, err := newBaseExporter(defaultSettings, bs, "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -53,7 +51,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -66,13 +64,11 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() rCfg.Enabled = false - bs := newBaseSettings(false, mockRequestMarshaler, + be, err := newBaseExporter(defaultSettings, "", false, mockRequestMarshaler, mockRequestUnmarshaler(newMockRequest(context.Background(), 2, errors.New("transient error"))), - WithRetry(rCfg), WithQueue(qCfg)) - be, err := newBaseExporter(defaultSettings, bs, "") + newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -81,7 +77,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests @@ -95,10 +91,8 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -106,9 +100,10 @@ func TestQueuedRetry_OnError(t *testing.T) { traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) mockR := newMockRequest(context.Background(), 2, traceErr) + ocs := be.obsrepSender.(*observabilityConsumerSender) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -122,23 +117,22 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) firstMockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(firstMockR)) + require.NoError(t, be.send(firstMockR)) }) // Enqueue another request to ensure when calling shutdown we drain the queue. secondMockR := newMockRequest(context.Background(), 3, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(secondMockR)) + require.NoError(t, be.send(secondMockR)) }) assert.NoError(t, be.Shutdown(context.Background())) @@ -156,10 +150,9 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -170,14 +163,14 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { mockR := newMockRequest(ctx, 2, nil) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_MaxElapsedTime(t *testing.T) { @@ -186,10 +179,9 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -197,14 +189,14 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { ocs.run(func() { // Add an item that will always fail. - require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) + require.NoError(t, be.send(newErrorRequest(context.Background()))) }) mockR := newMockRequest(context.Background(), 2, nil) start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -217,7 +209,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { mockR.checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 7) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } type wrappedError struct { @@ -233,10 +225,9 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -247,7 +238,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { start := time.Now() ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -257,7 +248,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_RetryOnError(t *testing.T) { @@ -266,10 +257,9 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := NewDefaultRetrySettings() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -278,7 +268,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR := newMockRequest(context.Background(), 2, errors.New("transient error")) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. 
- require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() @@ -286,23 +276,22 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { mockR.checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) - require.Zero(t, be.qrSender.queue.Size()) + require.Zero(t, be.queueSender.(*queueSender).queue.Size()) } func TestQueuedRetry_DropOnFull(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) }) ocs.run(func() { - require.Error(t, be.sender.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) + require.Error(t, be.send(newMockRequest(context.Background(), 2, errors.New("transient error")))) }) } @@ -314,10 +303,9 @@ func TestQueuedRetryHappyPath(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs + ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -329,7 +317,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { ocs.run(func() { req := newMockRequest(context.Background(), 2, nil) reqs = append(reqs, req) - require.NoError(t, be.sender.send(req)) + require.NoError(t, be.send(req)) }) } @@ -349,13 +337,13 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := NewDefaultRetrySettings() - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) checkValueForGlobalManager(t, defaultExporterTags, int64(defaultQueueSize), "exporter/queue_capacity") for i := 0; i < 7; i++ { - require.NoError(t, be.sender.send(newErrorRequest(context.Background()))) + require.NoError(t, be.send(newErrorRequest(context.Background()))) } checkValueForGlobalManager(t, defaultExporterTags, int64(7), "exporter/queue_size") @@ -397,11 +385,10 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg.NumConsumers = 1 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, 
WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - ocs := newObservabilityConsumerSender(be.qrSender.consumerSender) - be.qrSender.consumerSender = ocs - be.qrSender.requeuingEnabled = true + ocs := be.obsrepSender.(*observabilityConsumerSender) + be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -411,7 +398,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { mockR := newMockRequest(context.Background(), 1, traceErr) ocs.run(func() { // This is asynchronous so it should just enqueue, no errors expected. - require.NoError(t, be.sender.send(mockR)) + require.NoError(t, be.send(mockR)) ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing }) ocs.awaitAsyncProcessing() @@ -429,9 +416,9 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { qCfg.QueueSize = 0 rCfg := NewDefaultRetrySettings() rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) - be.qrSender.requeuingEnabled = true + be.queueSender.(*queueSender).requeuingEnabled = true require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { assert.NoError(t, be.Shutdown(context.Background())) @@ -440,18 +427,46 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1)) mockR := newMockRequest(context.Background(), 1, traceErr) - require.Error(t, be.qrSender.consumerSender.send(mockR), "sending_queue is full") + require.Error(t, be.retrySender.send(mockR), "sending_queue is full") mockR.checkNumRequests(t, 1) } func TestQueueRetryWithDisabledQueue(t *testing.T) { qs := NewDefaultQueueSettings() qs.Enabled = false - bs := newBaseSettings(false, nil, nil, WithQueue(qs)) - require.Nil(t, bs.queue) - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), bs, component.DataTypeLogs) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithQueue(qs)) + require.Nil(t, be.queueSender.(*queueSender).queue) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs.run(func() { + // This is asynchronous so it should just enqueue, no errors expected. 
+ require.Error(t, be.send(mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) + require.NoError(t, be.Shutdown(context.Background())) +} + +func TestQueueRetryWithNoQueue(t *testing.T) { + rCfg := NewDefaultRetrySettings() + rCfg.MaxElapsedTime = time.Nanosecond // fail fast + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) + require.NoError(t, err) + require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + ocs := be.obsrepSender.(*observabilityConsumerSender) + mockR := newMockRequest(context.Background(), 2, errors.New("some error")) + ocs.run(func() { + // There is no queue, so the request goes through the retry sender synchronously; an error is expected. + require.Error(t, be.send(mockR)) + }) + ocs.awaitAsyncProcessing() + mockR.checkNumRequests(t, 1) + ocs.checkSendItemsCount(t, 0) + ocs.checkDroppedItemsCount(t, 2) require.NoError(t, be.Shutdown(context.Background())) } @@ -465,7 +480,7 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(set, "", false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -489,10 +504,7 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - bs := newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)) - bs.marshaler = mockRequestMarshaler - bs.unmarshaler = mockRequestUnmarshaler(&mockRequest{}) - be, err := newBaseExporter(set, bs, "") + be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -516,27 +528,25 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { req := newMockRequest(context.Background(), 3, errors.New("some error")) - be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil, WithRetry(rCfg), WithQueue(qCfg)), "") + be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), &mockHost{})) // wraps original queue so we can count operations - be.qrSender.queue = &producerConsumerQueueWithCounter{ - ProducerConsumerQueue: be.qrSender.queue, + be.queueSender.(*queueSender).queue = &producerConsumerQueueWithCounter{ + ProducerConsumerQueue: be.queueSender.(*queueSender).queue, produceCounter: produceCounter, } - be.qrSender.requeuingEnabled = true + be.queueSender.(*queueSender).requeuingEnabled = true // replace nextSender inside retrySender to always return error so it doesn't exit send loop - castedSender, ok := be.qrSender.consumerSender.(*retrySender) - require.True(t, ok, "consumerSender should be a retrySender type") - castedSender.nextSender = &errorRequestSender{ + be.retrySender.setNextSender(&errorRequestSender{ errToReturn: errors.New("some error"), - } + }) // Invoke 
queuedRetrySender so the producer will put the item for consumer to poll - require.NoError(t, be.sender.send(req)) + require.NoError(t, be.send(req)) // first wait for the item to be produced to the queue initially assert.Eventually(t, func() bool { @@ -551,10 +561,13 @@ func TestQueuedRetryPersistentEnabled_shutdown_dataIsRequeued(t *testing.T) { } func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { - bs := newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings())) + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings())) + assert.Nil(t, err) assert.True(t, bs.requestExporter) assert.Panics(t, func() { - _ = newBaseSettings(true, nil, nil, WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) + _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), "", true, nil, nil, newNoopObsrepSender, + WithRetry(NewDefaultRetrySettings()), WithQueue(NewDefaultQueueSettings())) }) } @@ -630,16 +643,15 @@ func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockReque } type observabilityConsumerSender struct { + baseRequestSender waitGroup *sync.WaitGroup sentItemsCount *atomic.Int64 droppedItemsCount *atomic.Int64 - nextSender requestSender } -func newObservabilityConsumerSender(nextSender requestSender) *observabilityConsumerSender { +func newObservabilityConsumerSender(_ *obsExporter) requestSender { return &observabilityConsumerSender{ waitGroup: new(sync.WaitGroup), - nextSender: nextSender, droppedItemsCount: &atomic.Int64{}, sentItemsCount: &atomic.Int64{}, } @@ -737,6 +749,7 @@ func (pcq *producerConsumerQueueWithCounter) Produce(item internal.Request) bool } type errorRequestSender struct { + baseRequestSender errToReturn error } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 03b0fefb95e..7e855b64bfc 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -89,26 +89,20 @@ func NewTracesExporter( return nil, errNilPushTraceData } - bs := newBaseSettings(false, tracesRequestMarshaler, newTraceRequestUnmarshalerFunc(pusher), options...) - be, err := newBaseExporter(set, bs, component.DataTypeTraces) + be, err := newBaseExporter(set, component.DataTypeTraces, false, tracesRequestMarshaler, + newTraceRequestUnmarshalerFunc(pusher), newTracesExporterWithObservability, options...) if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &tracesExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req := newTracesRequest(ctx, td, pusher) - serr := be.sender.send(req) + serr := be.send(req) if errors.Is(serr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(req.Context(), int64(req.Count())) } return serr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -141,18 +135,10 @@ func NewTracesRequestExporter( return nil, errNilTracesConverter } - bs := newBaseSettings(true, nil, nil, options...) - - be, err := newBaseExporter(set, bs, component.DataTypeTraces) + be, err := newBaseExporter(set, component.DataTypeTraces, true, nil, nil, newTracesExporterWithObservability, options...) 
if err != nil { return nil, err } - be.wrapConsumerSender(func(nextSender requestSender) requestSender { - return &tracesExporterWithObservability{ - obsrep: be.obsrep, - nextSender: nextSender, - } - }) tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { req, cErr := converter.RequestFromTraces(ctx, td) @@ -166,12 +152,12 @@ func NewTracesRequestExporter( baseRequest: baseRequest{ctx: ctx}, Request: req, } - sErr := be.sender.send(r) + sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count())) } return sErr - }, bs.consumerOptions...) + }, be.consumerOptions...) return &traceExporter{ baseExporter: be, @@ -180,8 +166,12 @@ func NewTracesRequestExporter( } type tracesExporterWithObservability struct { - obsrep *obsExporter - nextSender requestSender + baseRequestSender + obsrep *obsExporter +} + +func newTracesExporterWithObservability(obsrep *obsExporter) requestSender { + return &tracesExporterWithObservability{obsrep: obsrep} } func (tewo *tracesExporterWithObservability) send(req internal.Request) error {
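
---

Reviewer note: the core of this change replaces the monolithic `queuedRetrySender` with a chain of `requestSender` links wired up by `connectSenders()` (queue -> obsrep -> retry -> timeout), each defaulting to a no-op pass-through so options like `WithRetry` and `WithQueue` can swap a single link independently. A trimmed-down sketch of that pattern, for illustration only — `passthroughSender` and `exportingSender` are stand-in names, not the actual exporterhelper types:

```go
package main

import "fmt"

// request stands in for internal.Request; the real interface also carries
// a context, item counts, and the export callback.
type request struct{ items int }

// sender mirrors the requestSender interface from common.go, with the
// start/shutdown lifecycle methods omitted for brevity.
type sender interface {
	send(req request) error
	setNextSender(next sender)
}

// passthroughSender plays the role of baseRequestSender: a no-op link that
// forwards every request to the next sender in the chain.
type passthroughSender struct{ next sender }

func (p *passthroughSender) send(req request) error    { return p.next.send(req) }
func (p *passthroughSender) setNextSender(next sender) { p.next = next }

// exportingSender stands in for timeoutSender, the one link that is always
// initialized and finally hands the request to the wrapped exporter.
type exportingSender struct{ passthroughSender }

func (e *exportingSender) send(req request) error {
	fmt.Printf("exporting %d items\n", req.items)
	return nil
}

func main() {
	// Mirrors connectSenders(): queue -> obsrep -> retry -> timeout.
	queue, obsrep, retry := &passthroughSender{}, &passthroughSender{}, &passthroughSender{}
	timeout := &exportingSender{}
	queue.setNextSender(obsrep)
	obsrep.setNextSender(retry)
	retry.setNextSender(timeout)

	// baseExporter.send(req) enters the chain at the queue sender.
	_ = queue.send(request{items: 2})
}
```

Because every link embeds the no-op forwarder, an option only has to replace its own link; the neighbors keep forwarding unchanged, which is what lets the "enable retry_on_failure / sending_queue" log messages be scoped to the senders that actually exist.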
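Reviewer note on `createSampledLogger`: it moves verbatim from queued_retry.go into common.go so both the queue and retry senders can share the sampled logger. A minimal standalone sketch of the same zap sampling behavior — the `sampled` helper name and the `main` driver are illustrative, not part of this PR:

```go
package main

import (
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

// sampled mirrors createSampledLogger: debug-enabled loggers pass through
// unsampled; otherwise the core is wrapped so that, per 10-second window and
// per distinct message, the first occurrence is logged and then only every
// 100th after that.
func sampled(logger *zap.Logger) *zap.Logger {
	if logger.Core().Enabled(zapcore.DebugLevel) {
		return logger
	}
	return logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
		return zapcore.NewSamplerWithOptions(core, 10*time.Second, 1, 100)
	}))
}

func main() {
	prod, _ := zap.NewProduction() // info-level core, so sampling applies
	logger := sampled(prod)
	// Of these 1000 identical messages, only #1, #101, #201, ... are emitted.
	for i := 0; i < 1000; i++ {
		logger.Error("Exporting failed. Dropping data.")
	}
}
```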