
[exporterqueue] Bare minimum frame of queue batcher + unit test. (#11532)

#### Description

This PR is a bare-minimum implementation of a component called the queue
batcher. On completion, this component will replace `consumers` in
`queue_sender`, moving queue batching from a pushing model to a pulling
model.
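
For illustration, here is a minimal sketch (written as in-package code, mirroring the unit test added in this PR) of how the batcher is constructed and started under the pull model:

```go
cfg := exporterbatcher.NewDefaultConfig()
cfg.Enabled = false // only the disabled-batching path exists in this PR

q := NewBoundedMemoryQueue[internal.Request](
	MemoryQueueSettings[internal.Request]{
		Sizer:    &RequestSizer[internal.Request]{},
		Capacity: 10,
	})

// maxWorkers == 0 requests an unlimited worker pool; any other value is rejected for now.
ba, err := NewBatcher(cfg, q, 0)
if err != nil {
	return err
}

// Start the queue first, then the batcher, which begins pulling from the queue.
_ = q.Start(context.Background(), componenttest.NewNopHost())
_ = ba.Start(context.Background(), componenttest.NewNopHost())
```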

Limitations of the current code:
* It implements only the case where batching is disabled, i.e. no merging or splitting of requests and no timeout-based flushing.
* It does not enforce an upper bound on concurrency.

These code paths currently return `errors.ErrUnsupported`; they will be
replaced with actual implementations in coming PRs. This PR is split from
#11507 for
easier review.
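
Concretely, both unsupported configurations are rejected at construction time (a sketch, reusing `cfg` and `q` from the snippet above):

```go
// A limited worker pool is not implemented yet.
_, err := NewBatcher(cfg, q, 1) // returns errors.ErrUnsupported

// Merging/splitting (batching enabled) is not implemented yet.
cfg.Enabled = true
_, err = NewBatcher(cfg, q, 0) // returns errors.ErrUnsupported
```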

Design doc:

https://docs.google.com/document/d/1y5jt7bQ6HWt04MntF8CjUwMBBeNiJs2gV4uUZfJjAsE/edit?usp=sharing


#### Link to tracking issue

#8122
#10368
sfc-gh-sili authored Oct 26, 2024
1 parent dfc232e commit 3e13ca1
Showing 4 changed files with 264 additions and 0 deletions.
74 changes: 74 additions & 0 deletions exporter/internal/queue/batcher.go
@@ -0,0 +1,74 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
	"context"
	"errors"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/internal"
)

// batch carries a request read from the queue along with the queue indices
// to acknowledge once the export finishes.
type batch struct {
	ctx     context.Context
	req     internal.Request
	idxList []uint64
}

// Batcher is in charge of reading items from the queue and sending them out asynchronously.
type Batcher interface {
	component.Component
}

// BaseBatcher holds the configuration and state shared by Batcher implementations.
type BaseBatcher struct {
	batchCfg   exporterbatcher.Config
	queue      Queue[internal.Request]
	maxWorkers int
	stopWG     sync.WaitGroup
}

// NewBatcher returns a Batcher for the given queue. Only the disabled-batching
// case with an unlimited worker pool (maxWorkers == 0) is implemented so far.
func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
	if maxWorkers != 0 {
		return nil, errors.ErrUnsupported
	}

	if batchCfg.Enabled {
		return nil, errors.ErrUnsupported
	}

	return &DisabledBatcher{
		BaseBatcher{
			batchCfg:   batchCfg,
			queue:      queue,
			maxWorkers: maxWorkers,
			stopWG:     sync.WaitGroup{},
		},
	}, nil
}

// flush exports the incoming batch synchronously and reports the result back
// to the queue for every index in the batch.
func (qb *BaseBatcher) flush(batchToFlush batch) {
	err := batchToFlush.req.Export(batchToFlush.ctx)
	for _, idx := range batchToFlush.idxList {
		qb.queue.OnProcessingFinished(idx, err)
	}
}

// flushAsync flushes the batch in a new goroutine. Once a limited worker pool
// is implemented, it will block until a worker is available.
func (qb *BaseBatcher) flushAsync(batchToFlush batch) {
	qb.stopWG.Add(1)
	go func() {
		defer qb.stopWG.Done()
		qb.flush(batchToFlush)
	}()
}

// Shutdown waits for any in-flight flush goroutines to finish.
func (qb *BaseBatcher) Shutdown(_ context.Context) error {
	qb.stopWG.Wait()
	return nil
}
38 changes: 38 additions & 0 deletions exporter/internal/queue/disabled_batcher.go
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
	"context"

	"go.opentelemetry.io/collector/component"
)

// DisabledBatcher is a special case of Batcher that imposes no size limit on sending. Any item read from the queue
// is sent out (asynchronously) immediately, regardless of its size.
type DisabledBatcher struct {
	BaseBatcher
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *DisabledBatcher) Start(_ context.Context, _ component.Host) error {
	// This goroutine reads and then flushes.
	// 1. Reading from the queue blocks until the queue is non-empty or until the queue is stopped.
	// 2. flushAsync() will block until a worker is idle once a limited worker pool
	//    is implemented; with the current unlimited pool it never blocks.
	qb.stopWG.Add(1)
	go func() {
		defer qb.stopWG.Done()
		for {
			idx, _, req, ok := qb.queue.Read(context.Background())
			if !ok {
				// ok is false once the queue has been stopped; exit the pull loop.
				return
			}
			qb.flushAsync(batch{
				req:     req,
				ctx:     context.Background(),
				idxList: []uint64{idx},
			})
		}
	}()
	return nil
}
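
Note on shutdown ordering (a sketch matching the test cleanup below): the queue must be shut down first so that `Read` returns `ok == false` and the pull goroutine exits; the batcher's `Shutdown` then waits for any in-flight flushes:

```go
_ = q.Shutdown(ctx)  // unblocks Read; the pull goroutine returns
_ = ba.Shutdown(ctx) // waits on stopWG for in-flight flush goroutines
```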
89 changes: 89 additions & 0 deletions exporter/internal/queue/disabled_batcher_test.go
@@ -0,0 +1,89 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/component/componenttest"
	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/internal"
)

func TestDisabledBatcher_InfiniteWorkerPool(t *testing.T) {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.Enabled = false

	q := NewBoundedMemoryQueue[internal.Request](
		MemoryQueueSettings[internal.Request]{
			Sizer:    &RequestSizer[internal.Request]{},
			Capacity: 10,
		})

	maxWorkers := 0
	ba, err := NewBatcher(cfg, q, maxWorkers)
	require.NoError(t, err)

	require.NoError(t, q.Start(context.Background(), componenttest.NewNopHost()))
	require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
	t.Cleanup(func() {
		require.NoError(t, q.Shutdown(context.Background()))
		require.NoError(t, ba.Shutdown(context.Background()))
	})

	sink := newFakeRequestSink()

	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, sink: sink}))
	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 8, exportErr: errors.New("transient error"), sink: sink}))
	assert.Eventually(t, func() bool {
		return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 8
	}, 30*time.Millisecond, 10*time.Millisecond)

	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 17, sink: sink}))
	assert.Eventually(t, func() bool {
		return sink.requestsCount.Load() == 2 && sink.itemsCount.Load() == 25
	}, 30*time.Millisecond, 10*time.Millisecond)

	require.NoError(t, q.Offer(context.Background(), &fakeRequest{items: 13, sink: sink}))
	assert.Eventually(t, func() bool {
		return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 38
	}, 30*time.Millisecond, 10*time.Millisecond)
}

func TestDisabledBatcher_LimitedWorkerNotImplemented(t *testing.T) {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.Enabled = false
	maxWorkers := 1

	q := NewBoundedMemoryQueue[internal.Request](
		MemoryQueueSettings[internal.Request]{
			Sizer:    &RequestSizer[internal.Request]{},
			Capacity: 10,
		})

	_, err := NewBatcher(cfg, q, maxWorkers)
	require.Error(t, err)
}

func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {
	cfg := exporterbatcher.NewDefaultConfig()
	cfg.Enabled = true
	maxWorkers := 0

	q := NewBoundedMemoryQueue[internal.Request](
		MemoryQueueSettings[internal.Request]{
			Sizer:    &RequestSizer[internal.Request]{},
			Capacity: 10,
		})

	_, err := NewBatcher(cfg, q, maxWorkers)
	require.Error(t, err)
}
63 changes: 63 additions & 0 deletions exporter/internal/queue/fake_request.go
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
	"context"
	"errors"
	"sync/atomic"
	"time"

	"go.opentelemetry.io/collector/exporter/exporterbatcher"
	"go.opentelemetry.io/collector/exporter/internal"
)

// fakeRequestSink counts the requests and items that were successfully exported.
type fakeRequestSink struct {
	requestsCount *atomic.Int64
	itemsCount    *atomic.Int64
}

func newFakeRequestSink() *fakeRequestSink {
	return &fakeRequestSink{
		requestsCount: new(atomic.Int64),
		itemsCount:    new(atomic.Int64),
	}
}

// fakeRequest is a test double for internal.Request that reports its results to a fakeRequestSink.
type fakeRequest struct {
	items     int
	exportErr error
	delay     time.Duration
	sink      *fakeRequestSink
}

// Export simulates an export: it honors context cancellation, waits for the
// configured delay, and records the request in the sink unless exportErr is set.
func (r *fakeRequest) Export(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-time.After(r.delay):
	}
	if r.exportErr != nil {
		return r.exportErr
	}
	if r.sink != nil {
		r.sink.requestsCount.Add(1)
		r.sink.itemsCount.Add(int64(r.items))
	}
	return nil
}

func (r *fakeRequest) ItemsCount() int {
	return r.items
}

func (r *fakeRequest) Merge(_ context.Context,
	_ internal.Request) (internal.Request, error) {
	return nil, errors.New("not implemented")
}

func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
	_ internal.Request) ([]internal.Request, error) {
	return nil, errors.New("not implemented")
}
