From 3e13ca11e000ed3b5494b4565c1cedbd8e24815e Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Fri, 25 Oct 2024 20:47:50 -0700 Subject: [PATCH] [exporterqueue] Bare minimum frame of queue batcher + unit test. (#11532) #### Description This PR is a bare minimum implementation of a component called queue batcher. On completion, this component will replace `consumers` in `queue_sender`, and thus move queue-batch to a pulling model instead of a pushing model. Limitations of the current code * This implements only the case where batching is disabled, which means no merging or splitting of requests + no timeout flushing. * This implementation does not enforce an upper bound on concurrency All these code paths currently return errors.ErrUnsupported, and they will be replaced with actual implementations in coming PRs. This PR is split from https://github.com/open-telemetry/opentelemetry-collector/pull/11507 for easier review. Design doc: https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing #### Link to tracking issue https://github.com/open-telemetry/opentelemetry-collector/issues/8122 https://github.com/open-telemetry/opentelemetry-collector/issues/10368 --- exporter/internal/queue/batcher.go | 74 +++++++++++++++ exporter/internal/queue/disabled_batcher.go | 38 ++++++++ .../internal/queue/disabled_batcher_test.go | 89 +++++++++++++++++++ exporter/internal/queue/fake_request.go | 63 +++++++++++++ 4 files changed, 264 insertions(+) create mode 100644 exporter/internal/queue/batcher.go create mode 100644 exporter/internal/queue/disabled_batcher.go create mode 100644 exporter/internal/queue/disabled_batcher_test.go create mode 100644 exporter/internal/queue/fake_request.go diff --git a/exporter/internal/queue/batcher.go b/exporter/internal/queue/batcher.go new file mode 100644 index 00000000000..9149dda6613 --- /dev/null +++ b/exporter/internal/queue/batcher.go @@ -0,0 +1,74 @@ +// Copyright The OpenTelemetry Authors +// 
SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "errors" + "sync" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +type batch struct { + ctx context.Context + req internal.Request + idxList []uint64 +} + +// Batcher is in charge of reading items from the queue and sending them out asynchronously. +type Batcher interface { + component.Component +} + +type BaseBatcher struct { + batchCfg exporterbatcher.Config + queue Queue[internal.Request] + maxWorkers int + stopWG sync.WaitGroup +} + +func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) { + if maxWorkers != 0 { + return nil, errors.ErrUnsupported + } + + if batchCfg.Enabled { + return nil, errors.ErrUnsupported + } + + return &DisabledBatcher{ + BaseBatcher{ + batchCfg: batchCfg, + queue: queue, + maxWorkers: maxWorkers, + stopWG: sync.WaitGroup{}, + }, + }, nil +} + +// flush exports the incoming batch synchronously. +func (qb *BaseBatcher) flush(batchToFlush batch) { + err := batchToFlush.req.Export(batchToFlush.ctx) + for _, idx := range batchToFlush.idxList { + qb.queue.OnProcessingFinished(idx, err) + } +} + +// flushAsync starts a goroutine that calls flush. It does not currently bound the number of concurrent workers. +func (qb *BaseBatcher) flushAsync(batchToFlush batch) { + qb.stopWG.Add(1) + go func() { + defer qb.stopWG.Done() + qb.flush(batchToFlush) + }() +} + +// Shutdown waits for all in-flight flush goroutines to finish. 
+func (qb *BaseBatcher) Shutdown(_ context.Context) error { + qb.stopWG.Wait() + return nil +} diff --git a/exporter/internal/queue/disabled_batcher.go b/exporter/internal/queue/disabled_batcher.go new file mode 100644 index 00000000000..c5078885538 --- /dev/null +++ b/exporter/internal/queue/disabled_batcher.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + + "go.opentelemetry.io/collector/component" +) + +// DisabledBatcher is a special-case of Batcher that has no size limit for sending. Any items read from the queue will +// be sent out (asynchronously) immediately regardless of the size. +type DisabledBatcher struct { + BaseBatcher +} + +// Start starts the goroutine that reads from the queue and flushes asynchronously. +func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error { + // This goroutine reads and then flushes. + // 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped. + // 2. flushAsync() blocks until there are idle workers in the worker pool. 
+ qb.stopWG.Add(1) + go func() { + defer qb.stopWG.Done() + for { + idx, _, req, ok := qb.queue.Read(context.Background()) + if !ok { + return + } + qb.flushAsync(batch{ + req: req, + ctx: context.Background(), + idxList: []uint64{idx}}) + } + }() + return nil +} diff --git a/exporter/internal/queue/disabled_batcher_test.go b/exporter/internal/queue/disabled_batcher_test.go new file mode 100644 index 00000000000..e031ce83870 --- /dev/null +++ b/exporter/internal/queue/disabled_batcher_test.go @@ -0,0 +1,89 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = false + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + maxWorkers := 0 + ba, err := NewBatcher(cfg, q, maxWorkers) + require.NoError(t, err) + + require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { + require.NoError(t, q.Shutdown(context.Background())) + require.NoError(t, ba.Shutdown(context.Background())) + }) + + sink := newFakeRequestSink() + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink})) + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8 + }, 
30*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink})) + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25 + }, 30*time.Millisecond, 10*time.Millisecond) + + require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink})) + + assert.Eventually(t, func() bool { + return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38 + }, 30*time.Millisecond, 10*time.Millisecond) +} + +func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = false + maxWorkers := 1 + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + _, err := NewBatcher(cfg, q, maxWorkers) + require.Error(t, err) +} + +func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) { + cfg := exporterbatcher.NewDefaultConfig() + cfg.Enabled = true + maxWorkers := 0 + + q := NewBoundedMemoryQueue[internal.Request]( + MemoryQueueSettings[internal.Request]{ + Sizer: &RequestSizer[internal.Request]{}, + Capacity: 10, + }) + + _, err := NewBatcher(cfg, q, maxWorkers) + require.Error(t, err) +} diff --git a/exporter/internal/queue/fake_request.go b/exporter/internal/queue/fake_request.go new file mode 100644 index 00000000000..a0983db6d35 --- /dev/null +++ b/exporter/internal/queue/fake_request.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package queue // import "go.opentelemetry.io/collector/exporter/internal/queue" + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" +) + +type fakeRequestSink struct { + requestsCount *atomic.Int64 + itemsCount *atomic.Int64 +} + +func newFakeRequestSink() 
*fakeRequestSink { + return &fakeRequestSink{ + requestsCount: new(atomic.Int64), + itemsCount: new(atomic.Int64), + } +} + +type fakeRequest struct { + items int + exportErr error + delay time.Duration + sink *fakeRequestSink +} + +func (r *fakeRequest) Export(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(r.delay): + } + if r.exportErr != nil { + return r.exportErr + } + if r.sink != nil { + r.sink.requestsCount.Add(1) + r.sink.itemsCount.Add(int64(r.items)) + } + return nil +} + +func (r *fakeRequest) ItemsCount() int { + return r.items +} + +func (r *fakeRequest) Merge(_ context.Context, + _ internal.Request) (internal.Request, error) { + return nil, errors.New("not implemented") +} + +func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, + _ internal.Request) ([]internal.Request, error) { + return nil, errors.New("not implemented") +}