Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitryax committed Nov 19, 2023
1 parent df85077 commit 0399b37
Show file tree
Hide file tree
Showing 32 changed files with 696 additions and 258 deletions.
34 changes: 34 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add API for enabling queue in the new exporter helpers.

# One or more tracking issues or pull requests related to the change
issues: [7874]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following experimental API is introduced in exporter/exporterhelper package:
- `WithRequestQueue`: a new exporter helper option for using a queue.
  - `queue.Queue`: an interface for queue implementations.
  - `queue.Factory`: a queue factory interface; implementations of this interface are intended to be used with the `WithRequestQueue` option.
  - `queue.Settings`: queue factory settings.
  - `queue.Config`: common configuration for queue implementations.
  - `queue.NewDefaultConfig`: a function for creating a default queue configuration.
  - `queue/memoryqueue.NewFactory`: a new factory for creating a memory queue.
  - `queue/memoryqueue.Config`: a configuration for the memory queue factory.
  - `queue/memoryqueue.NewDefaultConfig`: a function for creating a default memory queue configuration.
All the new APIs are intended to be used by exporters that operate over client-provided requests instead of pdata.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
55 changes: 39 additions & 16 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
Expand All @@ -27,7 +29,9 @@ type baseRequestSender struct {
nextSender requestSender
}

var _ requestSender = (*baseRequestSender)(nil)
// shutdowner is the optional interface for senders that must release
// resources when the exporter shuts down.
// NOTE(review): presumably implemented by senders that hold background
// resources (e.g. a queue) — confirm against the callers of shutdown.
type shutdowner interface {
	shutdown(ctx context.Context) error
}

func (b *baseRequestSender) send(ctx context.Context, req Request) error {
return b.nextSender.send(ctx, req)
Expand All @@ -51,7 +55,7 @@ func (l *errorLoggingRequestSender) send(ctx context.Context, req Request) error
return err
}

type obsrepSenderFactory func(obsrep *ObsReport) requestSender
type obsrepSenderFactory func(obsrep *ObsReport, nextSender *senderWrapper) requestSender

// Option apply changes to baseExporter.
type Option func(*baseExporter)
Expand All @@ -76,7 +80,7 @@ func WithShutdown(shutdown component.ShutdownFunc) Option {
// The default TimeoutSettings is 5 seconds.
func WithTimeout(timeoutSettings TimeoutSettings) Option {
return func(o *baseExporter) {
o.timeoutSender.cfg = timeoutSettings
o.timeoutSender.sender = &timeoutSender{cfg: timeoutSettings}
}
}

Expand All @@ -101,7 +105,7 @@ func WithRetry(config RetrySettings) Option {
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
o.queueSender = &errorLoggingRequestSender{
Expand All @@ -114,6 +118,23 @@ func WithQueue(config QueueSettings) Option {
}
}

// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
// WithRequestQueue enables queueing for an exporter.
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg queue.Config, queueFactory queue.Factory) Option {
	return func(o *baseExporter) {
		if !cfg.Enabled {
			// Queue explicitly disabled: install a sender that logs and drops
			// data on failure instead of buffering it. Assign into the wrapper's
			// sender field (o.queueSender is a *senderWrapper, see newBaseExporter),
			// mirroring how WithTimeout replaces timeoutSender.sender.
			o.queueSender.sender = &errorLoggingRequestSender{
				logger:  o.set.Logger,
				message: "Exporting failed. Dropping data. Try enabling sending_queue to survive temporary failures.",
			}
			return
		}
		// Install the constructed queue sender. The original WIP code assigned
		// it to an unused local (`qs := newQueueSender(...)`), which is a Go
		// compile error ("declared and not used") and would have left the
		// option a no-op even if it compiled.
		o.queueSender.sender = newQueueSender(o.set, cfg, o.signal, queueFactory,
			newItemsCapacityLimiter(cfg.QueueItemsSize), o.queueSender.nextSender)
	}
}

// WithCapabilities overrides the default Capabilities() function for a Consumer.
// The default is non-mutable data.
// TODO: Verify if we can change the default to be mutable as we do for processors.
Expand All @@ -129,8 +150,8 @@ type baseExporter struct {
component.ShutdownFunc

requestExporter bool
marshaler RequestMarshaler
unmarshaler RequestUnmarshaler
marshaler request.Marshaler
unmarshaler request.Unmarshaler
signal component.DataType

set exporter.CreateSettings
Expand All @@ -148,8 +169,9 @@ type baseExporter struct {
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, marshaler RequestMarshaler,
unmarshaler RequestUnmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool,
marshaler request.Marshaler,
unmarshaler request.Unmarshaler, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
Expand All @@ -171,10 +193,18 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
obsrep: obsReport,
}

// Initialize the chain of senders in the reverse order.
be.timeoutSender = &senderWrapper{sender: &timeoutSender{cfg: NewDefaultTimeoutSettings()}}
be.retrySender = &senderWrapper{
sender: &errorLoggingRequestSender{logger: set.Logger, nextSender: be.timeoutSender},
nextSender: be.timeoutSender,
}
be.obsrepSender = &senderWrapper{sender: osf(obsReport, be.retrySender)}
be.queueSender = &senderWrapper{nextSender: be.obsrepSender}

for _, op := range options {
op(be)
}
be.connectSenders()

return be, nil
}
Expand All @@ -184,13 +214,6 @@ func (be *baseExporter) send(ctx context.Context, req Request) error {
return be.queueSender.send(ctx, req)
}

// connectSenders connects the senders in the predefined order.
func (be *baseExporter) connectSenders() {
be.queueSender.setNextSender(be.obsrepSender)
be.obsrepSender.setNextSender(be.retrySender)
be.retrySender.setNextSender(be.timeoutSender)
}

func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
if err := be.StartFunc.Start(ctx, host); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ var (
}
)

