Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(concurrentbatchprocessor): add in-flight byte limit to producer queue #93

Merged
9 changes: 0 additions & 9 deletions collector/cmd/otelarrowcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,3 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/otel-arrow => ../../..

replace github.com/open-telemetry/otel-arrow/collector => ../../

// ambiguous import: found package cloud.google.com/go/compute/metadata in multiple modules
replace cloud.google.com/go => cloud.google.com/go v0.110.2

replace github.com/lightstep/telemetry-generator/generatorreceiver => ../../../../telemetry-generator/generatorreceiver/
23 changes: 19 additions & 4 deletions collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,12 @@ func (bt *batchTraces) sizeBytes(data any) int {
return bt.sizer.TracesSize(data.(ptrace.Traces))
}

func (bt *batchTraces) export(ctx context.Context, req any) error {
func (bt *batchTraces) export(ctx context.Context, req any) (retErr error) {
defer func() {
if r := recover(); r != nil {
retErr = errors.New(r.(string))
}
}()
moh-osman3 marked this conversation as resolved.
Show resolved Hide resolved
td := req.(ptrace.Traces)
return bt.nextConsumer.ConsumeTraces(ctx, td)
}
Expand Down Expand Up @@ -605,7 +610,12 @@ func (bm *batchMetrics) sizeBytes(data any) int {
return bm.sizer.MetricsSize(data.(pmetric.Metrics))
}

func (bm *batchMetrics) export(ctx context.Context, req any) error {
func (bm *batchMetrics) export(ctx context.Context, req any) (retErr error) {
defer func() {
if r := recover(); r != nil {
retErr = errors.New(r.(string))
}
}()
md := req.(pmetric.Metrics)
return bm.nextConsumer.ConsumeMetrics(ctx, md)
}
Expand Down Expand Up @@ -657,7 +667,12 @@ func (bl *batchLogs) sizeBytes(data any) int {
return bl.sizer.LogsSize(data.(plog.Logs))
}

func (bl *batchLogs) export(ctx context.Context, req any) error {
func (bl *batchLogs) export(ctx context.Context, req any) (retErr error) {
defer func() {
if r := recover(); r != nil {
retErr = errors.New(r.(string))
}
}()
ld := req.(plog.Logs)
return bl.nextConsumer.ConsumeLogs(ctx, ld)
}
Expand Down Expand Up @@ -692,4 +707,4 @@ func (bl *batchLogs) add(item any) {
}
bl.logCount += newLogsCount
ld.ResourceLogs().MoveAndAppendTo(bl.logData.ResourceLogs())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ func (pc *panicConsumer) ConsumeTraces(ctx context.Context, td ptrace.Traces) er
panic("testing panic")
return nil
}
func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, td pmetric.Metrics) error {
func (pc *panicConsumer) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
panic("testing panic")
return nil
}
func (pc *panicConsumer) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
panic("testing panic")
return nil
}
Expand Down Expand Up @@ -166,13 +170,13 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
require.NoError(t, bp.Shutdown(context.Background()))
}

func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
func TestBatchProcessorLogsPanicRecover(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bp, err := newBatchMetricsProcessor(creationSet, &panicConsumer{}, cfg, true)
bp, err := newBatchLogsProcessor(creationSet, &panicConsumer{}, cfg, true)

require.NoError(t, err)
require.NoError(t, bp.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -190,7 +194,7 @@ func TestBatchProcessorMetricsPanicRecover(t *testing.T) {
ld.ResourceLogs().At(0).CopyTo(sentResourceLogs.AppendEmpty())
wg.Add(1)
go func() {
err = batcher.ConsumeLogs(context.Background(), ld)
err = bp.ConsumeLogs(context.Background(), ld)
assert.Contains(t, err.Error(), "testing panic")
wg.Done()
}()
Expand Down Expand Up @@ -1343,4 +1347,4 @@ func TestBatchSplitOnly(t *testing.T) {
for _, ld := range receivedMds {
require.Equal(t, maxBatch, ld.LogRecordCount())
}
}
}