Skip to content

Commit

Permalink
[exporterhelper] Add WithRequestQueue option to the exporter
Browse files Browse the repository at this point in the history
The new configuration interface for the end users provides a new `queue_size_items` option to limit the queue by a number of spans, log records, or metric data points. The previous way to limit the queue by number of requests is preserved under the same field, `queue_size,` which will later be deprecated through a longer transition process.
  • Loading branch information
dmitryax committed Jan 29, 2024
1 parent 1ed45ec commit da40531
Show file tree
Hide file tree
Showing 25 changed files with 387 additions and 101 deletions.
32 changes: 32 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter package:
- `exporterhelper.WithRequestQueue`: a new exporter helper option for using a queue.
- `exporterqueue.Queue`: an interface for queue implementations.
- `exporterqueue.Factory`: a queue factory interface, implementations of this interface are intended to be used with WithRequestQueue option.
- `exporterqueue.Settings`: queue factory settings.
- `exporterqueue.Config`: common configuration for queue implementations.
- `exporterqueue.NewDefaultConfig`: a function for creating a default queue configuration.
- `exporterqueue.NewMemoryQueueFactory`: a new factory for creating a memory queue.
- `exporterqueue.NewPersistentQueueFactory: a factory for creating a persistent queue.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
43 changes: 38 additions & 5 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
Expand Down Expand Up @@ -85,17 +87,48 @@ func WithRetry(config configretry.BackOffConfig) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
}
consumeErrHandler := func(err error, req Request) {
o.set.Logger.Error("Exporting failed. Dropping data."+o.exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
qf := exporterqueue.NewPersistentQueueFactory[Request](config.StorageID, exporterqueue.PersistentQueueSettings[Request]{
Marshaler: o.marshaler,
Unmarshaler: o.unmarshaler,
})
queue := qf(exporterqueue.Settings[Request]{
Sizer: &internal.RequestSizer[Request]{},
Capacity: config.QueueSize,
DataType: o.signal,
ExporterSettings: o.set,
})
o.queueSender = newQueueSender(queue, o.set, config.NumConsumers, o.exportFailureMessage)
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return func(o *baseExporter) {
if !cfg.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
}
set := exporterqueue.Settings[Request]{
DataType: o.signal,
ExporterSettings: o.set,
}
if cfg.QueueSizeItems > 0 {
set.Sizer = &internal.ItemsSizer[Request]{}
set.Capacity = cfg.QueueSizeItems
} else {
set.Sizer = &internal.RequestSizer[Request]{}
set.Capacity = cfg.QueueSize
}
o.queueSender = newQueueSender(config, o.set, o.signal, o.marshaler, o.unmarshaler, consumeErrHandler)
o.queueSender = newQueueSender(queueFactory(set), o.set, cfg.NumConsumers, o.exportFailureMessage)
}
}

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/pdata/plog"
)

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down
34 changes: 8 additions & 26 deletions exporter/exporterhelper/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exporterqueue"
"go.opentelemetry.io/collector/exporter/internal"
"go.opentelemetry.io/collector/internal/obsreportconfig/obsmetrics"
)

Expand Down Expand Up @@ -72,7 +73,7 @@ func (qCfg *QueueSettings) Validate() error {
type queueSender struct {
baseRequestSender
fullName string
queue internal.Queue[Request]
queue exporterqueue.Queue[Request]
traceAttribute attribute.KeyValue
logger *zap.Logger
meter otelmetric.Meter
Expand All @@ -82,28 +83,8 @@ type queueSender struct {
metricSize otelmetric.Int64ObservableGauge
}

func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal component.DataType,
marshaler RequestMarshaler, unmarshaler RequestUnmarshaler, consumeErrHandler func(error, Request)) *queueSender {

isPersistent := config.StorageID != nil
var queue internal.Queue[Request]
queueSizer := &internal.RequestSizer[Request]{}
if isPersistent {
queue = internal.NewPersistentQueue[Request](internal.PersistentQueueSettings[Request]{
Sizer: queueSizer,
Capacity: config.QueueSize,
DataType: signal,
StorageID: *config.StorageID,
Marshaler: marshaler,
Unmarshaler: unmarshaler,
ExporterSettings: set,
})
} else {
queue = internal.NewBoundedMemoryQueue[Request](internal.MemoryQueueSettings[Request]{
Sizer: queueSizer,
Capacity: config.QueueSize,
})
}
func newQueueSender(queue exporterqueue.Queue[Request], set exporter.CreateSettings, numConsumers int,
exportFailureMessage string) *queueSender {
qs := &queueSender{
fullName: set.ID.String(),
queue: queue,
Expand All @@ -114,11 +95,12 @@ func newQueueSender(config QueueSettings, set exporter.CreateSettings, signal co
consumeFunc := func(ctx context.Context, req Request) error {
err := qs.nextSender.send(ctx, req)
if err != nil {
consumeErrHandler(err, req)
set.Logger.Error("Exporting failed. Dropping data."+exportFailureMessage,
zap.Error(err), zap.Int("dropped_items", req.ItemsCount()))
}
return err
}
qs.consumers = internal.NewQueueConsumers(queue, config.NumConsumers, consumeFunc)
qs.consumers = internal.NewQueueConsumers[Request](queue, numConsumers, consumeFunc)
return qs
}

Expand Down
Loading

0 comments on commit da40531

Please sign in to comment.