diff --git a/collector/cmd/otelarrowcol/go.mod b/collector/cmd/otelarrowcol/go.mod index b061fbb0..e958cae0 100644 --- a/collector/cmd/otelarrowcol/go.mod +++ b/collector/cmd/otelarrowcol/go.mod @@ -170,14 +170,3 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - - - - - - - - - - - diff --git a/collector/cmd/otelarrowcol/go.sum b/collector/cmd/otelarrowcol/go.sum index 782f103f..315a9a29 100644 --- a/collector/cmd/otelarrowcol/go.sum +++ b/collector/cmd/otelarrowcol/go.sum @@ -695,8 +695,6 @@ github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0 h1:gYEMyJHTSczSIRbkiVYQDH1ScQxyQKNgXJG3WarmtOE= -github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0/go.mod h1:pXv7/nt9MWXKio5S2deXbgq0q8JEKvm8IWJTLpNolqQ= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index ff27ae85..75d9cc6b 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/semaphore" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" @@ -45,6 +46,7 @@ type batchProcessor struct { timeout time.Duration sendBatchSize int sendBatchMaxSize int + maxInFlightBytes int // batchFunc is a factory for new batch objects corresponding // with the appropriate signal. @@ -91,6 +93,7 @@ type shard struct { // newItem is used to receive data items from producers. newItem chan dataItem + sem *semaphore.Weighted // batch is an in-flight data item containing one of the // underlying data types. 
batch batch @@ -115,13 +118,15 @@ type dataItem struct { type batch interface { // export the current batch export(ctx context.Context, req any) error - splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, bytes int, req any) + splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, req any) // itemCount returns the size of the current batch itemCount() int // add item to the current batch add(item any) + + // sizeBytes returns the protobuf-encoded size of the data + sizeBytes(data any) int } // countedError is useful when a producer adds items that are split @@ -161,6 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), + maxInFlightBytes: int(cfg.MaxInFlightBytes), } if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} @@ -189,6 +195,7 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard { newItem: make(chan dataItem, runtime.NumCPU()), exportCtx: exportCtx, batch: bp.batchFunc(), + sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)), } b.processor.goroutines.Add(1) @@ -302,7 +309,8 @@ func (b *shard) resetTimer() { } func (b *shard) sendItems(trigger trigger) { - sent, bytes, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + bytes := int64(b.batch.sizeBytes(req)) var waiters []chan error var countItems []int @@ -344,7 +352,7 @@ func (b *shard) sendItems(trigger trigger) { if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) } else { - b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) + b.processor.telemetry.record(trigger, int64(sent), bytes) } }() @@ -354,8 +362,6 @@ func (b *shard) sendItems(trigger trigger) { func (b *shard) consumeAndWait(ctx context.Context, data any) error { respCh := make(chan error, 1) - // TODO: add a semaphore to only write to channel if sizeof(data) keeps - // us below some configured inflight byte limit. item := dataItem{ data: data, responseCh: respCh, } @@ -370,12 +376,42 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error { item.count = telem.LogRecordCount() } + bytes := int64(b.batch.sizeBytes(data)) + err := b.sem.Acquire(ctx, bytes) + if err != nil { + return err + } + + // The purpose of this deferred function is to ensure the semaphore + // releases all previously acquired bytes. + defer func() { + if item.count == 0 { + b.sem.Release(bytes) + return + } + + // The context may have timed out before we received all + // responses. Start a goroutine to wait for and release + // all acquired bytes after the parent thread returns.
+ go func() { + for newErr := range respCh { + unwrap := newErr.(countedError) + item.count -= unwrap.count + if item.count == 0 { + break + } + } + b.sem.Release(bytes) + }() + }() + select { case <-ctx.Done(): return ctx.Err() case b.newItem <- item: } - var err error for { select { @@ -506,6 +542,12 @@ func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) }, useOtel) } +func recoverError(retErr *error) { + if r := recover(); r != nil { + *retErr = fmt.Errorf("%v", r) + } +} + type batchTraces struct { nextConsumer consumer.Traces traceData ptrace.Traces @@ -530,15 +572,19 @@ func (bt *batchTraces) add(item any) { td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) } -func (bt *batchTraces) export(ctx context.Context, req any) error { +func (bt *batchTraces) sizeBytes(data any) int { + return bt.sizer.TracesSize(data.(ptrace.Traces)) +} + +func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) { + defer recoverError(&retErr) td := req.(ptrace.Traces) return bt.nextConsumer.ConsumeTraces(ctx, td) } -func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) { +func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) { var req ptrace.Traces var sent int - var bytes int if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize { req = splitTraces(sendBatchMaxSize, bt.traceData) bt.spanCount -= sendBatchMaxSize @@ -549,10 +595,7 @@ func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, ret bt.traceData = ptrace.NewTraces() bt.spanCount = 0 } - if returnBytes { - bytes = bt.sizer.TracesSize(req) - } - return sent, bytes, req + return sent, req } func (bt *batchTraces) itemCount() int { @@ -570,15 +613,19 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}} } -func (bm *batchMetrics) export(ctx context.Context, req any) error { +func (bm *batchMetrics) sizeBytes(data any) int { + return bm.sizer.MetricsSize(data.(pmetric.Metrics)) +} + +func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) { + defer recoverError(&retErr) md := req.(pmetric.Metrics) return bm.nextConsumer.ConsumeMetrics(ctx, md) } -func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) { +func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) { var req pmetric.Metrics var sent int - var bytes int if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { req = splitMetrics(sendBatchMaxSize, bm.metricData) bm.dataPointCount -= sendBatchMaxSize @@ -590,10 +637,7 @@ func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, re bm.dataPointCount = 0 } - if returnBytes { - bytes = bm.sizer.MetricsSize(req) - } - return sent, bytes, req + return sent, req } func (bm *batchMetrics) itemCount() int { @@ -622,15 +666,19 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}} } -func (bl *batchLogs) export(ctx context.Context, req any) error { +func (bl *batchLogs) sizeBytes(data any) int { + return bl.sizer.LogsSize(data.(plog.Logs)) +} + +func (bl *batchLogs) export(ctx
context.Context, req any) (retErr error) { + defer recoverError(&retErr) ld := req.(plog.Logs) return bl.nextConsumer.ConsumeLogs(ctx, ld) } -func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) { +func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) { var req plog.Logs var sent int - var bytes int if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize { req = splitLogs(sendBatchMaxSize, bl.logData) @@ -642,10 +690,7 @@ func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, retur bl.logData = plog.NewLogs() bl.logCount = 0 } - if returnBytes { - bytes = bl.sizer.LogsSize(req) - } - return sent, bytes, req + return sent, req } func (bl *batchLogs) itemCount() int { @@ -661,4 +706,4 @@ func (bl *batchLogs) add(item any) { } bl.logCount += newLogsCount ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) -} +} \ No newline at end of file diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 8ce40ffc..01ebdfcd 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -80,11 +80,250 @@ func TestProcessorLifecycle(t *testing.T) { } } +type panicConsumer struct{} + +func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + panic("testing panic") +} +func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + panic("testing panic") +} +func (pc *panicConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { + panic("testing panic") +} + +func (pc *panicConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func TestBatchProcessorSpansPanicRecover(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bp, err := newBatchTracesProcessor(creationSet, &panicConsumer{}, cfg, true) + + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 10 + spansPerRequest := 100 + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + // ConsumeTraces is a blocking function and should be run in a goroutine + // until the batch size is reached to unblock.
+ wg.Add(1) + go func() { + err := bp.ConsumeTraces(context.Background(), td) + assert.Contains(t, err.Error(), "testing panic") + wg.Done() + }() + } + + wg.Wait() + require.NoError(t, bp.Shutdown(context.Background())) +} + +func TestBatchProcessorMetricsPanicRecover(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true) + + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 10 + metricsPerRequest := 100 + sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + md := testdata.GenerateMetrics(metricsPerRequest) + metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ { + metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex)) + } + md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty()) + wg.Add(1) + go func() { + err := bp.ConsumeMetrics(context.Background(), md) + assert.Contains(t, err.Error(), "testing panic") + wg.Done() + }() + } + + wg.Wait() + require.NoError(t, bp.Shutdown(context.Background())) +} + +func TestBatchProcessorLogsPanicRecover(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg, true) + + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 10 + logsPerRequest := 100 + sentResourceLogs := plog.NewLogs().ResourceLogs() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + ld := testdata.GenerateLogs(logsPerRequest) + logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() + for logIndex := 0; logIndex < logsPerRequest; logIndex++ { + logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex)) + } + ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) + wg.Add(1) + go func() { + err := bp.ConsumeLogs(context.Background(), ld) + assert.Contains(t, err.Error(), "testing panic") + wg.Done() + }() + } + + wg.Wait() + require.NoError(t, bp.Shutdown(context.Background())) +} + +type blockingConsumer struct { + lock sync.Mutex + numItems int + blocking chan struct{} +} + +func (bc *blockingConsumer) getItemsWaiting() int { + bc.lock.Lock() + defer bc.lock.Unlock() + return bc.numItems +} + +func (bc *blockingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + bc.lock.Lock() + bc.numItems += td.SpanCount() + bc.lock.Unlock() + <-bc.blocking + return nil +} + +func (bc *blockingConsumer) unblock() { + bc.lock.Lock() + defer bc.lock.Unlock() + close(bc.blocking) +} + +func (bc *blockingConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// calculateMaxInFlightBytes determines a setting for cfg.MaxInFlightBytes +// based on the number of requests and the number of spans per request.
+func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 { + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(0, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + + szr := &ptrace.ProtoMarshaler{} + return uint32(szr.TracesSize(td) * numRequests) +} + +// This test confirms that the semaphore is still released +// if the client context is canceled. +func TestBatchProcessorCancelContext(t *testing.T) { + requestCount := 10 + spansPerRequest := 250 + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest) + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bc := &blockingConsumer{blocking: make(chan struct{}, 1)} + bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true) + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup + ctx, cancel := context.WithCancel(context.Background()) + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + // ConsumeTraces is a blocking function and should be run in a goroutine + // until the batch size is reached to unblock. + wg.Add(1) + go func() { + err := bp.ConsumeTraces(ctx, td) + assert.Contains(t, err.Error(), "context canceled") + wg.Done() + }() + } + + // check that all spans arrived in the blockingConsumer. + require.Eventually(t, func() bool { + numSpans := requestCount * spansPerRequest + return bc.getItemsWaiting() == numSpans + }, 5*time.Second, 10*time.Millisecond) + + // the semaphore should be fully acquired at this point. + assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1))) + + wg.Add(1) + go func() { + td := testdata.GenerateTraces(spansPerRequest) + err := bp.ConsumeTraces(ctx, td) + assert.Contains(t, err.Error(), "context canceled") + wg.Done() + }() + + // cancel the context and wait for ConsumeTraces to return. + cancel() + wg.Wait() + + // check that sending another request does not change the semaphore count, even after ConsumeTraces returns. + assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1))) + + // signal the blockingConsumer to return responses to the waiters. + bc.unblock() + + // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes.
+ require.Eventually(t, func() bool { + return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) + }, 5*time.Second, 10*time.Millisecond) + require.NoError(t, bp.Shutdown(context.Background())) +} + func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - cfg.Timeout = 5 * time.Second + cfg.Timeout = 10 * time.Second creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) @@ -288,9 +527,9 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, us require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.SpanCount()), - sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), + sendCount: float64(expectedBatchesNum), + sendSizeSum: float64(sink.SpanCount()), + sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), }) } @@ -343,8 +582,9 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, + Timeout: 3 * time.Second, + SendBatchSize: 1000, + MaxInFlightBytes: defaultMaxBytes, } sink := new(consumertest.TracesSink) @@ -377,8 +617,9 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -442,8 +683,9 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel // Instantiate the batch processor with low config values to test data // gets sent through the processor. 
cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, + Timeout: 2 * time.Second, + SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -505,7 +747,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { batchMetrics.add(md) require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) - sent, _, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize, true) + sent, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize, true) sendErr := batchMetrics.export(ctx, req) require.NoError(t, sendErr) require.Equal(t, sendBatchMaxSize, sent) @@ -515,8 +757,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 101, + Timeout: 100 * time.Millisecond, + SendBatchSize: 101, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 metricsPerRequest := 10 @@ -560,8 +803,9 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { func TestBatchMetricProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, + Timeout: 3 * time.Second, + SendBatchSize: 1000, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 metricsPerRequest := 10 @@ -652,8 +896,9 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) { func BenchmarkBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + MaxInFlightBytes: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) } @@ -661,9 +906,10 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { func BenchmarkMultiBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, - MetadataKeys: []string{"test", "test2"}, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + MetadataKeys: []string{"test", "test2"}, + MaxInFlightBytes: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) } @@ -711,8 +957,9 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -776,8 +1023,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo // Instantiate the batch processor with low config values to test data // gets sent through the processor. 
cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, + Timeout: 2 * time.Second, + SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -827,8 +1075,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 100, + Timeout: 3 * time.Second, + SendBatchSize: 100, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 logsPerRequest := 10 @@ -872,8 +1121,9 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { func TestBatchLogProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, + Timeout: 3 * time.Second, + SendBatchSize: 1000, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 logsPerRequest := 10 @@ -1127,7 +1377,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { }) wg.Add(1) - go func() { + go func() { err := batcher.ConsumeTraces(ctx, td) assert.ErrorIs(t, err, errTooManyBatchers) wg.Done() }() @@ -1140,7 +1390,9 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no maximum, just a pass through. - cfg := Config{} + cfg := Config{ + MaxInFlightBytes: defaultMaxBytes, + } require.NoError(t, cfg.Validate()) @@ -1182,6 +1434,7 @@ func TestBatchSplitOnly(t *testing.T) { cfg := Config{ SendBatchMaxSize: maxBatch, + MaxInFlightBytes: defaultMaxBytes, } require.NoError(t, cfg.Validate()) diff --git a/collector/processor/concurrentbatchprocessor/config.go b/collector/processor/concurrentbatchprocessor/config.go index 8b36e629..91c60ff7 100644 --- a/collector/processor/concurrentbatchprocessor/config.go +++ b/collector/processor/concurrentbatchprocessor/config.go @@ -44,6 +44,10 @@ type Config struct { // batcher instances that will be created through a distinct // combination of MetadataKeys. MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` + + // MaxInFlightBytes limits the number of bytes in the queue waiting to be + // processed by the senders.
+ MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"` } var _ component.Config = (*Config)(nil) diff --git a/collector/processor/concurrentbatchprocessor/config_test.go b/collector/processor/concurrentbatchprocessor/config_test.go index 5263fab3..eaed5e30 100644 --- a/collector/processor/concurrentbatchprocessor/config_test.go +++ b/collector/processor/concurrentbatchprocessor/config_test.go @@ -35,6 +35,7 @@ func TestUnmarshalConfig(t *testing.T) { SendBatchMaxSize: uint32(11000), Timeout: time.Second * 10, MetadataCardinalityLimit: 1000, + MaxInFlightBytes: 12345, }, cfg) } diff --git a/collector/processor/concurrentbatchprocessor/factory.go b/collector/processor/concurrentbatchprocessor/factory.go index 74b8b3ca..7fe7b255 100644 --- a/collector/processor/concurrentbatchprocessor/factory.go +++ b/collector/processor/concurrentbatchprocessor/factory.go @@ -18,6 +18,8 @@ const ( defaultSendBatchSize = uint32(8192) defaultTimeout = 200 * time.Millisecond + // defaultMaxBytes is the default in-flight byte limit, 2 MiB + defaultMaxBytes = 2 * 1048576 // defaultMetadataCardinalityLimit should be set to the number // of metadata configurations the user expects to submit to @@ -43,6 +45,7 @@ func createDefaultConfig() component.Config { return &Config{ SendBatchSize: defaultSendBatchSize, Timeout: defaultTimeout, + MaxInFlightBytes: defaultMaxBytes, MetadataCardinalityLimit: defaultMetadataCardinalityLimit, } } diff --git a/collector/processor/concurrentbatchprocessor/testdata/config.yaml b/collector/processor/concurrentbatchprocessor/testdata/config.yaml index a7316c8c..ec151d47 100644 --- a/collector/processor/concurrentbatchprocessor/testdata/config.yaml +++ b/collector/processor/concurrentbatchprocessor/testdata/config.yaml @@ -1,3 +1,4 @@ timeout: 10s send_batch_size: 10000 -send_batch_max_size: 11000 \ No newline at end of file +send_batch_max_size: 11000 +max_in_flight_bytes: 12345 \ No newline at end of file
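
Note on the in-flight limit above: `consumeAndWait` reserves the payload's protobuf size on a weighted semaphore before enqueuing, and the deferred goroutine guarantees the reservation is returned even when the producer's context is canceled before all responses arrive (the situation `TestBatchProcessorCancelContext` exercises). The following self-contained sketch illustrates the same acquire-before-enqueue, release-after-response pattern; the names `admit` and `done` are illustrative only, not part of this change:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

// admit mirrors the shape of shard.consumeAndWait: reserve payloadBytes
// before handing data to the batcher, and return the reservation only
// after the downstream response arrives.
func admit(ctx context.Context, sem *semaphore.Weighted, payloadBytes int64, done <-chan struct{}) error {
	// Blocks until capacity is free; a canceled context returns
	// ctx.Err() without leaking any reservation.
	if err := sem.Acquire(ctx, payloadBytes); err != nil {
		return err
	}
	// Release from a separate goroutine so the bytes are returned
	// even if the caller has already stopped waiting.
	go func() {
		<-done
		sem.Release(payloadBytes)
	}()
	return nil
}

func main() {
	sem := semaphore.NewWeighted(1024) // cf. max_in_flight_bytes
	done := make(chan struct{})

	fmt.Println(admit(context.Background(), sem, 512, done)) // <nil>
	fmt.Println(sem.TryAcquire(1024))                         // false: 512 bytes still in flight

	close(done)
	time.Sleep(10 * time.Millisecond) // let the release goroutine run
	fmt.Println(sem.TryAcquire(1024)) // true: full capacity restored
}
```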
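
Similarly, `recoverError` converts a panic in a downstream consumer into an ordinary error on the named return value, so one misbehaving consumer cannot crash the shared batching goroutine; waiters observe the panic message, as the `*PanicRecover` tests assert. A minimal sketch of the pattern, using a hypothetical `export` wrapper standing in for `batchTraces.export` and friends:

```go
package main

import (
	"context"
	"fmt"
)

// recoverError matches the helper added above: a deferred recover
// rewrites the named return value when the wrapped call panics.
func recoverError(retErr *error) {
	if r := recover(); r != nil {
		*retErr = fmt.Errorf("%v", r)
	}
}

// export is a hypothetical stand-in for the processor's export methods.
func export(ctx context.Context, consume func(context.Context) error) (retErr error) {
	defer recoverError(&retErr)
	return consume(ctx)
}

func main() {
	err := export(context.Background(), func(context.Context) error {
		panic("testing panic")
	})
	fmt.Println(err) // testing panic
}
```

Note the recover covers only the consumer call itself; errors returned normally still flow back to waiters through `respCh` as `countedError` values.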