Skip to content

Commit

Permalink
[exporterhelper] Refactor options and baseExporter (#8369)
Browse files Browse the repository at this point in the history
Separate all the parts of the baseExporter into an explicit chain of
request senders. It makes it easier to follow the data flow and add
additional senders.

This change also removes the baseSettings, because keeping the settings
is not needed anymore. All the options update the internal senders in
place.

This change also removes confusing error messages like "Exporting
failed. Dropping data. Try enabling sending_queue to survive temporary
failures" when the Queue is not even available in the exporter
(WithQueue option is not applied) which means users don't
`sending_queue` config option. Now, such messages are only shown when
the sending_queue (or retry_on_failure) is available, but not enabled by
the user.
  • Loading branch information
dmitryax authored Sep 8, 2023
1 parent eaac340 commit 4e71dc1
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 308 deletions.
20 changes: 20 additions & 0 deletions .chloggen/exporterhelper-avoid-messages.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Stop logging error messages suggesting user to enable `retry_on_failure` or `sending_queue` when they are not available.

# One or more tracking issues or pull requests related to the change
issues: [8369]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
226 changes: 150 additions & 76 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"context"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
Expand All @@ -29,9 +32,34 @@ func NewDefaultTimeoutSettings() TimeoutSettings {

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown()
send(req internal.Request) error
setNextSender(nextSender requestSender)
}

type baseRequestSender struct {
nextSender requestSender
}

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) start(context.Context, component.Host, exporter.CreateSettings) error {
return nil
}

func (b *baseRequestSender) shutdown() {}

func (b *baseRequestSender) send(req internal.Request) error {
return b.nextSender.send(req)
}

func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type obsrepSenderFactory func(obsrep *obsExporter) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
Expand All @@ -56,99 +84,68 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
queue internal.ProducerConsumerQueue
RetrySettings
requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
// TODO: The first three arguments will be removed when the old exporter helpers will be updated to call the new ones.
func newBaseSettings(requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
marshaler: marshaler,
unmarshaler: unmarshaler,
}

for _, op := range options {
op(bs)
}

return bs
}

// Option apply changes to baseSettings.
type Option func(*baseSettings)
// Option apply changes to baseExporter.
type Option func(*baseExporter)

// WithStart overrides the default Start function for an exporter.
// The default start function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
o.StartFunc = start
}
}

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
o.ShutdownFunc = shutdown
}
}

// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseSettings) {
o.TimeoutSettings = timeoutSettings
return func(o *baseExporter) {
o.timeoutSender.cfg = timeoutSettings
}
}

// WithRetry overrides the default RetrySettings for an exporter.
// The default RetrySettings is to disable retries.
func WithRetry(retrySettings RetrySettings) Option {
return func(o *baseSettings) {
o.RetrySettings = retrySettings
return func(o *baseExporter) {
o.retrySender = newRetrySender(o.set.ID, retrySettings, o.sampledLogger, o.onTemporaryFailure)
}
}

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
if !config.Enabled {
return
var queue internal.ProducerConsumerQueue
if config.Enabled {
if config.StorageID == nil {
queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
} else {
queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
}
}
if config.StorageID == nil {
o.queue = internal.NewBoundedMemoryQueue(config.QueueSize, config.NumConsumers)
return
}
o.queue = internal.NewPersistentQueue(config.QueueSize, config.NumConsumers, *config.StorageID, o.marshaler, o.unmarshaler)
qs := newQueueSender(o.set.ID, o.signal, queue, o.sampledLogger)
o.queueSender = qs
o.setOnTemporaryFailure(qs.onTemporaryFailure)
}
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return func(o *baseSettings) {
return func(o *baseExporter) {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
}
}
Expand All @@ -157,48 +154,106 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
type baseExporter struct {
component.StartFunc
component.ShutdownFunc
obsrep *obsExporter
sender requestSender
qrSender *queuedRetrySender

requestExporter bool
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
signal component.DataType

set exporter.CreateSettings
obsrep *obsExporter
sampledLogger *zap.Logger

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
queueSender requestSender
obsrepSender requestSender
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

// onTemporaryFailure is a function that is called when the retrySender is unable to send data to the next consumer.
onTemporaryFailure onRequestHandlingFinishedFunc

consumerOptions []consumer.Option
}

func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) {
be := &baseExporter{}
// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler internal.RequestMarshaler,
unmarshaler internal.RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

var err error
be.obsrep, err = newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
obsrep, err := newObsExporter(obsreport.ExporterSettings{ExporterID: set.ID, ExporterCreateSettings: set}, globalInstruments)
if err != nil {
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queue, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := bs.StartFunc.Start(ctx, host); err != nil {
return err
}
be := &baseExporter{
requestExporter: requestExporter,
marshaler: marshaler,
unmarshaler: unmarshaler,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsrep),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutSettings()},

// If no error then start the queuedRetrySender.
return be.qrSender.start(ctx, host, set)
set: set,
obsrep: obsrep,
sampledLogger: createSampledLogger(set.Logger),
}
be.ShutdownFunc = func(ctx context.Context) error {
// First shutdown the queued retry sender
be.qrSender.shutdown()
// Last shutdown the wrapped exporter itself.
return bs.ShutdownFunc.Shutdown(ctx)

for _, op := range options {
op(be)
}
be.connectSenders()

return be, nil
}

// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
// This can be used to wrap with observability (create spans, record metrics) the consumer sender.
func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) {
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
// send sends the request using the first sender in the chain.
func (be *baseExporter) send(req internal.Request) error {
return be.queueSender.send(req)
}

// connectSenders connects the senders in the predefined order.
func (be *baseExporter) connectSenders() {
be.queueSender.setNextSender(be.obsrepSender)
be.obsrepSender.setNextSender(be.retrySender)
be.retrySender.setNextSender(be.timeoutSender)
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the queueSender.
return be.queueSender.start(ctx, host, be.set)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
// First shutdown the retry sender, so it can push any pending requests to back the queue.
be.retrySender.shutdown()

// Then shutdown the queue sender.
be.queueSender.shutdown()

// Last shutdown the wrapped exporter itself.
return be.ShutdownFunc.Shutdown(ctx)
}

func (be *baseExporter) setOnTemporaryFailure(onTemporaryFailure onRequestHandlingFinishedFunc) {
be.onTemporaryFailure = onTemporaryFailure
if rs, ok := be.retrySender.(*retrySender); ok {
rs.onTemporaryFailure = onTemporaryFailure
}
}

// timeoutSender is a requestSender that adds a `timeout` to every request that passes this sender.
type timeoutSender struct {
baseRequestSender
cfg TimeoutSettings
}

Expand All @@ -213,3 +268,22 @@ func (ts *timeoutSender) send(req internal.Request) error {
}
return req.Export(ctx)
}

func createSampledLogger(logger *zap.Logger) *zap.Logger {
if logger.Core().Enabled(zapcore.DebugLevel) {
// Debugging is enabled. Don't do any sampling.
return logger
}

// Create a logger that samples all messages to 1 per 10 seconds initially,
// and 1/100 of messages after that.
opts := zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewSamplerWithOptions(
core,
10*time.Second,
1,
100,
)
})
return logger.WithOptions(opts)
}
20 changes: 11 additions & 9 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exporterhelper

import (
Expand Down Expand Up @@ -31,12 +32,16 @@ var (
}
)

func newNoopObsrepSender(_ *obsExporter) requestSender {
return &baseRequestSender{}
}

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, newBaseSettings(false, nil, nil), "")
be, err := newBaseExporter(defaultSettings, "", false, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true, nil, nil), "")
be, err = newBaseExporter(defaultSettings, "", true, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -45,13 +50,10 @@ func TestBaseExporter(t *testing.T) {
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings,
newBaseSettings(
false, nil, nil,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
"",
defaultSettings, "", false, nil, nil, newNoopObsrepSender,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings()),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
Loading

0 comments on commit 4e71dc1

Please sign in to comment.