From f7c608dc724b62eeeae62c104411760176cc85f0 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 16 Oct 2024 01:06:04 -0700 Subject: [PATCH 1/8] mergeBatchFunc moved to request --- exporter/exporterbatcher/batch_func.go | 24 -- exporter/exporterhelper/common.go | 8 - exporter/exporterhelper/exporterhelper.go | 1 + .../exporterhelper/internal/base_exporter.go | 18 +- .../exporterhelper/internal/batch_sender.go | 36 +- .../internal/batch_sender_test.go | 352 ++++++------------ exporter/exporterhelper/internal/request.go | 70 ++++ exporter/exporterhelper/logs.go | 1 - exporter/exporterhelper/logs_batch.go | 15 +- exporter/exporterhelper/logs_batch_test.go | 35 +- exporter/exporterhelper/metrics.go | 1 - exporter/exporterhelper/metrics_batch.go | 15 +- exporter/exporterhelper/metrics_batch_test.go | 34 +- exporter/exporterhelper/traces.go | 1 - exporter/exporterhelper/traces_batch.go | 15 +- exporter/exporterhelper/traces_batch_test.go | 27 +- exporter/internal/request.go | 12 + 17 files changed, 276 insertions(+), 389 deletions(-) delete mode 100644 exporter/exporterbatcher/batch_func.go diff --git a/exporter/exporterbatcher/batch_func.go b/exporter/exporterbatcher/batch_func.go deleted file mode 100644 index 0298276ba7b..00000000000 --- a/exporter/exporterbatcher/batch_func.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher" - -import "context" - -// BatchMergeFunc is a function that merges two requests into a single request. -// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is -// marked as not mutable. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type BatchMergeFunc[T any] func(context.Context, T, T) (T, error) - -// BatchMergeSplitFunc is a function that merge and/or splits one or two requests into multiple requests based on the -// configured limit provided in MaxSizeConfig. -// All the returned requests MUST have a number of items that does not exceed the maximum number of items. -// Size of the last returned request MUST be less or equal than the size of any other returned request. -// The original request MUST not be mutated if error is returned after mutation or if the exporter is -// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil, -// make sure to check it before using. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 7f396f40776..ab1f0db4e0b 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -69,11 +69,3 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { func WithBatcher(cfg exporterbatcher.Config) Option { return internal.WithBatcher(cfg) } - -// WithBatchFuncs enables setting custom batch merge functions. 
-// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], - msf exporterbatcher.BatchMergeSplitFunc[Request]) Option { - return internal.WithBatchFuncs(mf, msf) -} diff --git a/exporter/exporterhelper/exporterhelper.go b/exporter/exporterhelper/exporterhelper.go index d9e90d821d9..488e26a00b6 100644 --- a/exporter/exporterhelper/exporterhelper.go +++ b/exporter/exporterhelper/exporterhelper.go @@ -8,6 +8,7 @@ import "go.opentelemetry.io/collector/exporter/internal" // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type Request = internal.Request +type BatchRequest = internal.BatchRequest // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 1aebb318c8f..763972e793e 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -35,9 +35,6 @@ type BaseExporter struct { Signal pipeline.Signal - BatchMergeFunc exporterbatcher.BatchMergeFunc[internal.Request] - BatchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[internal.Request] - Marshaler exporterqueue.Marshaler[internal.Request] Unmarshaler exporterqueue.Unmarshaler[internal.Request] @@ -104,10 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } if be.BatcherCfg.Enabled { - bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc) - if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { - err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) - } + bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } @@ -298,16 +292,6 @@ func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Op } } -// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters. -// It must be provided as the first option when creating a new exporter helper. -func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) Option { - return func(o *BaseExporter) error { - o.BatchMergeFunc = mf - o.BatchMergeSplitfunc = msf - return nil - } -} - func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { if err != nil { require.Equal(t, codes.Error, sd.Status().Code, "SpanData %v", sd) diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index 65d7e0965f7..68f9da4a3ec 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -24,9 +24,7 @@ import ( // - concurrencyLimit is reached. 
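// Configuration example (illustrative only; the constructor, the placeholder ctx/set/converter
// arguments and the concrete size/timeout values below are assumptions, not part of this patch).
// With this change the batcher no longer needs a WithBatchFuncs option: WithBatcher(cfg) is
// enough, and the merge/split behaviour comes from the request's own Merge/MergeSplit methods:
//
//	cfg := exporterbatcher.NewDefaultConfig()
//	cfg.MinSizeItems = 1000                   // flush once at least 1000 items are buffered
//	cfg.MaxSizeItems = 10000                  // split any batch larger than 10000 items
//	cfg.FlushTimeout = 200 * time.Millisecond // flush a partial batch after 200ms
//	exp, err := exporterhelper.NewLogsRequest(ctx, set, converter,
//		exporterhelper.WithBatcher(cfg))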
type BatchSender struct { BaseRequestSender - cfg exporterbatcher.Config - mergeFunc exporterbatcher.BatchMergeFunc[internal.Request] - mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request] + cfg exporterbatcher.Config // concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher. // If this number is reached and all the goroutines are busy, the batch will be sent right away. @@ -46,14 +44,11 @@ type BatchSender struct { } // newBatchSender returns a new batch consumer component. -func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, - mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) *BatchSender { +func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender { bs := &BatchSender{ activeBatch: newEmptyBatch(), cfg: cfg, logger: set.Logger, - mergeFunc: mf, - mergeSplitFunc: msf, shutdownCh: nil, shutdownCompleteCh: make(chan struct{}), stopped: &atomic.Bool{}, @@ -104,7 +99,7 @@ func (bs *BatchSender) Start(_ context.Context, _ component.Host) error { type batch struct { ctx context.Context - request internal.Request + request internal.BatchRequest done chan struct{} err error @@ -147,19 +142,26 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error { } if bs.cfg.MaxSizeItems > 0 { - return bs.sendMergeSplitBatch(ctx, req) + return bs.sendMergeSplitBatch(ctx, req.(internal.BatchRequest)) } - return bs.sendMergeBatch(ctx, req) + return bs.sendMergeBatch(ctx, req.(internal.BatchRequest)) } // sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. -func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error { +func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.BatchRequest) error { bs.mu.Lock() - reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req) - if err != nil || len(reqs) == 0 { + var reqs []internal.BatchRequest + var mergeSplitErr error + if bs.activeBatch.request == nil { + reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil) + } else { + reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req) + } + + if mergeSplitErr != nil || len(reqs) == 0 { bs.mu.Unlock() - return err + return mergeSplitErr } bs.activeRequests.Add(1) @@ -196,12 +198,12 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req } // sendMergeBatch sends the request to the batch and waits for the batch to be exported. -func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) error { +func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.BatchRequest) error { bs.mu.Lock() if bs.activeBatch.request != nil { var err error - req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req) + req, err = bs.activeBatch.request.Merge(ctx, req) if err != nil { bs.mu.Unlock() return err @@ -224,7 +226,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) // The context is only set once and is not updated after the first call. // Merging the context would be complex and require an additional goroutine to handle the context cancellation. // We take the approach of using the context from the first request since it's likely to have the shortest timeout. 
-func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.Request) { +func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.BatchRequest) { if bs.activeBatch.request == nil { bs.activeBatch.ctx = ctx } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index f6d53bca0e0..f75febca205 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -47,7 +47,7 @@ func TestBatchSender_Merge(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, tt.batcherOption) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -117,7 +117,7 @@ func TestBatchSender_BatchExportError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, tt.batcherOption) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -153,7 +153,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { cfg.MinSizeItems = 5 cfg.MaxSizeItems = 10 cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, WithBatcher(cfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -170,7 +170,6 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { // big request should be broken down into two requests, both are sent right away. require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 17, sink: sink})) - assert.Eventually(t, func() bool { return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 }, 50*time.Millisecond, 10*time.Millisecond) @@ -190,7 +189,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { func TestBatchSender_Shutdown(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, WithBatcher(batchCfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -212,7 +211,6 @@ func TestBatchSender_Disabled(t *testing.T) { cfg.Enabled = false cfg.MaxSizeItems = 5 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(cfg)) require.NotNil(t, be) require.NoError(t, err) @@ -229,39 +227,38 @@ func TestBatchSender_Disabled(t *testing.T) { assert.Equal(t, int64(8), sink.itemsCount.Load()) } -func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { - invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, - error) { - // reply with invalid 0 length slice if req2 is more than 20 items - if req2.(*fakeRequest).items > 20 { - return []internal.Request{}, nil - } - // otherwise reply with a single request. 
- return []internal.Request{req2}, nil - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // first request should be ignored due to invalid merge/split function. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 30, sink: sink})) - // second request should be sent after reaching the timeout. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 15, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) -} +// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { +// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, +// error) { +// // reply with invalid 0 length slice if req2 is more than 20 items +// if req2.(*fakeRequest).items > 20 { +// return []internal.Request{}, nil +// } +// // otherwise reply with a single request. +// return []internal.Request{req2}, nil +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 20 +// be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // first request should be ignored due to invalid merge/split function. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 30, sink: sink})) +// // second request should be sent after reaching the timeout. 
+// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 15, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// } func TestBatchSender_PostShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())) require.NotNil(t, be) require.NoError(t, err) @@ -323,7 +320,6 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(tt.batcherCfg), WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) require.NotNil(t, be) @@ -379,7 +375,6 @@ func TestBatchSender_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 3 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) @@ -410,7 +405,6 @@ func TestBatchSender_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) @@ -446,7 +440,6 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) @@ -476,45 +469,8 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) { assert.Equal(t, int64(3), sink.itemsCount.Load()) } -func TestBatchSender_WithBatcherOption(t *testing.T) { - tests := []struct { - name string - opts []Option - expectedErr bool - }{ - { - name: "no_funcs_set", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - { - name: "funcs_set_internally", - opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: false, - }, - { - name: "nil_funcs", - opts: []Option{WithBatchFuncs(nil, nil), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, tt.opts...) - if tt.expectedErr { - assert.Nil(t, be) - assert.Error(t, err) - } else { - assert.NotNil(t, be) - assert.NoError(t, err) - } - }) - } -} - func TestBatchSender_UnstartedShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())) require.NoError(t, err) @@ -524,51 +480,50 @@ func TestBatchSender_UnstartedShutdown(t *testing.T) { // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being // merged. 
-func TestBatchSender_ShutdownDeadlock(t *testing.T) { - blockMerge := make(chan struct{}) - waitMerge := make(chan struct{}, 10) - - // blockedBatchMergeFunc blocks until the blockMerge channel is closed - blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - waitMerge <- struct{}{} - <-blockMerge - r1.(*fakeRequest).items += r2.(*fakeRequest).items - return r1, nil - } - - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(bCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 2 concurrent requests - go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - - // Wait for the requests to enter the merge function - <-waitMerge - - // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, - // then wait for the exporter to finish. - startShutdown := make(chan struct{}) - doneShutdown := make(chan struct{}) - go func() { - close(startShutdown) - assert.NoError(t, be.Shutdown(context.Background())) - close(doneShutdown) - }() - <-startShutdown - close(blockMerge) - <-doneShutdown - - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) -} +// func TestBatchSender_ShutdownDeadlock(t *testing.T) { +// blockMerge := make(chan struct{}) +// waitMerge := make(chan struct{}, 10) + +// // blockedBatchMergeFunc blocks until the blockMerge channel is closed +// blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { +// waitMerge <- struct{}{} +// <-blockMerge +// r1.(*fakeRequest).items += r2.(*fakeRequest).items +// return r1, nil +// } + +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests +// go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() +// go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() + +// // Wait for the requests to enter the merge function +// <-waitMerge + +// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, +// // then wait for the exporter to finish. 
+// startShutdown := make(chan struct{}) +// doneShutdown := make(chan struct{}) +// go func() { +// close(startShutdown) +// assert.NoError(t, be.Shutdown(context.Background())) +// close(doneShutdown) +// }() +// <-startShutdown +// close(blockMerge) +// <-doneShutdown + +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) +// } func TestBatchSenderWithTimeout(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() @@ -576,7 +531,6 @@ func TestBatchSenderWithTimeout(t *testing.T) { tCfg := NewDefaultTimeoutConfig() tCfg.Timeout = 50 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg), WithTimeout(tCfg)) require.NoError(t, err) @@ -614,51 +568,50 @@ func TestBatchSenderWithTimeout(t *testing.T) { assert.EqualValues(t, 12, sink.itemsCount.Load()) } -func TestBatchSenderTimerResetNoConflict(t *testing.T) { - delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - time.Sleep(30 * time.Millisecond) - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, nil - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 50 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(bCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - - // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - time.Sleep(30 * time.Millisecond) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, int64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 200*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) -} +// func TestBatchSenderTimerResetNoConflict(t *testing.T) { +// delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { +// time.Sleep(30 * time.Millisecond) +// if r1 == nil { +// return r2, nil +// } +// fr1 := r1.(*fakeRequest) +// fr2 := r2.(*fakeRequest) +// if fr2.mergeErr != nil { +// return nil, fr2.mergeErr +// } +// return &fakeRequest{ +// items: fr1.items + fr2.items, +// sink: fr1.sink, +// exportErr: fr2.exportErr, +// delay: fr1.delay + fr2.delay, +// }, nil +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 50 * time.Millisecond +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() + +// // 
Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// time.Sleep(30 * time.Millisecond) +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, int64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 200*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) +// } func TestBatchSenderTimerFlush(t *testing.T) { if runtime.GOOS == "windows" { @@ -668,7 +621,6 @@ func TestBatchSenderTimerFlush(t *testing.T) { bCfg.MinSizeItems = 8 bCfg.FlushTimeout = 100 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -711,65 +663,3 @@ func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter { require.NoError(t, err) return be } - -func fakeBatchMergeFunc(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, nil -} - -func fakeBatchMergeSplitFunc(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, r1 internal.Request, r2 internal.Request) ([]internal.Request, error) { - maxItems := cfg.MaxSizeItems - if maxItems == 0 { - r, err := fakeBatchMergeFunc(ctx, r1, r2) - return []internal.Request{r}, err - } - - if r2.(*fakeRequest).mergeErr != nil { - return nil, r2.(*fakeRequest).mergeErr - } - - fr2 := r2.(*fakeRequest) - fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} - var res []internal.Request - - // fill fr1 to maxItems if it's not nil - if r1 != nil { - fr1 := r1.(*fakeRequest) - fr1 = &fakeRequest{items: fr1.items, sink: fr1.sink, exportErr: fr1.exportErr, delay: fr1.delay} - if fr2.items <= maxItems-fr1.items { - fr1.items += fr2.items - if fr2.exportErr != nil { - fr1.exportErr = fr2.exportErr - } - return []internal.Request{fr1}, nil - } - // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases - fr2.items -= maxItems - fr1.items - fr1.items = maxItems - res = append(res, fr1) - } - - // split fr2 to maxItems - for { - if fr2.items <= maxItems { - res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) - break - } - res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) - fr2.items -= maxItems - } - - return res, nil -} diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 0ae94fcf45e..7d129175720 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + 
"go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -55,6 +56,75 @@ func (r *fakeRequest) ItemsCount() int { return r.items } +func (r *fakeRequest) Merge(_ context.Context, + r2 internal.BatchRequest) (internal.BatchRequest, error) { + if r == nil { + return r2, nil + } + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: r.items + fr2.items, + sink: r.sink, + exportErr: fr2.exportErr, + delay: r.delay + fr2.delay, + }, nil +} + +func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, + r2 internal.BatchRequest) ([]internal.BatchRequest, error) { + if r.mergeErr != nil { + return nil, r.mergeErr + } + + maxItems := cfg.MaxSizeItems + if maxItems == 0 { + r, err := r.Merge(ctx, r2) + return []internal.BatchRequest{r}, err + } + + var fr2 *fakeRequest + if r2 == nil { + fr2 = &fakeRequest{sink: r.sink, exportErr: r.exportErr, delay: r.delay} + } else { + if r2.(*fakeRequest).mergeErr != nil { + return nil, r2.(*fakeRequest).mergeErr + } + fr2 = r2.(*fakeRequest) + fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} + } + var res []internal.BatchRequest + + // fill fr1 to maxItems if it's not nil + + r = &fakeRequest{items: r.items, sink: r.sink, exportErr: r.exportErr, delay: r.delay} + if fr2.items <= maxItems-r.items { + r.items += fr2.items + if fr2.exportErr != nil { + r.exportErr = fr2.exportErr + } + return []internal.BatchRequest{r}, nil + } + // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases + fr2.items -= maxItems - r.items + r.items = maxItems + res = append(res, r) + + // split fr2 to maxItems + for { + if fr2.items <= maxItems { + res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + break + } + res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + fr2.items -= maxItems + } + + return res, nil +} + type FakeRequestConverter struct { MetricsError error TracesError error diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 4f5b977b2e5..772a5673e24 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -86,7 +86,6 @@ func NewLogs( } logsOpts := []Option{ internal.WithMarshaler(logsRequestMarshaler), internal.WithUnmarshaler(newLogsRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeLogs, mergeSplitLogs), } return NewLogsRequest(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...) } diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 296538bc0e0..cd1ed33c243 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -12,24 +12,23 @@ import ( ) // mergeLogs merges two logs requests into one. 
-func mergeLogs(_ context.Context, r1 Request, r2 Request) (Request, error) { - lr1, ok1 := r1.(*logsRequest) +func (req *logsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { lr2, ok2 := r2.(*logsRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - lr2.ld.ResourceLogs().MoveAndAppendTo(lr1.ld.ResourceLogs()) - return lr1, nil + lr2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + return req, nil } // mergeSplitLogs splits and/or merges the logs into multiple requests based on the MaxSizeConfig. -func mergeSplitLogs(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { +func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { var ( - res []Request + res []BatchRequest destReq *logsRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []Request{r1, r2} { + for _, req := range []BatchRequest{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index f5e10b5bcc9..64a8b49a83a 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -18,7 +19,7 @@ import ( func TestMergeLogs(t *testing.T) { lr1 := &logsRequest{ld: testdata.GenerateLogs(2)} lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} - res, err := mergeLogs(context.Background(), lr1, lr2) + res, err := lr1.Merge(context.Background(), lr2) require.NoError(t, err) assert.Equal(t, 5, res.(*logsRequest).ld.LogRecordCount()) } @@ -26,7 +27,7 @@ func TestMergeLogs(t *testing.T) { func TestMergeLogsInvalidInput(t *testing.T) { lr1 := &tracesRequest{td: testdata.GenerateTraces(2)} lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} - _, err := mergeLogs(context.Background(), lr1, lr2) + _, err := lr1.Merge(context.Background(), lr2) assert.Error(t, err) } @@ -34,8 +35,8 @@ func TestMergeSplitLogs(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - lr1 Request - lr2 Request + lr1 internal.BatchRequest + lr2 internal.BatchRequest expected []*logsRequest }{ { @@ -45,13 +46,6 @@ func TestMergeSplitLogs(t *testing.T) { lr2: &logsRequest{ld: plog.NewLogs()}, expected: []*logsRequest{{ld: plog.NewLogs()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - lr1: nil, - lr2: nil, - expected: []*logsRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -60,17 +54,10 @@ func TestMergeSplitLogs(t *testing.T) { expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, }, { - name: "first_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - lr1: nil, - lr2: &logsRequest{ld: testdata.GenerateLogs(5)}, - expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, - }, - { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - lr1: nil, - lr2: &logsRequest{ld: plog.NewLogs()}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: nil, expected: []*logsRequest{{ld: plog.NewLogs()}}, }, { @@ -87,7 +74,7 @@ func TestMergeSplitLogs(t *testing.T) { { name: 
"split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - lr1: nil, + lr1: &logsRequest{ld: plog.NewLogs()}, lr2: &logsRequest{ld: testdata.GenerateLogs(10)}, expected: []*logsRequest{ {ld: testdata.GenerateLogs(4)}, @@ -132,7 +119,7 @@ func TestMergeSplitLogs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitLogs(context.Background(), tt.cfg, tt.lr1, tt.lr2) + res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i, r := range res { @@ -146,7 +133,7 @@ func TestMergeSplitLogs(t *testing.T) { func TestMergeSplitLogsInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &logsRequest{ld: testdata.GenerateLogs(3)} - _, err := mergeSplitLogs(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r2) assert.Error(t, err) } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 64557029ce7..b2da8895f98 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -86,7 +86,6 @@ func NewMetrics( } metricsOpts := []Option{ internal.WithMarshaler(metricsRequestMarshaler), internal.WithUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeMetrics, mergeSplitMetrics), } return NewMetricsRequest(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...) } diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 1a6448c8496..6331467b46b 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -12,24 +12,23 @@ import ( ) // mergeMetrics merges two metrics requests into one. -func mergeMetrics(_ context.Context, r1 Request, r2 Request) (Request, error) { - mr1, ok1 := r1.(*metricsRequest) +func (req *metricsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { mr2, ok2 := r2.(*metricsRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - mr2.md.ResourceMetrics().MoveAndAppendTo(mr1.md.ResourceMetrics()) - return mr1, nil + mr2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics()) + return req, nil } // mergeSplitMetrics splits and/or merges the metrics into multiple requests based on the MaxSizeConfig. 
-func mergeSplitMetrics(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { +func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { var ( - res []Request + res []BatchRequest destReq *metricsRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []Request{r1, r2} { + for _, req := range []BatchRequest{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 860a1eee9c3..6edfdfe59c9 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -18,7 +18,7 @@ import ( func TestMergeMetrics(t *testing.T) { mr1 := &metricsRequest{md: testdata.GenerateMetrics(2)} mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - res, err := mergeMetrics(context.Background(), mr1, mr2) + res, err := mr1.Merge(context.Background(), mr2) require.NoError(t, err) assert.Equal(t, 5, res.(*metricsRequest).md.MetricCount()) } @@ -26,7 +26,7 @@ func TestMergeMetrics(t *testing.T) { func TestMergeMetricsInvalidInput(t *testing.T) { mr1 := &tracesRequest{td: testdata.GenerateTraces(2)} mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mergeMetrics(context.Background(), mr1, mr2) + _, err := mr1.Merge(context.Background(), mr2) assert.Error(t, err) } @@ -34,8 +34,8 @@ func TestMergeSplitMetrics(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - mr1 Request - mr2 Request + mr1 BatchRequest + mr2 BatchRequest expected []*metricsRequest }{ { @@ -45,13 +45,6 @@ func TestMergeSplitMetrics(t *testing.T) { mr2: &metricsRequest{md: pmetric.NewMetrics()}, expected: []*metricsRequest{{md: pmetric.NewMetrics()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - mr1: nil, - mr2: nil, - expected: []*metricsRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -60,17 +53,10 @@ func TestMergeSplitMetrics(t *testing.T) { expected: []*metricsRequest{{md: testdata.GenerateMetrics(5)}}, }, { - name: "first_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - mr1: nil, - mr2: &metricsRequest{md: testdata.GenerateMetrics(5)}, - expected: []*metricsRequest{{md: testdata.GenerateMetrics(5)}}, - }, - { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - mr1: nil, - mr2: &metricsRequest{md: pmetric.NewMetrics()}, + mr1: &metricsRequest{md: pmetric.NewMetrics()}, + mr2: nil, expected: []*metricsRequest{{md: pmetric.NewMetrics()}}, }, { @@ -87,7 +73,7 @@ func TestMergeSplitMetrics(t *testing.T) { { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 14}, - mr1: nil, + mr1: &metricsRequest{md: pmetric.NewMetrics()}, mr2: &metricsRequest{md: testdata.GenerateMetrics(15)}, // 15 metrics, 30 data points expected: []*metricsRequest{ {md: testdata.GenerateMetrics(7)}, // 7 metrics, 14 data points @@ -133,7 +119,7 @@ func TestMergeSplitMetrics(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitMetrics(context.Background(), tt.cfg, tt.mr1, tt.mr2) + res, err := tt.mr1.MergeSplit(context.Background(), tt.cfg, tt.mr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { @@ -146,7 +132,7 @@ func TestMergeSplitMetrics(t *testing.T) { func 
TestMergeSplitMetricsInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mergeSplitMetrics(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r1, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) assert.Error(t, err) } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 407af781feb..7d7bedbd289 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -86,7 +86,6 @@ func NewTraces( } tracesOpts := []Option{ internal.WithMarshaler(tracesRequestMarshaler), internal.WithUnmarshaler(newTraceRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeTraces, mergeSplitTraces), } return NewTracesRequest(ctx, set, requestFromTraces(pusher), append(tracesOpts, options...)...) } diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index 1bdada95b7b..c2743e12356 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -12,24 +12,23 @@ import ( ) // mergeTraces merges two traces requests into one. -func mergeTraces(_ context.Context, r1 Request, r2 Request) (Request, error) { - tr1, ok1 := r1.(*tracesRequest) +func (req *tracesRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { tr2, ok2 := r2.(*tracesRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - tr2.td.ResourceSpans().MoveAndAppendTo(tr1.td.ResourceSpans()) - return tr1, nil + tr2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans()) + return req, nil } // mergeSplitTraces splits and/or merges the traces into multiple requests based on the MaxSizeConfig. 
-func mergeSplitTraces(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { +func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { var ( - res []Request + res []BatchRequest destReq *tracesRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []Request{r1, r2} { + for _, req := range []BatchRequest{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index d88591b3091..1ff4434916c 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -18,7 +18,7 @@ import ( func TestMergeTraces(t *testing.T) { tr1 := &tracesRequest{td: testdata.GenerateTraces(2)} tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} - res, err := mergeTraces(context.Background(), tr1, tr2) + res, err := tr1.Merge(context.Background(), tr2) require.NoError(t, err) assert.Equal(t, 5, res.(*tracesRequest).td.SpanCount()) } @@ -26,7 +26,7 @@ func TestMergeTraces(t *testing.T) { func TestMergeTracesInvalidInput(t *testing.T) { tr1 := &logsRequest{ld: testdata.GenerateLogs(2)} tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} - _, err := mergeTraces(context.Background(), tr1, tr2) + _, err := tr1.Merge(context.Background(), tr2) assert.Error(t, err) } @@ -34,8 +34,8 @@ func TestMergeSplitTraces(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - tr1 Request - tr2 Request + tr1 BatchRequest + tr2 BatchRequest expected []*tracesRequest }{ { @@ -45,13 +45,6 @@ func TestMergeSplitTraces(t *testing.T) { tr2: &tracesRequest{td: ptrace.NewTraces()}, expected: []*tracesRequest{{td: ptrace.NewTraces()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - tr1: nil, - tr2: nil, - expected: []*tracesRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -67,10 +60,10 @@ func TestMergeSplitTraces(t *testing.T) { expected: []*tracesRequest{{td: testdata.GenerateTraces(5)}}, }, { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - tr1: nil, - tr2: &tracesRequest{td: ptrace.NewTraces()}, + tr1: &tracesRequest{td: ptrace.NewTraces()}, + tr2: nil, expected: []*tracesRequest{{td: ptrace.NewTraces()}}, }, { @@ -87,7 +80,7 @@ func TestMergeSplitTraces(t *testing.T) { { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - tr1: nil, + tr1: &tracesRequest{td: ptrace.NewTraces()}, tr2: &tracesRequest{td: testdata.GenerateTraces(10)}, expected: []*tracesRequest{ {td: testdata.GenerateTraces(4)}, @@ -133,7 +126,7 @@ func TestMergeSplitTraces(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitTraces(context.Background(), tt.cfg, tt.tr1, tt.tr2) + res, err := tt.tr1.MergeSplit(context.Background(), tt.cfg, tt.tr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { @@ -146,7 +139,7 @@ func TestMergeSplitTraces(t *testing.T) { func TestMergeSplitTracesInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mergeSplitTraces(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r1, r2) + _, err := r1.MergeSplit(context.Background(), 
exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) assert.Error(t, err) } diff --git a/exporter/internal/request.go b/exporter/internal/request.go index 1b82e23504d..3666e1552b1 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -5,6 +5,8 @@ package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" ) // Request represents a single request that can be sent to an external endpoint. @@ -19,6 +21,16 @@ type Request interface { ItemsCount() int } +// BatchRequest represents a single request that can be sent to an external endpoint. It can be merged with +// another BatchRequest and/or split into multiple given a size limit. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchRequest interface { + Request + Merge(context.Context, BatchRequest) (BatchRequest, error) + MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, BatchRequest) ([]BatchRequest, error) +} + // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to // return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. From 0f5e73bf5c547505b7ab67b0c8e0b83b3d60deed Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Thu, 17 Oct 2024 02:38:47 -0700 Subject: [PATCH 2/8] Add chlog --- .../merge-function-as-requet-method.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 .chloggen/merge-function-as-requet-method.yaml diff --git a/.chloggen/merge-function-as-requet-method.yaml b/.chloggen/merge-function-as-requet-method.yaml new file mode 100644 index 00000000000..5eae7238d8d --- /dev/null +++ b/.chloggen/merge-function-as-requet-method.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Changed mergeFunc and mergeSplitFunc to be part of batch request. + +# One or more tracking issues or pull requests related to the change +issues: [] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + mergeFunc and mergeSplitFunc used to be part of the configuration pass to the exporter. Now it is changed + | to be a method function of request. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] From 9546974b031256d0ce50094b795d23670686aef2 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Thu, 17 Oct 2024 14:38:38 -0700 Subject: [PATCH 3/8] fix change log --- .chloggen/merge-function-as-requet-method.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/merge-function-as-requet-method.yaml b/.chloggen/merge-function-as-requet-method.yaml index 5eae7238d8d..04aa3ad6fe2 100644 --- a/.chloggen/merge-function-as-requet-method.yaml +++ b/.chloggen/merge-function-as-requet-method.yaml @@ -10,7 +10,7 @@ component: exporter note: Changed mergeFunc and mergeSplitFunc to be part of batch request. # One or more tracking issues or pull requests related to the change -issues: [] +issues: [10368] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. From 8d289758b56013f39a811821f8ec4571884b65ed Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Thu, 17 Oct 2024 17:23:05 -0700 Subject: [PATCH 4/8] Fix test --- .../exporterhelperprofiles/profiles.go | 1 - .../exporterhelperprofiles/profiles_batch.go | 23 +++++----- .../profiles_batch_test.go | 46 +++++++++---------- 3 files changed, 32 insertions(+), 38 deletions(-) diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles.go b/exporter/exporterhelper/exporterhelperprofiles/profiles.go index 89410f22892..069613dbed3 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles.go @@ -88,7 +88,6 @@ func NewProfilesExporter( } profilesOpts := []exporterhelper.Option{ internal.WithMarshaler(profilesRequestMarshaler), internal.WithUnmarshaler(newProfileRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeProfiles, mergeSplitProfiles), } return NewProfilesRequestExporter(ctx, set, requestFromProfiles(pusher), append(profilesOpts, options...)...) } diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go index 0db7d879e20..fb5c1dd4937 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go @@ -12,29 +12,28 @@ import ( "go.opentelemetry.io/collector/pdata/pprofile" ) -// mergeProfiles merges two profiles requests into one. -func mergeProfiles(_ context.Context, r1 exporterhelper.Request, r2 exporterhelper.Request) (exporterhelper.Request, error) { - tr1, ok1 := r1.(*profilesRequest) +// Merge merges two profiles requests into one. +func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) { tr2, ok2 := r2.(*profilesRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - tr2.pd.ResourceProfiles().MoveAndAppendTo(tr1.pd.ResourceProfiles()) - return tr1, nil + tr2.pd.ResourceProfiles().MoveAndAppendTo(req.pd.ResourceProfiles()) + return req, nil } -// mergeSplitProfiles splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. -func mergeSplitProfiles(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 exporterhelper.Request, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { +// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. 
+func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.BatchRequest) ([]exporterhelper.BatchRequest, error) { var ( - res []exporterhelper.Request + res []exporterhelper.BatchRequest destReq *profilesRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []exporterhelper.Request{r1, r2} { - if req == nil { + for _, r := range []exporterhelper.Request{req, r2} { + if r == nil { continue } - srcReq, ok := req.(*profilesRequest) + srcReq, ok := r.(*profilesRequest) if !ok { return nil, errors.New("invalid input type") } diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go index 0272d8126b1..702c414fe3d 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go @@ -23,7 +23,7 @@ import ( func TestMergeProfiles(t *testing.T) { pr1 := &profilesRequest{pd: testdata.GenerateProfiles(2)} pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} - res, err := mergeProfiles(context.Background(), pr1, pr2) + res, err := pr1.Merge(context.Background(), pr2) require.NoError(t, err) fmt.Fprintf(os.Stdout, "%#v\n", res.(*profilesRequest).pd) assert.Equal(t, 5, res.(*profilesRequest).pd.SampleCount()) @@ -32,7 +32,7 @@ func TestMergeProfiles(t *testing.T) { func TestMergeProfilesInvalidInput(t *testing.T) { pr1 := &tracesRequest{td: testdata.GenerateTraces(2)} pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} - _, err := mergeProfiles(context.Background(), pr1, pr2) + _, err := pr2.Merge(context.Background(), pr1) assert.Error(t, err) } @@ -40,8 +40,8 @@ func TestMergeSplitProfiles(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - pr1 exporterhelper.Request - pr2 exporterhelper.Request + pr1 exporterhelper.BatchRequest + pr2 exporterhelper.BatchRequest expected []*profilesRequest }{ { @@ -51,13 +51,6 @@ func TestMergeSplitProfiles(t *testing.T) { pr2: &profilesRequest{pd: pprofile.NewProfiles()}, expected: []*profilesRequest{{pd: pprofile.NewProfiles()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: nil, - pr2: nil, - expected: []*profilesRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -66,17 +59,10 @@ func TestMergeSplitProfiles(t *testing.T) { expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}}, }, { - name: "first_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: nil, - pr2: &profilesRequest{pd: testdata.GenerateProfiles(5)}, - expected: []*profilesRequest{{pd: testdata.GenerateProfiles(5)}}, - }, - { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - pr1: nil, - pr2: &profilesRequest{pd: pprofile.NewProfiles()}, + pr1: &profilesRequest{pd: pprofile.NewProfiles()}, + pr2: nil, expected: []*profilesRequest{{pd: pprofile.NewProfiles()}}, }, { @@ -93,8 +79,8 @@ func TestMergeSplitProfiles(t *testing.T) { { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - pr1: nil, - pr2: &profilesRequest{pd: testdata.GenerateProfiles(10)}, + pr1: &profilesRequest{pd: testdata.GenerateProfiles(10)}, + pr2: nil, expected: []*profilesRequest{ {pd: testdata.GenerateProfiles(4)}, {pd: testdata.GenerateProfiles(4)}, @@ -133,7 +119,7 @@ func TestMergeSplitProfiles(t *testing.T) { 
} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitProfiles(context.Background(), tt.cfg, tt.pr1, tt.pr2) + res, err := tt.pr1.MergeSplit(context.Background(), tt.cfg, tt.pr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i, r := range res { @@ -147,7 +133,7 @@ func TestMergeSplitProfiles(t *testing.T) { func TestMergeSplitProfilesInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} - _, err := mergeSplitProfiles(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2) + _, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1) assert.Error(t, err) } @@ -172,3 +158,13 @@ func (req *tracesRequest) Export(ctx context.Context) error { func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } + +func (req *tracesRequest) Merge(_ context.Context, _ exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) { + return nil, nil +} + +// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. +func (req *tracesRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.BatchRequest) ( + []exporterhelper.BatchRequest, error) { + return nil, nil +} From c74b52df22a65669a84726e90402f24c8fe90454 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Fri, 18 Oct 2024 14:49:46 -0700 Subject: [PATCH 5/8] Edits according to feedback --- .../profiles_batch_test.go | 24 ++++++++----------- .../exporterhelper/internal/batch_sender.go | 10 ++++++-- exporter/internal/request.go | 13 ++++++++++ 3 files changed, 31 insertions(+), 16 deletions(-) diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go index 702c414fe3d..e1ea65ee876 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go @@ -12,11 +12,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pprofile" - "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -30,7 +28,7 @@ func TestMergeProfiles(t *testing.T) { } func TestMergeProfilesInvalidInput(t *testing.T) { - pr1 := &tracesRequest{td: testdata.GenerateTraces(2)} + pr1 := &dummyRequest{} pr2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} _, err := pr2.Merge(context.Background(), pr1) assert.Error(t, err) @@ -131,7 +129,7 @@ func TestMergeSplitProfiles(t *testing.T) { } func TestMergeSplitProfilesInvalidInput(t *testing.T) { - r1 := &tracesRequest{td: testdata.GenerateTraces(2)} + r1 := &dummyRequest{} r2 := &profilesRequest{pd: testdata.GenerateProfiles(3)} _, err := r2.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r1) assert.Error(t, err) @@ -146,25 +144,23 @@ func TestExtractProfiles(t *testing.T) { } } -type tracesRequest struct { - td ptrace.Traces - pusher consumer.ConsumeTracesFunc +// dummyRequest implements BatchRequest. 
It is for checking that merging two request types would fail +type dummyRequest struct { } -func (req *tracesRequest) Export(ctx context.Context) error { - return req.pusher(ctx, req.td) +func (req *dummyRequest) Export(_ context.Context) error { + return nil } -func (req *tracesRequest) ItemsCount() int { - return req.td.SpanCount() +func (req *dummyRequest) ItemsCount() int { + return 1 } -func (req *tracesRequest) Merge(_ context.Context, _ exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) { +func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) { return nil, nil } -// MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. -func (req *tracesRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.BatchRequest) ( +func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.BatchRequest) ( []exporterhelper.BatchRequest, error) { return nil, nil } diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index 68f9da4a3ec..9be8933b70f 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -5,6 +5,7 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" + "errors" "sync" "sync/atomic" "time" @@ -141,10 +142,15 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error { return bs.NextSender.Send(ctx, req) } + batchReq, ok := req.(internal.BatchRequest) + if !ok { + return errors.New("Incoming request does not implement BatchRequest interface.") + } + if bs.cfg.MaxSizeItems > 0 { - return bs.sendMergeSplitBatch(ctx, req.(internal.BatchRequest)) + return bs.sendMergeSplitBatch(ctx, batchReq) } - return bs.sendMergeBatch(ctx, req.(internal.BatchRequest)) + return bs.sendMergeBatch(ctx, batchReq) } // sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. diff --git a/exporter/internal/request.go b/exporter/internal/request.go index 3666e1552b1..a8dd0357e0d 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -27,7 +27,20 @@ type Request interface { // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type BatchRequest interface { Request + // Merge() is a function that merges this request with another one into a single request. + // Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is + // marked as not mutable. + // Experimental: This API is at the early stage of development and may change without backward compatibility + // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. Merge(context.Context, BatchRequest) (BatchRequest, error) + // MergeSplit() is a function that merge and/or splits this request with another one into multiple requests based on the + // configured limit provided in MaxSizeConfig. + // All the returned requests MUST have a number of items that does not exceed the maximum number of items. + // Size of the last returned request MUST be less or equal than the size of any other returned request. + // The original request MUST not be mutated if error is returned after mutation or if the exporter is + // marked as not mutable. The length of the returned slice MUST not be 0. 
+ // Experimental: This API is at the early stage of development and may change without backward compatibility + // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, BatchRequest) ([]BatchRequest, error) } From 2b2c528eb3d9f4db9fc9d0feeca559f6b0add331 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Sun, 20 Oct 2024 18:14:27 -0700 Subject: [PATCH 6/8] Made merge() and mergesplit() required method --- exporter/exporterhelper/exporterhelper.go | 1 - .../exporterhelperprofiles/profiles_batch.go | 6 +++--- .../profiles_batch_test.go | 12 +++++------ .../exporterhelper/internal/batch_sender.go | 20 +++++++------------ exporter/exporterhelper/internal/request.go | 10 +++++----- .../internal/retry_sender_test.go | 17 ++++++++++++++++ exporter/exporterhelper/logs_batch.go | 8 ++++---- exporter/exporterhelper/logs_batch_test.go | 4 ++-- exporter/exporterhelper/metrics_batch.go | 8 ++++---- exporter/exporterhelper/metrics_batch_test.go | 4 ++-- exporter/exporterhelper/traces_batch.go | 8 ++++---- exporter/exporterhelper/traces_batch_test.go | 4 ++-- exporter/internal/request.go | 12 ++--------- 13 files changed, 58 insertions(+), 56 deletions(-) diff --git a/exporter/exporterhelper/exporterhelper.go b/exporter/exporterhelper/exporterhelper.go index 488e26a00b6..d9e90d821d9 100644 --- a/exporter/exporterhelper/exporterhelper.go +++ b/exporter/exporterhelper/exporterhelper.go @@ -8,7 +8,6 @@ import "go.opentelemetry.io/collector/exporter/internal" // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type Request = internal.Request -type BatchRequest = internal.BatchRequest // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go index fb5c1dd4937..fc725666438 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch.go @@ -13,7 +13,7 @@ import ( ) // Merge merges two profiles requests into one. -func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) { +func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.Request) (exporterhelper.Request, error) { tr2, ok2 := r2.(*profilesRequest) if !ok2 { return nil, errors.New("invalid input type") @@ -23,9 +23,9 @@ func (req *profilesRequest) Merge(_ context.Context, r2 exporterhelper.BatchRequ } // MergeSplit splits and/or merges the profiles into multiple requests based on the MaxSizeConfig. 
-func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.BatchRequest) ([]exporterhelper.BatchRequest, error) { +func (req *profilesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { var ( - res []exporterhelper.BatchRequest + res []exporterhelper.Request destReq *profilesRequest capacityLeft = cfg.MaxSizeItems ) diff --git a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go index e1ea65ee876..9674e2d3fd4 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go +++ b/exporter/exporterhelper/exporterhelperprofiles/profiles_batch_test.go @@ -38,8 +38,8 @@ func TestMergeSplitProfiles(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - pr1 exporterhelper.BatchRequest - pr2 exporterhelper.BatchRequest + pr1 exporterhelper.Request + pr2 exporterhelper.Request expected []*profilesRequest }{ { @@ -144,7 +144,7 @@ func TestExtractProfiles(t *testing.T) { } } -// dummyRequest implements BatchRequest. It is for checking that merging two request types would fail +// dummyRequest implements Request. It is for checking that merging two request types would fail type dummyRequest struct { } @@ -156,11 +156,11 @@ func (req *dummyRequest) ItemsCount() int { return 1 } -func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.BatchRequest) (exporterhelper.BatchRequest, error) { +func (req *dummyRequest) Merge(_ context.Context, _ exporterhelper.Request) (exporterhelper.Request, error) { return nil, nil } -func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.BatchRequest) ( - []exporterhelper.BatchRequest, error) { +func (req *dummyRequest) MergeSplit(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ exporterhelper.Request) ( + []exporterhelper.Request, error) { return nil, nil } diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index 9be8933b70f..21eed2c91d8 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -5,7 +5,6 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" - "errors" "sync" "sync/atomic" "time" @@ -100,7 +99,7 @@ func (bs *BatchSender) Start(_ context.Context, _ component.Host) error { type batch struct { ctx context.Context - request internal.BatchRequest + request internal.Request done chan struct{} err error @@ -142,22 +141,17 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error { return bs.NextSender.Send(ctx, req) } - batchReq, ok := req.(internal.BatchRequest) - if !ok { - return errors.New("Incoming request does not implement BatchRequest interface.") - } - if bs.cfg.MaxSizeItems > 0 { - return bs.sendMergeSplitBatch(ctx, batchReq) + return bs.sendMergeSplitBatch(ctx, req) } - return bs.sendMergeBatch(ctx, batchReq) + return bs.sendMergeBatch(ctx, req) } // sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. 
-func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.BatchRequest) error { +func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error { bs.mu.Lock() - var reqs []internal.BatchRequest + var reqs []internal.Request var mergeSplitErr error if bs.activeBatch.request == nil { reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil) @@ -204,7 +198,7 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Bat } // sendMergeBatch sends the request to the batch and waits for the batch to be exported. -func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.BatchRequest) error { +func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) error { bs.mu.Lock() if bs.activeBatch.request != nil { @@ -232,7 +226,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.BatchReq // The context is only set once and is not updated after the first call. // Merging the context would be complex and require an additional goroutine to handle the context cancellation. // We take the approach of using the context from the first request since it's likely to have the shortest timeout. -func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.BatchRequest) { +func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.Request) { if bs.activeBatch.request == nil { bs.activeBatch.ctx = ctx } diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 7d129175720..7f71d7e94ea 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -57,7 +57,7 @@ func (r *fakeRequest) ItemsCount() int { } func (r *fakeRequest) Merge(_ context.Context, - r2 internal.BatchRequest) (internal.BatchRequest, error) { + r2 internal.Request) (internal.Request, error) { if r == nil { return r2, nil } @@ -74,7 +74,7 @@ func (r *fakeRequest) Merge(_ context.Context, } func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, - r2 internal.BatchRequest) ([]internal.BatchRequest, error) { + r2 internal.Request) ([]internal.Request, error) { if r.mergeErr != nil { return nil, r.mergeErr } @@ -82,7 +82,7 @@ func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSiz maxItems := cfg.MaxSizeItems if maxItems == 0 { r, err := r.Merge(ctx, r2) - return []internal.BatchRequest{r}, err + return []internal.Request{r}, err } var fr2 *fakeRequest @@ -95,7 +95,7 @@ func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSiz fr2 = r2.(*fakeRequest) fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} } - var res []internal.BatchRequest + var res []internal.Request // fill fr1 to maxItems if it's not nil @@ -105,7 +105,7 @@ func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSiz if fr2.exportErr != nil { r.exportErr = fr2.exportErr } - return []internal.BatchRequest{r}, nil + return []internal.Request{r}, nil } // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases fr2.items -= maxItems - r.items diff --git a/exporter/exporterhelper/internal/retry_sender_test.go b/exporter/exporterhelper/internal/retry_sender_test.go index bdb80d326c4..9ebf4b1f5ad 100644 --- a/exporter/exporterhelper/internal/retry_sender_test.go +++ b/exporter/exporterhelper/internal/retry_sender_test.go @@ -19,6 +19,7 @@ import ( 
"go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/exporter/internal" @@ -348,6 +349,14 @@ func (mer *mockErrorRequest) ItemsCount() int { return 7 } +func (mer *mockErrorRequest) Merge(context.Context, internal.Request) (internal.Request, error) { + return nil, nil +} + +func (mer *mockErrorRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { + return nil, nil +} + func newErrorRequest() internal.Request { return &mockErrorRequest{} } @@ -390,6 +399,14 @@ func (m *mockRequest) ItemsCount() int { return m.cnt } +func (m *mockRequest) Merge(context.Context, internal.Request) (internal.Request, error) { + return nil, nil +} + +func (m *mockRequest) MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, internal.Request) ([]internal.Request, error) { + return nil, nil +} + func newMockRequest(cnt int, consumeError error) *mockRequest { return &mockRequest{ cnt: cnt, diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index cd1ed33c243..cb296fda95f 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -12,7 +12,7 @@ import ( ) // mergeLogs merges two logs requests into one. -func (req *logsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { +func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) { lr2, ok2 := r2.(*logsRequest) if !ok2 { return nil, errors.New("invalid input type") @@ -22,13 +22,13 @@ func (req *logsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, } // mergeSplitLogs splits and/or merges the logs into multiple requests based on the MaxSizeConfig. -func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { +func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { var ( - res []BatchRequest + res []Request destReq *logsRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []BatchRequest{req, r2} { + for _, req := range []Request{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index 64a8b49a83a..92a18bd864b 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -35,8 +35,8 @@ func TestMergeSplitLogs(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - lr1 internal.BatchRequest - lr2 internal.BatchRequest + lr1 internal.Request + lr2 internal.Request expected []*logsRequest }{ { diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 6331467b46b..6721563eac4 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -12,7 +12,7 @@ import ( ) // mergeMetrics merges two metrics requests into one. 
-func (req *metricsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { +func (req *metricsRequest) Merge(_ context.Context, r2 Request) (Request, error) { mr2, ok2 := r2.(*metricsRequest) if !ok2 { return nil, errors.New("invalid input type") @@ -22,13 +22,13 @@ func (req *metricsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchReque } // mergeSplitMetrics splits and/or merges the metrics into multiple requests based on the MaxSizeConfig. -func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { +func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { var ( - res []BatchRequest + res []Request destReq *metricsRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []BatchRequest{req, r2} { + for _, req := range []Request{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 6edfdfe59c9..854cc59db3a 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -34,8 +34,8 @@ func TestMergeSplitMetrics(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - mr1 BatchRequest - mr2 BatchRequest + mr1 Request + mr2 Request expected []*metricsRequest }{ { diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index c2743e12356..ec8a3954610 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -12,7 +12,7 @@ import ( ) // mergeTraces merges two traces requests into one. -func (req *tracesRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { +func (req *tracesRequest) Merge(_ context.Context, r2 Request) (Request, error) { tr2, ok2 := r2.(*tracesRequest) if !ok2 { return nil, errors.New("invalid input type") @@ -22,13 +22,13 @@ func (req *tracesRequest) Merge(_ context.Context, r2 BatchRequest) (BatchReques } // mergeSplitTraces splits and/or merges the traces into multiple requests based on the MaxSizeConfig. -func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { +func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) { var ( - res []BatchRequest + res []Request destReq *tracesRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []BatchRequest{req, r2} { + for _, req := range []Request{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index 1ff4434916c..ca83c5cfb91 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -34,8 +34,8 @@ func TestMergeSplitTraces(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - tr1 BatchRequest - tr2 BatchRequest + tr1 Request + tr2 Request expected []*tracesRequest }{ { diff --git a/exporter/internal/request.go b/exporter/internal/request.go index a8dd0357e0d..ed6ee39af1c 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -19,20 +19,12 @@ type Request interface { // sent. For example, for OTLP exporter, this value represents the number of spans, // metric data points or log records. 
ItemsCount() int -} - -// BatchRequest represents a single request that can be sent to an external endpoint. It can be merged with -// another BatchRequest and/or split into multiple given a size limit. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type BatchRequest interface { - Request // Merge() is a function that merges this request with another one into a single request. // Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is // marked as not mutable. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. - Merge(context.Context, BatchRequest) (BatchRequest, error) + Merge(context.Context, Request) (Request, error) // MergeSplit() is a function that merge and/or splits this request with another one into multiple requests based on the // configured limit provided in MaxSizeConfig. // All the returned requests MUST have a number of items that does not exceed the maximum number of items. @@ -41,7 +33,7 @@ type BatchRequest interface { // marked as not mutable. The length of the returned slice MUST not be 0. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. - MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, BatchRequest) ([]BatchRequest, error) + MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, Request) ([]Request, error) } // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial From a13023740b6df5a5649a630316315b8bf0b73000 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Sun, 20 Oct 2024 18:15:45 -0700 Subject: [PATCH 7/8] Updated change log --- .chloggen/merge-function-as-requet-method.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/merge-function-as-requet-method.yaml b/.chloggen/merge-function-as-requet-method.yaml index 04aa3ad6fe2..fae049f3f02 100644 --- a/.chloggen/merge-function-as-requet-method.yaml +++ b/.chloggen/merge-function-as-requet-method.yaml @@ -7,7 +7,7 @@ change_type: breaking component: exporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Changed mergeFunc and mergeSplitFunc to be part of batch request. 
+note: Made mergeFunc and mergeSplitFunc required method of exporter.Request # One or more tracking issues or pull requests related to the change issues: [10368] From bd6422799b2203f22cf12e83a4311cc931da4315 Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Mon, 21 Oct 2024 12:57:00 -0700 Subject: [PATCH 8/8] Mod tidied --- exporter/exporterhelper/exporterhelperprofiles/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/exporterhelper/exporterhelperprofiles/go.mod b/exporter/exporterhelper/exporterhelperprofiles/go.mod index 74b8902be9e..8d3cd30d83d 100644 --- a/exporter/exporterhelper/exporterhelperprofiles/go.mod +++ b/exporter/exporterhelper/exporterhelperprofiles/go.mod @@ -14,7 +14,6 @@ require ( go.opentelemetry.io/collector/exporter v0.111.0 go.opentelemetry.io/collector/exporter/exporterprofiles v0.111.0 go.opentelemetry.io/collector/exporter/exportertest v0.111.0 - go.opentelemetry.io/collector/pdata v1.17.0 go.opentelemetry.io/collector/pdata/pprofile v0.111.0 go.opentelemetry.io/collector/pdata/testdata v0.111.0 go.opentelemetry.io/collector/pipeline/pipelineprofiles v0.0.0-20241021162523-3193106bf4b1 @@ -38,6 +37,7 @@ require ( go.opentelemetry.io/collector/config/configtelemetry v0.111.0 // indirect go.opentelemetry.io/collector/extension v0.111.0 // indirect go.opentelemetry.io/collector/extension/experimental/storage v0.111.0 // indirect + go.opentelemetry.io/collector/pdata v1.17.0 // indirect go.opentelemetry.io/collector/pipeline v0.111.0 // indirect go.opentelemetry.io/collector/receiver v0.111.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.111.0 // indirect
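
A note for readers of this series: after PATCH 6, Merge() and MergeSplit() are required methods on every exporterhelper.Request, so a custom request type has to carry its own merge/split logic. The sketch below is not part of the patches above; it is a self-contained, illustrative Go program in which the request interface, maxSizeConfig struct, and itemsRequest type are local stand-ins that only mirror the shapes shown in the diffs, not the real collector types. It shows one way an implementation can satisfy the documented contract: no returned chunk exceeds MaxSizeItems, only the last chunk may be smaller, and the returned slice is never empty.

package main

import (
	"context"
	"errors"
	"fmt"
)

// maxSizeConfig is a local stand-in for exporterbatcher.MaxSizeConfig;
// only the item cap matters for this sketch.
type maxSizeConfig struct {
	MaxSizeItems int
}

// request is a local stand-in mirroring the post-series exporterhelper.Request
// contract: Export and ItemsCount as before, plus the now-required Merge and MergeSplit.
type request interface {
	Export(ctx context.Context) error
	ItemsCount() int
	Merge(ctx context.Context, r2 request) (request, error)
	MergeSplit(ctx context.Context, cfg maxSizeConfig, r2 request) ([]request, error)
}

// itemsRequest is a toy request that carries a plain slice of items.
type itemsRequest struct {
	items []string
}

func (r *itemsRequest) Export(context.Context) error { return nil }

func (r *itemsRequest) ItemsCount() int { return len(r.items) }

// Merge copies this request's items and appends r2's, rejecting foreign request
// types the same way the signal-specific requests in the series return "invalid input type".
func (r *itemsRequest) Merge(_ context.Context, r2 request) (request, error) {
	other, ok := r2.(*itemsRequest)
	if !ok {
		return nil, errors.New("invalid input type")
	}
	merged := append(append([]string{}, r.items...), other.items...)
	return &itemsRequest{items: merged}, nil
}

// MergeSplit combines this request with r2 (which may be nil) and re-chunks the
// result so that no returned request exceeds cfg.MaxSizeItems; only the last
// chunk may be smaller, and at least one request is always returned.
func (r *itemsRequest) MergeSplit(_ context.Context, cfg maxSizeConfig, r2 request) ([]request, error) {
	all := append([]string{}, r.items...)
	if r2 != nil {
		other, ok := r2.(*itemsRequest)
		if !ok {
			return nil, errors.New("invalid input type")
		}
		all = append(all, other.items...)
	}
	if cfg.MaxSizeItems <= 0 || len(all) <= cfg.MaxSizeItems {
		// No cap configured, or everything fits: return a single merged request.
		return []request{&itemsRequest{items: all}}, nil
	}
	var out []request
	for start := 0; start < len(all); start += cfg.MaxSizeItems {
		end := start + cfg.MaxSizeItems
		if end > len(all) {
			end = len(all)
		}
		out = append(out, &itemsRequest{items: all[start:end]})
	}
	return out, nil
}

func main() {
	r1 := &itemsRequest{items: []string{"a", "b", "c", "d", "e"}}
	r2 := &itemsRequest{items: []string{"f", "g", "h"}}
	chunks, err := r1.MergeSplit(context.Background(), maxSizeConfig{MaxSizeItems: 4}, r2)
	if err != nil {
		panic(err)
	}
	for i, c := range chunks {
		fmt.Printf("chunk %d: %d items\n", i, c.ItemsCount())
	}
	// Prints two chunks of 4 items each (8 merged items split at the cap of 4).
}

The greedy re-chunking here is simply the easiest way to honor the rule that the size of the last returned request must be less than or equal to the size of any other returned request; the signal-specific implementations in this series appear to follow a similar pattern, moving pdata items between requests instead of slice elements.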