diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go
new file mode 100644
index 00000000000..9149dda6613
--- /dev/null
+++ b/exporter/internal/queue/batcher.go
@@ -0,0 +1,74 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"go.opentelemetry.io/collector/component"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+// batch bundles a request with the queue indices to ack once it is exported.
+type batch struct {
+	ctx     context.Context
+	req     internal.Request
+	idxList []uint64
+}
+
+// Batcher is in charge of reading items from the queue and sending them out asynchronously.
+type Batcher interface {
+	component.Component
+}
+
+// BaseBatcher holds state shared by Batcher implementations; stopWG tracks in-flight flush goroutines.
+type BaseBatcher struct {
+	batchCfg   exporterbatcher.Config
+	queue      Queue[internal.Request]
+	maxWorkers int
+	stopWG     sync.WaitGroup
+}
+
+// NewBatcher currently supports only batching disabled with an unbounded worker pool (maxWorkers == 0).
+func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
+	if maxWorkers != 0 {
+		return nil, errors.ErrUnsupported
+	}
+
+	if batchCfg.Enabled {
+		return nil, errors.ErrUnsupported
+	}
+
+	return &DisabledBatcher{
+		BaseBatcher{
+			batchCfg:   batchCfg,
+			queue:      queue,
+			maxWorkers: maxWorkers,
+			stopWG:     sync.WaitGroup{},
+		},
+	}, nil
+}
+
+// flush exports the incoming batch synchronously and acks every index with the export result.
+func (qb *BaseBatcher) flush(batchToFlush batch) {
+	err := batchToFlush.req.Export(batchToFlush.ctx)
+	for _, idx := range batchToFlush.idxList {
+		qb.queue.OnProcessingFinished(idx, err)
+	}
+}
+
+// flushAsync exports the batch in a new goroutine tracked by stopWG; it never blocks (workers are unbounded).
+func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
+	qb.stopWG.Add(1)
+	go func() {
+		defer qb.stopWG.Done()
+		qb.flush(batchToFlush)
+	}()
+}
+
+// Shutdown waits for all in-flight flush goroutines to complete; it does not stop the queue itself.
+func (qb *BaseBatcher) Shutdown(_ context.Context) error {
+	qb.stopWG.Wait()
+	return nil
+}
diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go
new file mode 100644
index 00000000000..c5078885538
--- /dev/null
+++ b/exporter/internal/queue/disabled_batcher.go
@@ -0,0 +1,38 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+
+	"go.opentelemetry.io/collector/component"
+)
+
+// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will
+// be sent out (asynchronously) immediately regardless of the size.
+type DisabledBatcher struct {
+	BaseBatcher
+}
+
+// Start starts the goroutine that reads from the queue and flushes asynchronously.
+func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
+	// This goroutine reads and then flushes.
+	// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
+	// 2. flushAsync() hands each item to a fresh goroutine, so with the unbounded worker pool it never blocks.
+	qb.stopWG.Add(1)
+	go func() {
+		defer qb.stopWG.Done()
+		for {
+			idx, _, req, ok := qb.queue.Read(context.Background())
+			if !ok {
+				return
+			}
+			qb.flushAsync(batch{
+				req:     req,
+				ctx:     context.Background(),
+				idxList: []uint64{idx}})
+		}
+	}()
+	return nil
+}
diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go
new file mode 100644
index 00000000000..e031ce83870
--- /dev/null
+++ b/exporter/internal/queue/disabled_batcher_test.go
@@ -0,0 +1,89 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"go.opentelemetry.io/collector/component/componenttest"
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) {
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.Enabled = false
+
+	q := NewBoundedMemoryQueue[internal.Request](
+		MemoryQueueSettings[internal.Request]{
+			Sizer:    &RequestSizer[internal.Request]{},
+			Capacity: 10,
+		})
+
+	maxWorkers := 0
+	ba, err := NewBatcher(cfg, q, maxWorkers)
+	require.NoError(t, err)
+
+	require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
+	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
+	t.Cleanup(func() {
+		require.NoError(t, q.Shutdown(context.Background()))
+		require.NoError(t, ba.Shutdown(context.Background()))
+	})
+
+	sink := newFakeRequestSink()
+
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
+	}, 1*time.Second, 10*time.Millisecond)
+
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
+	}, 1*time.Second, 10*time.Millisecond)
+
+	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
+
+	assert.Eventually(t, func() bool {
+		return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
+	}, 1*time.Second, 10*time.Millisecond)
+}
+
+func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) {
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.Enabled = false
+	maxWorkers := 1
+
+	q := NewBoundedMemoryQueue[internal.Request](
+		MemoryQueueSettings[internal.Request]{
+			Sizer:    &RequestSizer[internal.Request]{},
+			Capacity: 10,
+		})
+
+	_, err := NewBatcher(cfg, q, maxWorkers)
+	require.Error(t, err)
+}
+
+func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {
+	cfg := exporterbatcher.NewDefaultConfig()
+	cfg.Enabled = true
+	maxWorkers := 0
+
+	q := NewBoundedMemoryQueue[internal.Request](
+		MemoryQueueSettings[internal.Request]{
+			Sizer:    &RequestSizer[internal.Request]{},
+			Capacity: 10,
+		})
+
+	_, err := NewBatcher(cfg, q, maxWorkers)
+	require.Error(t, err)
+}
diff --git a/exporter/internal/queue/fake_request.go b/exporter/internal/queue/fake_request.go
new file mode 100644
index 00000000000..a0983db6d35
--- /dev/null
+++ b/exporter/internal/queue/fake_request.go
@@ -0,0 +1,63 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
+package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+	"time"
+
+	"go.opentelemetry.io/collector/exporter/exporterbatcher"
+	"go.opentelemetry.io/collector/exporter/internal"
+)
+
+// fakeRequestSink counts exported requests and items for test assertions.
+type fakeRequestSink struct {
+	requestsCount *atomic.Int64
+	itemsCount    *atomic.Int64
+}
+
+func newFakeRequestSink() *fakeRequestSink {
+	return &fakeRequestSink{
+		requestsCount: new(atomic.Int64),
+		itemsCount:    new(atomic.Int64),
+	}
+}
+
+// fakeRequest is a test Request that records successful exports into sink.
+type fakeRequest struct {
+	items     int
+	exportErr error
+	delay     time.Duration
+	sink      *fakeRequestSink
+}
+
+func (r *fakeRequest) Export(ctx context.Context) error {
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(r.delay):
+	}
+	if r.exportErr != nil {
+		return r.exportErr
+	}
+	if r.sink != nil {
+		r.sink.requestsCount.Add(1)
+		r.sink.itemsCount.Add(int64(r.items))
+	}
+	return nil
+}
+
+func (r *fakeRequest) ItemsCount() int {
+	return r.items
+}
+
+func (r *fakeRequest) Merge(_ context.Context,
+	_ internal.Request) (internal.Request, error) {
+	return nil, errors.New("not implemented")
+}
+
+func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
+	_ internal.Request) ([]internal.Request, error) {
+	return nil, errors.New("not implemented")
+}