From 4e236cec0905106df2d291491e74db7500099560 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 9 Nov 2023 16:50:44 -0500 Subject: [PATCH 01/15] add config, semaphore, fix unit tests --- collector/cmd/otelarrowcol/go.mod | 15 ++++ collector/cmd/otelarrowcol/go.sum | 2 - .../examples/generator/hipster_shop.yaml | 10 +-- .../batch_processor.go | 77 +++++++++++++------ .../batch_processor_test.go | 20 ++++- .../concurrentbatchprocessor/config.go | 4 + .../concurrentbatchprocessor/config_test.go | 1 + .../concurrentbatchprocessor/factory.go | 3 + .../testdata/config.yaml | 3 +- 9 files changed, 102 insertions(+), 33 deletions(-) diff --git a/collector/cmd/otelarrowcol/go.mod b/collector/cmd/otelarrowcol/go.mod index b061fbb0..23477e62 100644 --- a/collector/cmd/otelarrowcol/go.mod +++ b/collector/cmd/otelarrowcol/go.mod @@ -4,7 +4,11 @@ module github.com/open-telemetry/otel-arrow/collector/cmd/otelarrowcol go 1.21 +<<<<<<< HEAD toolchain go1.21.3 +======= +toolchain go1.21.4 +>>>>>>> e367aca (add config, semaphore, fix unit tests) require ( github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0 @@ -171,6 +175,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) +<<<<<<< HEAD @@ -181,3 +186,13 @@ require ( +======= +replace github.com/open-telemetry/otel-arrow => ../../.. + +replace github.com/open-telemetry/otel-arrow/collector => ../../ + +// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules +replace cloud.google.com/go => cloud.google.com/go v0.110.2 + +replace github.com/lightstep/telemetry-generator/generatorreceiver => ../../../../telemetry-generator/generatorreceiver/ +>>>>>>> e367aca (add config, semaphore, fix unit tests) diff --git a/collector/cmd/otelarrowcol/go.sum b/collector/cmd/otelarrowcol/go.sum index 782f103f..315a9a29 100644 --- a/collector/cmd/otelarrowcol/go.sum +++ b/collector/cmd/otelarrowcol/go.sum @@ -695,8 +695,6 @@ github.com/labstack/gommon v0.3.1/go.mod h1:uW6kP17uPlLJsD3ijUYn3/M5bAxtlZhMI6m3 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0 h1:gYEMyJHTSczSIRbkiVYQDH1ScQxyQKNgXJG3WarmtOE= -github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0/go.mod h1:pXv7/nt9MWXKio5S2deXbgq0q8JEKvm8IWJTLpNolqQ= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/collector/examples/generator/hipster_shop.yaml b/collector/examples/generator/hipster_shop.yaml index a4872db6..ee106603 100644 --- a/collector/examples/generator/hipster_shop.yaml +++ b/collector/examples/generator/hipster_shop.yaml @@ -566,19 +566,19 @@ rootRoutes: tracesPerHour: 1400 - service: frontend route: /shipping - tracesPerHour: 480 + tracesPerHour: 4800 - service: frontend route: /currency - tracesPerHour: 200 + tracesPerHour: 2000 - service: frontend route: /checkout - tracesPerHour: 480 + tracesPerHour: 4800 - service: iOS route: /api/make-payment - tracesPerHour: 480 + 
tracesPerHour: 4800 - service: android route: /api/make-payment - tracesPerHour: 480 + tracesPerHour: 4800 config: kubernetes: diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index ff27ae85..9350cbb9 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.uber.org/multierr" "go.uber.org/zap" + "golang.org/x/sync/semaphore" "go.opentelemetry.io/collector/client" "go.opentelemetry.io/collector/component" @@ -45,6 +46,7 @@ type batchProcessor struct { timeout time.Duration sendBatchSize int sendBatchMaxSize int + maxInFlightBytes int // batchFunc is a factory for new batch objects corresponding // with the appropriate signal. @@ -91,6 +93,7 @@ type shard struct { // newItem is used to receive data items from producers. newItem chan dataItem + sem *semaphore.Weighted // batch is an in-flight data item containing one of the // underlying data types. batch batch @@ -115,13 +118,15 @@ type dataItem struct { type batch interface { // export the current batch export(ctx context.Context, req any) error - splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, bytes int, req any) + splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (sentBatchSize int, req any) // itemCount returns the size of the current batch itemCount() int // add item to the current batch add(item any) + + sizeBytes(data any) int } // countedError is useful when a producer adds items that are split @@ -161,6 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func shutdownC: make(chan struct{}, 1), metadataKeys: mks, metadataLimit: int(cfg.MetadataCardinalityLimit), + maxInFlightBytes: int(cfg.MaxInFlightBytes), } if len(bp.metadataKeys) == 0 { bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)} @@ -189,6 +195,7 @@ func (bp *batchProcessor) newShard(md map[string][]string) *shard { newItem: make(chan dataItem, runtime.NumCPU()), exportCtx: exportCtx, batch: bp.batchFunc(), + sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)), } b.processor.goroutines.Add(1) @@ -302,7 +309,8 @@ func (b *shard) resetTimer() { } func (b *shard) sendItems(trigger trigger) { - sent, bytes, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + sent, req := b.batch.splitBatch(b.exportCtx, b.processor.sendBatchMaxSize, b.processor.telemetry.detailed) + bytes := int64(b.batch.sizeBytes(req)) var waiters []chan error var countItems []int @@ -344,7 +352,7 @@ func (b *shard) sendItems(trigger trigger) { if err != nil { b.processor.logger.Warn("Sender failed", zap.Error(err)) } else { - b.processor.telemetry.record(trigger, int64(sent), int64(bytes)) + b.processor.telemetry.record(trigger, int64(sent), bytes) } }() @@ -370,12 +378,37 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error { item.count = telem.LogRecordCount() } + bytes := int64(b.batch.sizeBytes(data)) + err := b.sem.Acquire(b.exportCtx, bytes) + if err != nil { + return err + } + + defer func() { + if item.count == 0 { + b.sem.Release(bytes) + return + } + + go func() { + for newErr := range respCh { + unwrap := newErr.(countedError) + + item.count -= unwrap.count + if item.count != 0 { + continue + } + break + } + b.sem.Release(bytes) + }() + }() + select { case <-ctx.Done(): 
return ctx.Err() case b.newItem <- item: } - var err error for { select { @@ -530,15 +563,18 @@ func (bt *batchTraces) add(item any) { td.ResourceSpans().MoveAndAppendTo(bt.traceData.ResourceSpans()) } +func (bt *batchTraces) sizeBytes(data any) int { + return bt.sizer.TracesSize(data.(ptrace.Traces)) +} + func (bt *batchTraces) export(ctx context.Context, req any) error { td := req.(ptrace.Traces) return bt.nextConsumer.ConsumeTraces(ctx, td) } -func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) { +func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) { var req ptrace.Traces var sent int - var bytes int if sendBatchMaxSize > 0 && bt.itemCount() > sendBatchMaxSize { req = splitTraces(sendBatchMaxSize, bt.traceData) bt.spanCount -= sendBatchMaxSize @@ -549,10 +585,7 @@ func (bt *batchTraces) splitBatch(ctx context.Context, sendBatchMaxSize int, ret bt.traceData = ptrace.NewTraces() bt.spanCount = 0 } - if returnBytes { - bytes = bt.sizer.TracesSize(req) - } - return sent, bytes, req + return sent, req } func (bt *batchTraces) itemCount() int { @@ -570,15 +603,18 @@ func newBatchMetrics(nextConsumer consumer.Metrics) *batchMetrics { return &batchMetrics{nextConsumer: nextConsumer, metricData: pmetric.NewMetrics(), sizer: &pmetric.ProtoMarshaler{}} } +func (bm *batchMetrics) sizeBytes(data any) int { + return bm.sizer.MetricsSize(data.(pmetric.Metrics)) +} + func (bm *batchMetrics) export(ctx context.Context, req any) error { md := req.(pmetric.Metrics) return bm.nextConsumer.ConsumeMetrics(ctx, md) } -func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) { +func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) { var req pmetric.Metrics var sent int - var bytes int if sendBatchMaxSize > 0 && bm.dataPointCount > sendBatchMaxSize { req = splitMetrics(sendBatchMaxSize, bm.metricData) bm.dataPointCount -= sendBatchMaxSize @@ -590,10 +626,7 @@ func (bm *batchMetrics) splitBatch(ctx context.Context, sendBatchMaxSize int, re bm.dataPointCount = 0 } - if returnBytes { - bytes = bm.sizer.MetricsSize(req) - } - return sent, bytes, req + return sent, req } func (bm *batchMetrics) itemCount() int { @@ -622,15 +655,18 @@ func newBatchLogs(nextConsumer consumer.Logs) *batchLogs { return &batchLogs{nextConsumer: nextConsumer, logData: plog.NewLogs(), sizer: &plog.ProtoMarshaler{}} } +func (bl *batchLogs) sizeBytes(data any) int { + return bl.sizer.LogsSize(data.(plog.Logs)) +} + func (bl *batchLogs) export(ctx context.Context, req any) error { ld := req.(plog.Logs) return bl.nextConsumer.ConsumeLogs(ctx, ld) } -func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, int, any) { +func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, returnBytes bool) (int, any) { var req plog.Logs var sent int - var bytes int if sendBatchMaxSize > 0 && bl.logCount > sendBatchMaxSize { req = splitLogs(sendBatchMaxSize, bl.logData) @@ -642,10 +678,7 @@ func (bl *batchLogs) splitBatch(ctx context.Context, sendBatchMaxSize int, retur bl.logData = plog.NewLogs() bl.logCount = 0 } - if returnBytes { - bytes = bl.sizer.LogsSize(req) - } - return sent, bytes, req + return sent, req } func (bl *batchLogs) itemCount() int { diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go 
b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 8ce40ffc..f944f99d 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -84,7 +84,7 @@ func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 - cfg.Timeout = 5 * time.Second + cfg.Timeout = 10 * time.Second creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed batcher, err := newBatchTracesProcessor(creationSet, sink, cfg, true) @@ -345,6 +345,7 @@ func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, + MaxInFlightBytes: defaultMaxBytes, } sink := new(consumertest.TracesSink) @@ -379,6 +380,7 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { cfg := Config{ Timeout: 200 * time.Millisecond, SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -444,6 +446,7 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel cfg := Config{ Timeout: 2 * time.Second, SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -505,7 +508,7 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { batchMetrics.add(md) require.Equal(t, dataPointsPerMetric*metricsCount, batchMetrics.dataPointCount) - sent, _, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize, true) + sent, req := batchMetrics.splitBatch(ctx, sendBatchMaxSize, true) sendErr := batchMetrics.export(ctx, req) require.NoError(t, sendErr) require.Equal(t, sendBatchMaxSize, sent) @@ -517,6 +520,7 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 101, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 metricsPerRequest := 10 @@ -562,6 +566,7 @@ func TestBatchMetricProcessor_Shutdown(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 metricsPerRequest := 10 @@ -654,6 +659,7 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { cfg := Config{ Timeout: 100 * time.Millisecond, SendBatchSize: 2000, + MaxInFlightBytes: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) } @@ -664,6 +670,7 @@ func BenchmarkMultiBatchMetricProcessor(b *testing.B) { Timeout: 100 * time.Millisecond, SendBatchSize: 2000, MetadataKeys: []string{"test", "test2"}, + MaxInFlightBytes: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) } @@ -713,6 +720,7 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { cfg := Config{ Timeout: 200 * time.Millisecond, SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -778,6 +786,7 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo cfg := Config{ Timeout: 2 * time.Second, SendBatchSize: 50, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 100 @@ -829,6 +838,7 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 100, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 logsPerRequest := 10 @@ -874,6 +884,7 @@ func TestBatchLogProcessor_Shutdown(t *testing.T) { cfg := Config{ Timeout: 3 * time.Second, SendBatchSize: 1000, + MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 logsPerRequest := 10 @@ -1140,7 +1151,9 @@ func 
TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { func TestBatchZeroConfig(t *testing.T) { // This is a no-op configuration. No need for a timer, no // minimum, no mxaimum, just a pass through. - cfg := Config{} + cfg := Config{ + MaxInFlightBytes: defaultMaxBytes, + } require.NoError(t, cfg.Validate()) @@ -1182,6 +1195,7 @@ func TestBatchSplitOnly(t *testing.T) { cfg := Config{ SendBatchMaxSize: maxBatch, + MaxInFlightBytes: defaultMaxBytes, } require.NoError(t, cfg.Validate()) diff --git a/collector/processor/concurrentbatchprocessor/config.go b/collector/processor/concurrentbatchprocessor/config.go index 8b36e629..91c60ff7 100644 --- a/collector/processor/concurrentbatchprocessor/config.go +++ b/collector/processor/concurrentbatchprocessor/config.go @@ -44,6 +44,10 @@ type Config struct { // batcher instances that will be created through a distinct // combination of MetadataKeys. MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"` + + // MaxInFlightBytes limits the number of bytes in queue waiting to be + // processed by the senders. + MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"` } var _ component.Config = (*Config)(nil) diff --git a/collector/processor/concurrentbatchprocessor/config_test.go b/collector/processor/concurrentbatchprocessor/config_test.go index 5263fab3..eaed5e30 100644 --- a/collector/processor/concurrentbatchprocessor/config_test.go +++ b/collector/processor/concurrentbatchprocessor/config_test.go @@ -35,6 +35,7 @@ func TestUnmarshalConfig(t *testing.T) { SendBatchMaxSize: uint32(11000), Timeout: time.Second * 10, MetadataCardinalityLimit: 1000, + MaxInFlightBytes: 12345, }, cfg) } diff --git a/collector/processor/concurrentbatchprocessor/factory.go b/collector/processor/concurrentbatchprocessor/factory.go index 74b8b3ca..7fe7b255 100644 --- a/collector/processor/concurrentbatchprocessor/factory.go +++ b/collector/processor/concurrentbatchprocessor/factory.go @@ -18,6 +18,8 @@ const ( defaultSendBatchSize = uint32(8192) defaultTimeout = 200 * time.Millisecond + // default inflight bytes is 2 MiB + defaultMaxBytes = 2 * 1048576 // defaultMetadataCardinalityLimit should be set to the number // of metadata configurations the user expects to submit to @@ -43,6 +45,7 @@ func createDefaultConfig() component.Config { return &Config{ SendBatchSize: defaultSendBatchSize, Timeout: defaultTimeout, + MaxInFlightBytes: defaultMaxBytes, MetadataCardinalityLimit: defaultMetadataCardinalityLimit, } } diff --git a/collector/processor/concurrentbatchprocessor/testdata/config.yaml b/collector/processor/concurrentbatchprocessor/testdata/config.yaml index a7316c8c..ec151d47 100644 --- a/collector/processor/concurrentbatchprocessor/testdata/config.yaml +++ b/collector/processor/concurrentbatchprocessor/testdata/config.yaml @@ -1,3 +1,4 @@ timeout: 10s send_batch_size: 10000 -send_batch_max_size: 11000 \ No newline at end of file +send_batch_max_size: 11000 +max_in_flight_bytes: 12345 \ No newline at end of file From 890e8c8aac0ae431d50244d150c37e906e87b272 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 15 Nov 2023 13:20:29 -0500 Subject: [PATCH 02/15] delete comment --- .../processor/concurrentbatchprocessor/batch_processor.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 9350cbb9..77b0aa03 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ 
b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -362,8 +362,6 @@ func (b *shard) sendItems(trigger trigger) { func (b *shard) consumeAndWait(ctx context.Context, data any) error { respCh := make(chan error, 1) - // TODO: add a semaphore to only write to channel if sizeof(data) keeps - // us below some configured inflight byte limit. item := dataItem{ data: data, responseCh: respCh, @@ -379,7 +377,7 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error { } bytes := int64(b.batch.sizeBytes(data)) - err := b.sem.Acquire(b.exportCtx, bytes) + err := b.sem.Acquire(ctx, bytes) if err != nil { return err } From ebd7d0bb96bb886b293711a904f88c015bd24392 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 15 Nov 2023 13:27:14 -0500 Subject: [PATCH 03/15] fix rebase --- collector/cmd/otelarrowcol/go.mod | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/collector/cmd/otelarrowcol/go.mod b/collector/cmd/otelarrowcol/go.mod index 23477e62..1703961e 100644 --- a/collector/cmd/otelarrowcol/go.mod +++ b/collector/cmd/otelarrowcol/go.mod @@ -4,11 +4,7 @@ module github.com/open-telemetry/otel-arrow/collector/cmd/otelarrowcol go 1.21 -<<<<<<< HEAD toolchain go1.21.3 -======= -toolchain go1.21.4 ->>>>>>> e367aca (add config, semaphore, fix unit tests) require ( github.com/lightstep/telemetry-generator/generatorreceiver v0.15.0 @@ -175,18 +171,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect ) -<<<<<<< HEAD - - - - - - - - - - -======= replace github.com/open-telemetry/otel-arrow => ../../.. replace github.com/open-telemetry/otel-arrow/collector => ../../ @@ -195,4 +179,3 @@ replace github.com/open-telemetry/otel-arrow/collector => ../../ replace cloud.google.com/go => cloud.google.com/go v0.110.2 replace github.com/lightstep/telemetry-generator/generatorreceiver => ../../../../telemetry-generator/generatorreceiver/ ->>>>>>> e367aca (add config, semaphore, fix unit tests) From 677c45f94fac7c84e3827912316851e9e40de518 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 15 Nov 2023 17:10:37 -0500 Subject: [PATCH 04/15] batch_processor.go --- .../batch_processor_test.go | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index f944f99d..f8ad5f4a 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -80,6 +80,126 @@ func TestProcessorLifecycle(t *testing.T) { } } +type panicConsumer struct { +} + +func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + panic("testing panic") + return nil +} +func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error { + panic("testing panic") + return nil +} + +func (pc *panicConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +func TestBatchProcessorSpansPanicRecover(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bp, err := newBatchTracesProcessor(creationSet, &panicConsumer{}, cfg, true) + + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 
10 + spansPerRequest := 100 + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + // ConsumeTraces is a blocking function and should be run in a go routine + // until batch size reached to unblock. + wg.Add(1) + go func() { + err = bp.ConsumeTraces(context.Background(), td) + assert.Contains(t, err.Error(), "testing panic") + wg.Done() + }() + } + + wg.Wait() + require.NoError(t, bp.Shutdown(context.Background())) +} + +func TestBatchProcessorMetricsPanicRecover(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true) + + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 10 + metricsPerRequest := 100 + sentResourceMetrics := pmetric.NewMetrics().ResourceMetrics() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + md := testdata.GenerateMetrics(metricsPerRequest) + metrics := md.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + for metricIndex := 0; metricIndex < metricsPerRequest; metricIndex++ { + metrics.At(metricIndex).SetName(getTestMetricName(requestNum, metricIndex)) + } + md.ResourceMetrics().At(0).CopyTo(sentResourceMetrics.AppendEmpty()) + wg.Add(1) + go func() { + err = bp.ConsumeMetrics(context.Background(), md) + assert.Contains(t, err.Error(), "testing panic") + wg.Done() + }() + } + + wg.Wait() + require.NoError(t, bp.Shutdown(context.Background())) +} + +func TestBatchProcessorMetricsPanicRecover(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true) + + require.NoError(t, err) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 10 + logsPerRequest := 100 + sentResourceLogs := plog.NewLogs().ResourceLogs() + var wg sync.WaitGroup + for requestNum := 0; requestNum < requestCount; requestNum++ { + ld := testdata.GenerateLogs(logsPerRequest) + logs := ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords() + for logIndex := 0; logIndex < logsPerRequest; logIndex++ { + logs.At(logIndex).SetSeverityText(getTestLogSeverityText(requestNum, logIndex)) + } + ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) + wg.Add(1) + go func() { + err = batcher.ConsumeLogs(context.Background(), ld) + assert.Contains(t, err.Error(), "testing panic") + wg.Done() + }() + } + + wg.Wait() + require.NoError(t, bp.Shutdown(context.Background())) +} + func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) From 55c1ee7dc34cfb51061de18c9146938623fb1037 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Wed, 15 Nov 2023 17:37:26 -0500 Subject: [PATCH 
05/15] fix tests --- collector/cmd/otelarrowcol/go.mod | 9 -------- .../batch_processor.go | 23 +++++++++++++++---- .../batch_processor_test.go | 14 +++++++---- 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/collector/cmd/otelarrowcol/go.mod b/collector/cmd/otelarrowcol/go.mod index 1703961e..e958cae0 100644 --- a/collector/cmd/otelarrowcol/go.mod +++ b/collector/cmd/otelarrowcol/go.mod @@ -170,12 +170,3 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) - -replace github.com/open-telemetry/otel-arrow => ../../.. - -replace github.com/open-telemetry/otel-arrow/collector => ../../ - -// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules -replace cloud.google.com/go => cloud.google.com/go v0.110.2 - -replace github.com/lightstep/telemetry-generator/generatorreceiver => ../../../../telemetry-generator/generatorreceiver/ diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index 77b0aa03..e21a2a47 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -565,7 +565,12 @@ func (bt *batchTraces) sizeBytes(data any) int { return bt.sizer.TracesSize(data.(ptrace.Traces)) } -func (bt *batchTraces) export(ctx context.Context, req any) error { +func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) { + defer func() { + if r := recover(); r != nil { + retErr = errors.New(r.(string)) + } + }() td := req.(ptrace.Traces) return bt.nextConsumer.ConsumeTraces(ctx, td) } @@ -605,7 +610,12 @@ func (bm *batchMetrics) sizeBytes(data any) int { return bm.sizer.MetricsSize(data.(pmetric.Metrics)) } -func (bm *batchMetrics) export(ctx context.Context, req any) error { +func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) { + defer func() { + if r := recover(); r != nil { + retErr = errors.New(r.(string)) + } + }() md := req.(pmetric.Metrics) return bm.nextConsumer.ConsumeMetrics(ctx, md) } @@ -657,7 +667,12 @@ func (bl *batchLogs) sizeBytes(data any) int { return bl.sizer.LogsSize(data.(plog.Logs)) } -func (bl *batchLogs) export(ctx context.Context, req any) error { +func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) { + defer func() { + if r := recover(); r != nil { + retErr = errors.New(r.(string)) + } + }() ld := req.(plog.Logs) return bl.nextConsumer.ConsumeLogs(ctx, ld) } @@ -692,4 +707,4 @@ func (bl *batchLogs) add(item any) { } bl.logCount += newLogsCount ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs()) -} +} \ No newline at end of file diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index f8ad5f4a..625b3476 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -87,7 +87,11 @@ func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) er panic("testing panic") return nil } -func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error { +func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + panic("testing panic") + return nil +} +func (pc *panicConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error { panic("testing panic") return nil } @@ 
-166,13 +170,13 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) { require.NoError(t, bp.Shutdown(context.Background())) } -func TestBatchProcessorMetricsPanicRecover(t *testing.T) { +func TestBatchProcessorLogsPanicRecover(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.Timeout = 10 * time.Second creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true) + bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg, true) require.NoError(t, err) require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) @@ -190,7 +194,7 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) { ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty()) wg.Add(1) go func() { - err = batcher.ConsumeLogs(context.Background(), ld) + err = bp.ConsumeLogs(context.Background(), ld) assert.Contains(t, err.Error(), "testing panic") wg.Done() }() @@ -1343,4 +1347,4 @@ func TestBatchSplitOnly(t *testing.T) { for _, ld := range receivedMds { require.Equal(t, maxBatch, ld.LogRecordCount()) } -} +} \ No newline at end of file From 92088d54b5b8956b316478e5b73d7740fb44b39f Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 16 Nov 2023 01:13:13 -0500 Subject: [PATCH 06/15] review feedback --- .../batch_processor.go | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor.go b/collector/processor/concurrentbatchprocessor/batch_processor.go index e21a2a47..75d9cc6b 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor.go @@ -382,12 +382,17 @@ func (b *shard) consumeAndWait(ctx context.Context, data any) error { return err } + // The purpose of this function is to ensure semaphore + // releases all previously acquired bytes defer func() { if item.count == 0 { b.sem.Release(bytes) return } + // context may have timed out before we received all + // responses. Start goroutine to wait and release + // all acquired bytes after the parent thread returns. 
go func() { for newErr := range respCh { unwrap := newErr.(countedError) @@ -537,6 +542,12 @@ func newBatchLogsProcessor(set processor.CreateSettings, next consumer.Logs, cfg return newBatchProcessor(set, cfg, func() batch { return newBatchLogs(next) }, useOtel) } +func recoverError(retErr *error) { + if r := recover(); r != nil { + *retErr = fmt.Errorf("%v", r) + } +} + type batchTraces struct { nextConsumer consumer.Traces traceData ptrace.Traces @@ -566,11 +577,7 @@ func (bt *batchTraces) sizeBytes(data any) int { } func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) { - defer func() { - if r := recover(); r != nil { - retErr = errors.New(r.(string)) - } - }() + defer recoverError(&retErr) td := req.(ptrace.Traces) return bt.nextConsumer.ConsumeTraces(ctx, td) } @@ -611,11 +618,7 @@ func (bm *batchMetrics) sizeBytes(data any) int { } func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) { - defer func() { - if r := recover(); r != nil { - retErr = errors.New(r.(string)) - } - }() + defer recoverError(&retErr) md := req.(pmetric.Metrics) return bm.nextConsumer.ConsumeMetrics(ctx, md) } @@ -668,11 +671,7 @@ func (bl *batchLogs) sizeBytes(data any) int { } func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) { - defer func() { - if r := recover(); r != nil { - retErr = errors.New(r.(string)) - } - }() + defer recoverError(&retErr) ld := req.(plog.Logs) return bl.nextConsumer.ConsumeLogs(ctx, ld) } From a7bc2c6d94bf67070ec15c0df91c926c2eb8f5c2 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 16 Nov 2023 01:20:23 -0500 Subject: [PATCH 07/15] revert yaml --- collector/examples/generator/hipster_shop.yaml | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/collector/examples/generator/hipster_shop.yaml b/collector/examples/generator/hipster_shop.yaml index ee106603..a4872db6 100644 --- a/collector/examples/generator/hipster_shop.yaml +++ b/collector/examples/generator/hipster_shop.yaml @@ -566,19 +566,19 @@ rootRoutes: tracesPerHour: 1400 - service: frontend route: /shipping - tracesPerHour: 4800 + tracesPerHour: 480 - service: frontend route: /currency - tracesPerHour: 2000 + tracesPerHour: 200 - service: frontend route: /checkout - tracesPerHour: 4800 + tracesPerHour: 480 - service: iOS route: /api/make-payment - tracesPerHour: 4800 + tracesPerHour: 480 - service: android route: /api/make-payment - tracesPerHour: 4800 + tracesPerHour: 480 config: kubernetes: From 3c6040340f901da0ed8e89c602d8eadf3f51554d Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 16 Nov 2023 17:31:06 -0500 Subject: [PATCH 08/15] add test for context cancellation --- .../batch_processor_test.go | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 625b3476..678d0222 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -8,12 +8,14 @@ import ( "errors" "fmt" "math" + "runtime" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/semaphore" "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata" "go.opentelemetry.io/collector/client" @@ -204,6 +206,106 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) { require.NoError(t, 
bp.Shutdown(context.Background())) } +func (b *shard) blockStart(done chan int) { + var items []dataItem + for { + select { + case item := <-b.newItem: + if item.data == nil { + continue + } + items = append(items, item) + case <-done: + for i := range items { + b.processItem(items[i]) + + } + return + } + } +} + +func newBlockingBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error, chan int) { + bp := &batchProcessor{ + logger: set.Logger, + + sendBatchSize: int(cfg.SendBatchSize), + sendBatchMaxSize: int(cfg.SendBatchMaxSize), + timeout: cfg.Timeout, + batchFunc: batchFunc, + shutdownC: make(chan struct{}, 1), + maxInFlightBytes: int(cfg.MaxInFlightBytes), + } + exportCtx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(nil), + }) + b := &shard{ + processor: bp, + newItem: make(chan dataItem, runtime.NumCPU()), + exportCtx: exportCtx, + batch: bp.batchFunc(), + sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)), + } + + b.processor.goroutines.Add(1) + done := make(chan int, 1) + go b.blockStart(done) + + bp.batcher = &singleShardBatcher{batcher: b} + + bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel) + if err != nil { + return nil, fmt.Errorf("error creating batch processor telemetry: %w", err), done + } + bp.telemetry = bpt + + return bp, nil, done +} + +func TestBatchProcessorCancelContext(t *testing.T) { + sink := new(consumertest.TracesSink) + cfg := createDefaultConfig().(*Config) + cfg.SendBatchSize = 128 + cfg.Timeout = 10 * time.Second + creationSet := processortest.NewNopCreateSettings() + creationSet.MetricsLevel = configtelemetry.LevelDetailed + batcher, err, doneCh := newBlockingBatchProcessor(creationSet, cfg, func() batch { return newBatchTraces(sink) }, true) + require.NoError(t, err) + require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + + requestCount := 10 + spansPerRequest := 250 + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + var wg sync.WaitGroup + ctxTimeout, _ := context.WithTimeout(context.Background(), time.Second*1) + for requestNum := 0; requestNum < requestCount; requestNum++ { + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(requestNum, spanIndex)) + } + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) + // ConsumeTraces is a blocking function and should be run in a go routine + // until batch size reached to unblock. + wg.Add(1) + go func() { + assert.Error(t, batcher.ConsumeTraces(ctxTimeout, td)) + wg.Done() + }() + } + + wg.Wait() + + // signal to the sender to process and send records. + doneCh <- 1 + + + // Semaphore should be released once all requests are sent. Confirm we can acquire MaxInFlightBytes bytes. 
+ require.Eventually(t, func() bool { + return batcher.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) + }, 5 * time.Second, 10 * time.Millisecond) +} + func TestBatchProcessorSpansDelivered(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) From 09f1fbbdd00227138e4f7fccb606c576ab3cba8b Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 16 Nov 2023 17:39:11 -0500 Subject: [PATCH 09/15] add comment --- .../concurrentbatchprocessor/batch_processor_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 678d0222..043b8ed2 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -262,6 +262,8 @@ func newBlockingBatchProcessor(set processor.CreateSettings, cfg *Config, batchF return bp, nil, done } +// This test is meant to confirm that semaphore is still +// released if the client context is canceled. func TestBatchProcessorCancelContext(t *testing.T) { sink := new(consumertest.TracesSink) cfg := createDefaultConfig().(*Config) @@ -289,11 +291,13 @@ func TestBatchProcessorCancelContext(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { - assert.Error(t, batcher.ConsumeTraces(ctxTimeout, td)) + err = batcher.ConsumeTraces(ctxTimeout, td) + assert.Contains(t, err.Error(), "context deadline exceeded") wg.Done() }() } + // wait until context deadline is exceeded. wg.Wait() // signal to the sender to process and send records. From ed5c906f87f53d862bd4b5aeacf9b6c8ee8b4c55 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Thu, 16 Nov 2023 18:37:35 -0500 Subject: [PATCH 10/15] change fn signature --- .../concurrentbatchprocessor/batch_processor_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 043b8ed2..a73c14a3 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -206,7 +206,7 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) { require.NoError(t, bp.Shutdown(context.Background())) } -func (b *shard) blockStart(done chan int) { +func blockStart(b *shard, done chan int) { var items []dataItem for { select { @@ -249,7 +249,7 @@ func newBlockingBatchProcessor(set processor.CreateSettings, cfg *Config, batchF b.processor.goroutines.Add(1) done := make(chan int, 1) - go b.blockStart(done) + go blockStart(b, done) bp.batcher = &singleShardBatcher{batcher: b} From b4ae2fa93cbc72d004a9a6975a04685125e8ac6d Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Fri, 17 Nov 2023 03:21:06 -0500 Subject: [PATCH 11/15] improve unit test add blockingConsumer --- .../batch_processor_test.go | 122 +++++++++--------- 1 file changed, 60 insertions(+), 62 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index a73c14a3..78f495be 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -8,14 +8,12 @@ import ( "errors" "fmt" "math" - "runtime" "sync" 
"testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/semaphore" "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/testdata" "go.opentelemetry.io/collector/client" @@ -206,80 +204,60 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) { require.NoError(t, bp.Shutdown(context.Background())) } -func blockStart(b *shard, done chan int) { - var items []dataItem - for { - select { - case item := <-b.newItem: - if item.data == nil { - continue - } - items = append(items, item) - case <-done: - for i := range items { - b.processItem(items[i]) - - } - return - } - } +type blockingConsumer struct { + numItems int + wg sync.WaitGroup } -func newBlockingBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func() batch, useOtel bool) (*batchProcessor, error, chan int) { - bp := &batchProcessor{ - logger: set.Logger, - - sendBatchSize: int(cfg.SendBatchSize), - sendBatchMaxSize: int(cfg.SendBatchMaxSize), - timeout: cfg.Timeout, - batchFunc: batchFunc, - shutdownC: make(chan struct{}, 1), - maxInFlightBytes: int(cfg.MaxInFlightBytes), - } - exportCtx := client.NewContext(context.Background(), client.Info{ - Metadata: client.NewMetadata(nil), - }) - b := &shard{ - processor: bp, - newItem: make(chan dataItem, runtime.NumCPU()), - exportCtx: exportCtx, - batch: bp.batchFunc(), - sem: semaphore.NewWeighted(int64(bp.maxInFlightBytes)), - } +func (bc *blockingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + bc.numItems += td.SpanCount() + bc.wg.Wait() + return nil +} - b.processor.goroutines.Add(1) - done := make(chan int, 1) - go blockStart(b, done) +func (bc *blockingConsumer) unblock() { + bc.wg.Done() +} - bp.batcher = &singleShardBatcher{batcher: b} +func (bc *blockingConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} - bpt, err := newBatchProcessorTelemetry(set, bp.batcher.currentMetadataCardinality, useOtel) - if err != nil { - return nil, fmt.Errorf("error creating batch processor telemetry: %w", err), done +// helper function to help determine a setting for cfg.MaxInFlightBytes based +// on the number of requests and number of spans per request. +func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 { + sentResourceSpans := ptrace.NewTraces().ResourceSpans() + td := testdata.GenerateTraces(spansPerRequest) + spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() + for spanIndex := 0; spanIndex < spansPerRequest; spanIndex++ { + spans.At(spanIndex).SetName(getTestSpanName(0, spanIndex)) } - bp.telemetry = bpt + td.ResourceSpans().At(0).CopyTo(sentResourceSpans.AppendEmpty()) - return bp, nil, done + szr := &ptrace.ProtoMarshaler{} + return uint32(szr.TracesSize(td) * numRequests) } // This test is meant to confirm that semaphore is still // released if the client context is canceled. 
func TestBatchProcessorCancelContext(t *testing.T) { - sink := new(consumertest.TracesSink) + requestCount := 10 + spansPerRequest := 250 cfg := createDefaultConfig().(*Config) cfg.SendBatchSize = 128 cfg.Timeout = 10 * time.Second + cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest) creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - batcher, err, doneCh := newBlockingBatchProcessor(creationSet, cfg, func() batch { return newBatchTraces(sink) }, true) + bc := &blockingConsumer{} + bc.wg.Add(1) + bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true) require.NoError(t, err) - require.NoError(t, batcher.Start(context.Background(), componenttest.NewNopHost())) + require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) - requestCount := 10 - spansPerRequest := 250 sentResourceSpans := ptrace.NewTraces().ResourceSpans() var wg sync.WaitGroup - ctxTimeout, _ := context.WithTimeout(context.Background(), time.Second*1) + ctx, cancel := context.WithCancel(context.Background()) for requestNum := 0; requestNum < requestCount; requestNum++ { td := testdata.GenerateTraces(spansPerRequest) spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -291,22 +269,42 @@ func TestBatchProcessorCancelContext(t *testing.T) { // until batch size reached to unblock. wg.Add(1) go func() { - err = batcher.ConsumeTraces(ctxTimeout, td) - assert.Contains(t, err.Error(), "context deadline exceeded") + err = bp.ConsumeTraces(ctx, td) + assert.Contains(t, err.Error(), "context canceled") wg.Done() }() } - // wait until context deadline is exceeded. + // check all spans arrived in blockingConsumer. + require.Eventually(t, func() bool { + numSpans := requestCount * spansPerRequest + return bc.numItems == numSpans + }, 5 * time.Second, 10 * time.Millisecond) + + // semaphore should be fully acquired at this point. + assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1))) + + wg.Add(1) + go func() { + td := testdata.GenerateTraces(spansPerRequest) + err = bp.ConsumeTraces(ctx, td) + assert.Contains(t, err.Error(), "context canceled") + wg.Done() + }() + + // cancel context and wait for ConsumeTraces to return. + cancel() wg.Wait() - // signal to the sender to process and send records. - doneCh <- 1 + // check sending another request does not change the semaphore count, even after ConsumeTraces returns. + assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1))) + // signal to the blockingConsumer to return response to waiters. + bc.unblock() - // Semaphore should be released once all requests are sent. Confirm we can acquire MaxInFlightBytes bytes. + // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes. 
require.Eventually(t, func() bool { - return batcher.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) + return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) }, 5 * time.Second, 10 * time.Millisecond) } From cd894e1ed82db2c5810343efd6b341185a3a09fc Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Fri, 17 Nov 2023 04:20:25 -0500 Subject: [PATCH 12/15] add shutdown to test --- .../processor/concurrentbatchprocessor/batch_processor_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 78f495be..49e097fd 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -306,6 +306,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { require.Eventually(t, func() bool { return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) }, 5 * time.Second, 10 * time.Millisecond) + require.NoError(t, batcher.Shutdown(context.Background())) } func TestBatchProcessorSpansDelivered(t *testing.T) { From 5f5b8c4422b28887b3c108919b2554174d4c8342 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Fri, 17 Nov 2023 04:33:25 -0500 Subject: [PATCH 13/15] add shutdown to test --- .../processor/concurrentbatchprocessor/batch_processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 49e097fd..5b2f037c 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -306,7 +306,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { require.Eventually(t, func() bool { return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) }, 5 * time.Second, 10 * time.Millisecond) - require.NoError(t, batcher.Shutdown(context.Background())) + require.NoError(t, bp.Shutdown(context.Background())) } func TestBatchProcessorSpansDelivered(t *testing.T) { From eed057fab15194dbb5d23a3f11ae484cb95d7053 Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Fri, 17 Nov 2023 04:50:12 -0500 Subject: [PATCH 14/15] synchronize, replace waitgroup with channel --- .../batch_processor_test.go | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 5b2f037c..4236bce4 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -205,18 +205,29 @@ func TestBatchProcessorLogsPanicRecover(t *testing.T) { } type blockingConsumer struct { + lock sync.Mutex numItems int - wg sync.WaitGroup + blocking chan struct{} +} + +func (bc *blockingConsumer) getItemsWaiting() int { + bc.lock.Lock() + defer bc.lock.Unlock() + return bc.numItems } func (bc *blockingConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { + bc.lock.Lock() bc.numItems += td.SpanCount() - bc.wg.Wait() + bc.lock.Unlock() + <-bc.blocking return nil } func (bc *blockingConsumer) unblock() { - bc.wg.Done() + bc.lock.Lock() + defer bc.lock.Unlock() + 
close(bc.blocking) } func (bc *blockingConsumer) Capabilities() consumer.Capabilities { @@ -249,8 +260,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest) creationSet := processortest.NewNopCreateSettings() creationSet.MetricsLevel = configtelemetry.LevelDetailed - bc := &blockingConsumer{} - bc.wg.Add(1) + bc := &blockingConsumer{blocking: make(chan struct{}, 1)} bp, err := newBatchTracesProcessor(creationSet, bc, cfg, true) require.NoError(t, err) require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost())) @@ -278,7 +288,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { // check all spans arrived in blockingConsumer. require.Eventually(t, func() bool { numSpans := requestCount * spansPerRequest - return bc.numItems == numSpans + return bc.getItemsWaiting() == numSpans }, 5 * time.Second, 10 * time.Millisecond) // semaphore should be fully acquired at this point. From dbebc6e64237ba5de1951df1683258622bbd2a0a Mon Sep 17 00:00:00 2001 From: Mohamed Osman Date: Fri, 17 Nov 2023 04:50:39 -0500 Subject: [PATCH 15/15] gofmt --- .../batch_processor_test.go | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/collector/processor/concurrentbatchprocessor/batch_processor_test.go b/collector/processor/concurrentbatchprocessor/batch_processor_test.go index 4236bce4..01ebdfcd 100644 --- a/collector/processor/concurrentbatchprocessor/batch_processor_test.go +++ b/collector/processor/concurrentbatchprocessor/batch_processor_test.go @@ -289,7 +289,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { require.Eventually(t, func() bool { numSpans := requestCount * spansPerRequest return bc.getItemsWaiting() == numSpans - }, 5 * time.Second, 10 * time.Millisecond) + }, 5*time.Second, 10*time.Millisecond) // semaphore should be fully acquired at this point. assert.False(t, bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(1))) @@ -315,7 +315,7 @@ func TestBatchProcessorCancelContext(t *testing.T) { // Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes. require.Eventually(t, func() bool { return bp.batcher.(*singleShardBatcher).batcher.sem.TryAcquire(int64(cfg.MaxInFlightBytes)) - }, 5 * time.Second, 10 * time.Millisecond) + }, 5*time.Second, 10*time.Millisecond) require.NoError(t, bp.Shutdown(context.Background())) } @@ -527,9 +527,9 @@ func testBatchProcessorSentBySizeWithMaxSize(t *testing.T, tel testTelemetry, us require.EqualValues(t, expectedBatchesNum, len(receivedTraces)) tel.assertMetrics(t, expectedMetrics{ - sendCount: float64(expectedBatchesNum), - sendSizeSum: float64(sink.SpanCount()), - sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), + sendCount: float64(expectedBatchesNum), + sendSizeSum: float64(sink.SpanCount()), + sizeTrigger: math.Floor(float64(totalSpans) / float64(sendBatchMaxSize)), }) } @@ -582,8 +582,8 @@ func TestBatchProcessorSentByTimeout(t *testing.T) { func TestBatchProcessorTraceSendWhenClosing(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, + Timeout: 3 * time.Second, + SendBatchSize: 1000, MaxInFlightBytes: defaultMaxBytes, } sink := new(consumertest.TracesSink) @@ -617,8 +617,8 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. 
cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, MaxInFlightBytes: defaultMaxBytes, } @@ -683,8 +683,8 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, + Timeout: 2 * time.Second, + SendBatchSize: 50, MaxInFlightBytes: defaultMaxBytes, } @@ -757,8 +757,8 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) { func TestBatchMetricsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 101, + Timeout: 100 * time.Millisecond, + SendBatchSize: 101, MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 @@ -803,8 +803,8 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) { func TestBatchMetricProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, + Timeout: 3 * time.Second, + SendBatchSize: 1000, MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 @@ -896,8 +896,8 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) { func BenchmarkBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, MaxInFlightBytes: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) @@ -906,9 +906,9 @@ func BenchmarkBatchMetricProcessor(b *testing.B) { func BenchmarkMultiBatchMetricProcessor(b *testing.B) { b.StopTimer() cfg := Config{ - Timeout: 100 * time.Millisecond, - SendBatchSize: 2000, - MetadataKeys: []string{"test", "test2"}, + Timeout: 100 * time.Millisecond, + SendBatchSize: 2000, + MetadataKeys: []string{"test", "test2"}, MaxInFlightBytes: defaultMaxBytes, } runMetricsProcessorBenchmark(b, cfg) @@ -957,8 +957,8 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) { // Instantiate the batch processor with low config values to test data // gets sent through the processor. cfg := Config{ - Timeout: 200 * time.Millisecond, - SendBatchSize: 50, + Timeout: 200 * time.Millisecond, + SendBatchSize: 50, MaxInFlightBytes: defaultMaxBytes, } @@ -1023,8 +1023,8 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo // Instantiate the batch processor with low config values to test data // gets sent through the processor. 
cfg := Config{ - Timeout: 2 * time.Second, - SendBatchSize: 50, + Timeout: 2 * time.Second, + SendBatchSize: 50, MaxInFlightBytes: defaultMaxBytes, } @@ -1075,8 +1075,8 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo func TestBatchLogsProcessor_Timeout(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 100, + Timeout: 3 * time.Second, + SendBatchSize: 100, MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 @@ -1121,8 +1121,8 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) { func TestBatchLogProcessor_Shutdown(t *testing.T) { cfg := Config{ - Timeout: 3 * time.Second, - SendBatchSize: 1000, + Timeout: 3 * time.Second, + SendBatchSize: 1000, MaxInFlightBytes: defaultMaxBytes, } requestCount := 5 @@ -1377,7 +1377,7 @@ func TestBatchProcessorMetadataCardinalityLimit(t *testing.T) { }) wg.Add(1) - go func() { + go func() { err := batcher.ConsumeTraces(ctx, td) assert.ErrorIs(t, err, errTooManyBatchers) wg.Done() @@ -1462,4 +1462,4 @@ func TestBatchSplitOnly(t *testing.T) { for _, ld := range receivedMds { require.Equal(t, maxBatch, ld.LogRecordCount()) } -} \ No newline at end of file +}
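
The mechanism this series builds up across patches 01, 02, and 06 — acquire semaphore capacity for a request's encoded size before it enters the shard, release it only after a response has arrived for every admitted item, and acquire with the caller's context so a canceled caller gets an error back instead of waiting forever — reduces to a small pattern. The sketch below is a simplified illustration of that pattern, not the processor's code; waitRelease and the three-item, 4 KiB request are invented for the example:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/semaphore"
    )

    // waitRelease drains one response per admitted item and returns the
    // acquired bytes once every item is accounted for. Running it in its own
    // goroutine is what lets the producer return early (for example on
    // cancellation) while the release is still guaranteed to happen.
    func waitRelease(sem *semaphore.Weighted, bytes int64, count int, resp <-chan int) {
        for n := range resp {
            count -= n
            if count == 0 {
                break
            }
        }
        sem.Release(bytes)
    }

    func main() {
        // 2 MiB, matching the series' defaultMaxBytes.
        sem := semaphore.NewWeighted(2 * 1048576)

        // Hypothetical request: three items totalling 4 KiB.
        bytes, count := int64(4096), 3

        // Acquire with the caller's context (the patch 02 fix): a canceled or
        // timed-out caller gets its context error back rather than blocking
        // on the processor's long-lived export context.
        if err := sem.Acquire(context.Background(), bytes); err != nil {
            fmt.Println("admission refused:", err)
            return
        }

        resp := make(chan int, count)
        done := make(chan struct{})
        go func() {
            waitRelease(sem, bytes, count, resp)
            close(done)
        }()

        // The pipeline answers once per item; split batches answer with
        // partial counts, which is why waitRelease sums rather than simply
        // counting messages.
        for i := 0; i < count; i++ {
            resp <- 1
        }
        <-done

        fmt.Println("capacity restored:", sem.TryAcquire(2*1048576)) // true
    }

Releasing from a goroutine keyed on the response channel is what lets consumeAndWait return on ctx.Done() while still guaranteeing the bytes come back once the batcher answers — the property the cancellation test added later in the series pins down with TryAcquire.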
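
Patches 04 through 06 wrap each export call so a panicking downstream consumer surfaces as an ordinary error to the caller rather than crashing the collector; review feedback replaced the initial errors.New(r.(string)) with fmt.Errorf("%v", r), since the string assertion would itself panic whenever a consumer panics with a non-string value. A minimal standalone version of that shape, with export and consume standing in for the real methods:

    package main

    import "fmt"

    // recoverError converts a panic in the guarded function into an ordinary
    // error on a named return. fmt.Errorf("%v", r) is deliberate: it handles
    // any panic value, not just strings.
    func recoverError(retErr *error) {
        if r := recover(); r != nil {
            *retErr = fmt.Errorf("%v", r)
        }
    }

    // export stands in for the batch types' export methods; consume stands
    // in for the next consumer in the pipeline.
    func export(consume func() error) (retErr error) {
        defer recoverError(&retErr)
        return consume()
    }

    func main() {
        err := export(func() error { panic("testing panic") })
        fmt.Println(err) // testing panic
    }

The named return is essential here: a deferred function can only alter a function's result through a named return value, which is why each export method in the series gains the (retErr error) signature.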
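
The cancellation test that patches 08 through 15 iterate on settles into a reusable fixture: a consumer that parks every delivery until the test releases it, plus a mutex-guarded counter the test can poll (patch 14 replaces the original WaitGroup with a closed channel so one call releases every parked goroutine, and guards the counter against the data race). A self-contained sketch of that fixture, with consume and itemsWaiting standing in for the real ConsumeTraces and getItemsWaiting, and the four 25-item deliveries invented for the example:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type blockingConsumer struct {
        lock     sync.Mutex
        numItems int
        blocking chan struct{}
    }

    // consume records the delivery size, then parks until the test unblocks.
    func (bc *blockingConsumer) consume(n int) {
        bc.lock.Lock()
        bc.numItems += n
        bc.lock.Unlock()
        <-bc.blocking
    }

    // itemsWaiting is safe to poll from the test goroutine.
    func (bc *blockingConsumer) itemsWaiting() int {
        bc.lock.Lock()
        defer bc.lock.Unlock()
        return bc.numItems
    }

    // unblock releases every parked delivery, and any that arrive later:
    // a closed channel can be received from any number of times.
    func (bc *blockingConsumer) unblock() { close(bc.blocking) }

    func main() {
        bc := &blockingConsumer{blocking: make(chan struct{})}

        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                bc.consume(25)
            }()
        }

        // Poll until all four deliveries have parked (the test uses
        // require.Eventually for this), then release them together.
        for bc.itemsWaiting() != 100 {
            time.Sleep(time.Millisecond)
        }
        bc.unblock()
        wg.Wait()
        fmt.Println("delivered:", bc.itemsWaiting()) // delivered: 100
    }

Holding deliveries open this way is what lets the test drive the semaphore to full occupancy, cancel the client context, and then assert that capacity is only restored after the consumer finally answers.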