Skip to content

Commit

Permalink
Batch exporter merge()
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Oct 29, 2024
1 parent a58c48b commit 21a6c7c
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 22 deletions.
17 changes: 14 additions & 3 deletions exporter/internal/queue/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,23 @@ type BaseBatcher struct {
}

func NewBatcher(batchCfg exporterbatcher.Config, queue Queue[internal.Request], maxWorkers int) (Batcher, error) {
if batchCfg.Enabled {
if !batchCfg.Enabled {
return &DisabledBatcher{
BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
stopWG: sync.WaitGroup{},
},
}, nil
}

if batchCfg.MaxSizeConfig.MaxSizeItems != 0 {
return nil, errors.ErrUnsupported
}

return &DisabledBatcher{
BaseBatcher{
return &QueueBatcher{
BaseBatcher: BaseBatcher{
batchCfg: batchCfg,
queue: queue,
maxWorkers: maxWorkers,
Expand Down
15 changes: 0 additions & 15 deletions exporter/internal/queue/disabled_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,3 @@ func TestDisabledBatcher_Basic(t *testing.T) {
})
}
}

// TestDisabledBatcher_BatchingNotImplemented verifies that NewBatcher returns
// an error when batching is enabled, since batching support is not implemented
// for this code path.
func TestDisabledBatcher_BatchingNotImplemented(t *testing.T) {
	batchCfg := exporterbatcher.NewDefaultConfig()
	batchCfg.Enabled = true

	memQueue := NewBoundedMemoryQueue[internal.Request](
		MemoryQueueSettings[internal.Request]{
			Sizer:    &RequestSizer[internal.Request]{},
			Capacity: 10,
		})

	_, err := NewBatcher(batchCfg, memQueue, 0)
	require.Error(t, err)
}
19 changes: 15 additions & 4 deletions exporter/internal/queue/fake_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func newFakeRequestSink() *fakeRequestSink {
// fakeRequest is a test double for internal.Request. It lets tests configure
// an item count, an artificial per-request delay, and injected failures for
// both Export (exportErr) and Merge (mergeErr). Exported items are recorded
// in sink.
//
// NOTE(review): this span was reconstructed from a garbled diff that contained
// both the pre- and post-change field lists; the post-change version is kept.
type fakeRequest struct {
	items     int
	exportErr error

	mergeErr error
	delay    time.Duration
	sink     *fakeRequestSink
}

func (r *fakeRequest) Export(ctx context.Context) error {
Expand All @@ -53,8 +55,17 @@ func (r *fakeRequest) ItemsCount() int {
}

func (r *fakeRequest) Merge(_ context.Context,
_ internal.Request) (internal.Request, error) {
return nil, errors.New("not implemented")
r2 internal.Request) (internal.Request, error) {
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return &fakeRequest{
items: r.items + fr2.items,
sink: r.sink,
exportErr: fr2.exportErr,
delay: r.delay + fr2.delay,
}, nil
}

func (r *fakeRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig,
Expand Down
111 changes: 111 additions & 0 deletions exporter/internal/queue/queue_batcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queue // import "go.opentelemetry.io/collector/exporter/internal/queue"

import (
"context"
"math"
"sync"
"time"

"go.opentelemetry.io/collector/component"
)

// QueueBatcher continuously reads from the queue and flushes asynchronously if size limit is met or on timeout.
type QueueBatcher struct {
	BaseBatcher
	currentBatchMu sync.Mutex // guards currentBatch
	currentBatch *batch // requests merged so far but not yet flushed; nil (or req==nil) when empty
	timer *time.Timer // flush-timeout timer; kept stopped when FlushTimeout is 0 (see Start)
	shutdownCh chan bool // signals the timeout goroutine to exit once the queue read loop ends
}

// resetTimer re-arms the flush-timeout timer. It is a no-op when
// FlushTimeout is zero, i.e. when timeout-based flushing is disabled.
func (qb *QueueBatcher) resetTimer() {
	if qb.batchCfg.FlushTimeout == 0 {
		return
	}
	qb.timer.Reset(qb.batchCfg.FlushTimeout)
}

// Start starts the goroutine that reads from the queue and flushes asynchronously.
func (qb *QueueBatcher) Start(_ context.Context, _ component.Host) error {
qb.startWorkerPool()
qb.shutdownCh = make(chan bool, 1)

if qb.batchCfg.FlushTimeout == 0 {
qb.timer = time.NewTimer(math.MaxInt)
qb.timer.Stop()
} else {
qb.timer = time.NewTimer(qb.batchCfg.FlushTimeout)
}

// This goroutine reads and then flushes.
// 1. Reading from the queue is blocked until the queue is non-empty or until the queue is stopped.
// 2. flushAsync() blocks until there are idle workers in the worker pool.
qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
idx, ctx, req, ok := qb.queue.Read(context.Background())
if !ok {
qb.shutdownCh <- true
return
}

qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.resetTimer()
qb.currentBatch = &batch{
req: req,
ctx: ctx,
idxList: []uint64{idx}}
} else {
mergedReq, mergeErr := qb.currentBatch.req.Merge(qb.currentBatch.ctx, req)
if mergeErr != nil {
qb.queue.OnProcessingFinished(idx, mergeErr)
qb.currentBatchMu.Unlock()
continue
}
qb.currentBatch = &batch{
req: mergedReq,
ctx: qb.currentBatch.ctx,
idxList: append(qb.currentBatch.idxList, idx)}
}

if qb.currentBatch.req.ItemsCount() > qb.batchCfg.MinSizeItems {
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
qb.flushAsync(batchToFlush) // returns when it successfully started a goroutine for flushing.
qb.resetTimer()
} else {
qb.currentBatchMu.Unlock()
}
}
}()

qb.stopWG.Add(1)
go func() {
defer qb.stopWG.Done()
for {
select {
case <-qb.shutdownCh:
return
case <-qb.timer.C:
qb.currentBatchMu.Lock()
if qb.currentBatch == nil || qb.currentBatch.req == nil {
qb.currentBatchMu.Unlock()
continue

Check warning on line 98 in exporter/internal/queue/queue_batcher.go

View check run for this annotation

Codecov / codecov/patch

exporter/internal/queue/queue_batcher.go#L97-L98

Added lines #L97 - L98 were not covered by tests
}
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
qb.flushAsync(batchToFlush) // returns when it successfully started a goroutine for flushing.
qb.resetTimer()
}

}
}()

return nil
}
Loading

0 comments on commit 21a6c7c

Please sign in to comment.