Skip to content

Commit

Permalink
[chore] split exporterhelper so a separate profiles module can use it (
Browse files Browse the repository at this point in the history
…open-telemetry#11215)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description

This is part of open-telemetry#11131, splitting the exporterhelper module with a new
`internal` submodule that holds everything shared by the signals.
This is so a new `exporterhelperprofiles` module can make use of the
shared structs.
  • Loading branch information
dmathieu authored Sep 19, 2024
1 parent 396ae85 commit df3c9e3
Show file tree
Hide file tree
Showing 26 changed files with 1,161 additions and 1,041 deletions.
328 changes: 14 additions & 314 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,178 +4,69 @@
package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper"

import (
"context"
"fmt"

"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterbatcher"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
type requestSender interface {
component.Component
send(context.Context, Request) error
setNextSender(nextSender requestSender)
}

type baseRequestSender struct {
component.StartFunc
component.ShutdownFunc
nextSender requestSender
}

var _ requestSender = (*baseRequestSender)(nil)

func (b *baseRequestSender) send(ctx context.Context, req Request) error {
return b.nextSender.send(ctx, req)
}

func (b *baseRequestSender) setNextSender(nextSender requestSender) {
b.nextSender = nextSender
}

type obsrepSenderFactory func(obsrep *obsReport) requestSender

// Option apply changes to baseExporter.
type Option interface {
apply(*baseExporter) error
}

type optionFunc func(*baseExporter) error

func (of optionFunc) apply(e *baseExporter) error {
return of(e)
}
// Option apply changes to BaseExporter.
type Option = internal.Option

// WithStart overrides the default Start function for an exporter.
// The default start function does nothing and always returns nil.
func WithStart(start component.StartFunc) Option {
return optionFunc(func(o *baseExporter) error {
o.StartFunc = start
return nil
})
return internal.WithStart(start)
}

// WithShutdown overrides the default Shutdown function for an exporter.
// The default shutdown function does nothing and always returns nil.
func WithShutdown(shutdown component.ShutdownFunc) Option {
return optionFunc(func(o *baseExporter) error {
o.ShutdownFunc = shutdown
return nil
})
return internal.WithShutdown(shutdown)
}

// WithTimeout overrides the default TimeoutSettings for an exporter.
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutConfig TimeoutConfig) Option {
return optionFunc(func(o *baseExporter) error {
o.timeoutSender.cfg = timeoutConfig
return nil
})
return internal.WithTimeout(timeoutConfig)
}

// WithRetry overrides the default configretry.BackOffConfig for an exporter.
// The default configretry.BackOffConfig is to disable retries.
func WithRetry(config configretry.BackOffConfig) Option {
return optionFunc(func(o *baseExporter) error {
if !config.Enabled {
o.exportFailureMessage += " Try enabling retry_on_failure config option to retry on retryable errors."
return nil
}
o.retrySender = newRetrySender(config, o.set)
return nil
})
return internal.WithRetry(config)
}

// WithQueue overrides the default QueueConfig for an exporter.
// The default QueueConfig is to disable queueing.
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueConfig) Option {
return optionFunc(func(o *baseExporter) error {
if o.marshaler == nil || o.unmarshaler == nil {
return fmt.Errorf("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
q := qf(context.Background(), exporterqueue.Settings{
DataType: o.signal,
ExporterSettings: o.set,
}, exporterqueue.Config{
Enabled: config.Enabled,
NumConsumers: config.NumConsumers,
QueueSize: config.QueueSize,
})
o.queueSender = newQueueSender(q, o.set, config.NumConsumers, o.exportFailureMessage, o.obsrep)
return nil
})
func WithQueue(config internal.QueueConfig) Option {
return internal.WithQueue(config)
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return optionFunc(func(o *baseExporter) error {
if o.marshaler != nil || o.unmarshaler != nil {
return fmt.Errorf("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
}
if !cfg.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
o.queueCfg = cfg
o.queueFactory = queueFactory
return nil
})
return internal.WithRequestQueue(cfg, queueFactory)
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
func WithCapabilities(capabilities consumer.Capabilities) Option {
return optionFunc(func(o *baseExporter) error {
o.consumerOptions = append(o.consumerOptions, consumer.WithCapabilities(capabilities))
return nil
})
return internal.WithCapabilities(capabilities)
}

// BatcherOption apply changes to batcher sender.
type BatcherOption interface {
apply(*batchSender) error
}

type batcherOptionFunc func(*batchSender) error

func (of batcherOptionFunc) apply(e *batchSender) error {
return of(e)
}
type BatcherOption = internal.BatcherOption

// WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types.
func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption {
return batcherOptionFunc(func(bs *batchSender) error {
if mf == nil || msf == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions")
}
if bs.mergeFunc != nil || bs.mergeSplitFunc != nil {
return fmt.Errorf("WithRequestBatchFuncs can only be used once with request-based exporters")
}
bs.mergeFunc = mf
bs.mergeSplitFunc = msf
return nil
})
return internal.WithRequestBatchFuncs(mf, msf)
}

// WithBatcher enables batching for an exporter based on custom request types.
Expand All @@ -184,196 +75,5 @@ func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf expor
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option {
return optionFunc(func(o *baseExporter) error {
if !cfg.Enabled {
return nil
}

bs := newBatchSender(cfg, o.set, o.batchMergeFunc, o.batchMergeSplitfunc)
for _, opt := range opts {
if err := opt.apply(bs); err != nil {
return err
}
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
return fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")
}
o.batchSender = bs
return nil
})
}

// withMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option {
return optionFunc(func(o *baseExporter) error {
o.marshaler = marshaler
return nil
})
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option {
return optionFunc(func(o *baseExporter) error {
o.unmarshaler = unmarshaler
return nil
})
}

// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters.
// It must be provided as the first option when creating a new exporter helper.
func withBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) Option {
return optionFunc(func(o *baseExporter) error {
o.batchMergeFunc = mf
o.batchMergeSplitfunc = msf
return nil
})
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.StartFunc
component.ShutdownFunc

signal component.DataType

batchMergeFunc exporterbatcher.BatchMergeFunc[Request]
batchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[Request]

marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]

set exporter.Settings
obsrep *obsReport

// Message for the user to be added with an export failure message.
exportFailureMessage string

// Chain of senders that the exporter helper applies before passing the data to the actual exporter.
// The data is handled by each sender in the respective order starting from the queueSender.
// Most of the senders are optional, and initialized with a no-op path-through sender.
batchSender requestSender
queueSender requestSender
obsrepSender requestSender
retrySender requestSender
timeoutSender *timeoutSender // timeoutSender is always initialized.

consumerOptions []consumer.Option

queueCfg exporterqueue.Config
queueFactory exporterqueue.Factory[Request]
batcherCfg exporterbatcher.Config
batcherOpts []BatcherOption
}

func newBaseExporter(set exporter.Settings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := newExporter(obsReportSettings{exporterID: set.ID, exporterCreateSettings: set, dataType: signal})
if err != nil {
return nil, err
}

be := &baseExporter{
signal: signal,

batchSender: &baseRequestSender{},
queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
retrySender: &baseRequestSender{},
timeoutSender: &timeoutSender{cfg: NewDefaultTimeoutConfig()},

set: set,
obsrep: obsReport,
}

for _, op := range options {
err = multierr.Append(err, op.apply(be))
}
if err != nil {
return nil, err
}

if be.batcherCfg.Enabled {
bs := newBatchSender(be.batcherCfg, be.set, be.batchMergeFunc, be.batchMergeSplitfunc)
for _, opt := range be.batcherOpts {
err = multierr.Append(err, opt.apply(bs))
}
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
be.batchSender = bs
}

if be.queueCfg.Enabled {
set := exporterqueue.Settings{
DataType: be.signal,
ExporterSettings: be.set,
}
be.queueSender = newQueueSender(be.queueFactory(context.Background(), set, be.queueCfg), be.set, be.queueCfg.NumConsumers, be.exportFailureMessage, be.obsrep)
for _, op := range options {
err = multierr.Append(err, op.apply(be))
}
}

if err != nil {
return nil, err
}

be.connectSenders()

if bs, ok := be.batchSender.(*batchSender); ok {
// If queue sender is enabled assign to the batch sender the same number of workers.
if qs, ok := be.queueSender.(*queueSender); ok {
bs.concurrencyLimit = int64(qs.numConsumers)
}
// Batcher sender mutates the data.
be.consumerOptions = append(be.consumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
}

return be, nil
}

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(ctx context.Context, req Request) error {
err := be.queueSender.send(ctx, req)
if err != nil {
be.set.Logger.Error("Exporting failed. Rejecting data."+be.exportFailureMessage,
zap.Error(err), zap.Int("rejected_items", req.ItemsCount()))
}
return err
}

// connectSenders connects the senders in the predefined order.
func (be *baseExporter) connectSenders() {
be.queueSender.setNextSender(be.batchSender)
be.batchSender.setNextSender(be.obsrepSender)
be.obsrepSender.setNextSender(be.retrySender)
be.retrySender.setNextSender(be.timeoutSender)
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
return err
}

// If no error then start the batchSender.
if err := be.batchSender.Start(ctx, host); err != nil {
return err
}

// Last start the queueSender.
return be.queueSender.Start(ctx, host)
}

func (be *baseExporter) Shutdown(ctx context.Context) error {
return multierr.Combine(
// First shutdown the retry sender, so the queue sender can flush the queue without retries.
be.retrySender.Shutdown(ctx),
// Then shutdown the batch sender
be.batchSender.Shutdown(ctx),
// Then shutdown the queue sender.
be.queueSender.Shutdown(ctx),
// Last shutdown the wrapped exporter itself.
be.ShutdownFunc.Shutdown(ctx))
return internal.WithBatcher(cfg, opts...)
}
6 changes: 0 additions & 6 deletions exporter/exporterhelper/exporterhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,3 @@ type Request = internal.Request
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestErrorHandler = internal.RequestErrorHandler

// extractPartialRequest returns a new Request that may contain the items left to be sent
// if only some items failed to process and can be retried. Otherwise, it returns the original Request.
func extractPartialRequest(req Request, err error) Request {
return internal.ExtractPartialRequest(req, err)
}
Loading

0 comments on commit df3c9e3

Please sign in to comment.