func newNoopObsrepSender(_ *ObsReport) requestSender {
return &baseRequestSender{}
// newNoopObsrepSender is a test helper that ignores the ObsReport and returns
// a pass-through sender which simply forwards requests to nextSender.
func newNoopObsrepSender(_ *ObsReport, nextSender *senderWrapper) requestSender {
	return &senderWrapper{nextSender: nextSender}
}

func TestBaseExporter(t *testing.T) {
Expand Down
47 changes: 24 additions & 23 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,53 +7,62 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter/exporterhelper/queue"
"go.opentelemetry.io/collector/exporter/exporterhelper/request"
"sync/atomic"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
)

// boundedMemoryQueue implements a producer-consumer exchange similar to a ring buffer queue,
// where the queue is bounded and if it fills up due to slow consumers, the new items written by
// the producer are dropped.
type boundedMemoryQueue[T itemsCounter] struct {
type boundedMemoryQueue struct {
component.StartFunc
queue.CapacityLimiter
logger *zap.Logger
stopped *atomic.Bool
items chan queueRequest[T]
items chan queueRequest
}

// NewBoundedMemoryQueue constructs the new queue of specified capacity, and with an optional
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue[T itemsCounter](logger *zap.Logger, capacity int) Queue[T] {
return &boundedMemoryQueue[T]{
logger: logger,
items: make(chan queueRequest[T], capacity),
stopped: &atomic.Bool{},
func NewBoundedMemoryQueue(logger *zap.Logger, capacityLimiter queue.CapacityLimiter) queue.Queue {
return &boundedMemoryQueue{
CapacityLimiter: capacityLimiter,
logger: logger,
items: make(chan queueRequest, capacityLimiter.Capacity()),
stopped: &atomic.Bool{},
}
}

// Offer is used by the producer to submit new item to the queue.
func (q *boundedMemoryQueue[T]) Offer(ctx context.Context, req T) error {
func (q *boundedMemoryQueue) Offer(ctx context.Context, req request.Request) error {
if q.stopped.Load() {
return ErrQueueIsStopped
}

if !q.CapacityLimiter.Claim(req) {
return ErrQueueIsFull
}

select {
case q.items <- queueRequest[T]{
case q.items <- queueRequest{
req: req,
ctx: ctx,
}:
return nil
default:
// Should never happen since we checked q.CapacityLimiter.Claim(req) above.
return ErrQueueIsFull
}
}

// Consume consumes an item from the queue once it's available. It returns false if the queue is stopped.
func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
func (q *boundedMemoryQueue) Consume(consumeFunc func(context.Context, request.Request) error) bool {
item, ok := <-q.items
if !ok {
return false
Expand All @@ -63,26 +72,18 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
q.logger.Error("Exporting failed. Dropping data.", zap.Error(err),
zap.Int("dropped_items", item.req.ItemsCount()))
}
q.CapacityLimiter.Release(item.req)
return true
}

// Shutdown stops accepting items, and stops all consumers. It blocks until all consumers have stopped.
func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
func (q *boundedMemoryQueue) Shutdown(context.Context) error {
q.stopped.Store(true) // disable producer
close(q.items)
return nil
}

// Size returns the current size of the queue
func (q *boundedMemoryQueue[T]) Size() int {
return len(q.items)
}

func (q *boundedMemoryQueue[T]) Capacity() int {
return cap(q.items)
}

type queueRequest[T itemsCounter] struct {
req T
type queueRequest struct {
req request.Request
ctx context.Context
}
Loading

0 comments on commit 0399b37

Please sign in to comment.