Skip to content

Commit

Permalink
Fanout consumer does not need to mutate in some cases
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Dec 11, 2023
1 parent 9b6a18b commit e2ec348
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 9 deletions.
3 changes: 2 additions & 1 deletion internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type logsConsumer struct {
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0}
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0 && len(lsc.readonly) == 0}
}

// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
Expand Down
4 changes: 2 additions & 2 deletions internal/fanoutconsumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.True(t, lfc.Capabilities().MutatesData)
assert.False(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.LogsSink)

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.True(t, lfc.Capabilities().MutatesData)
assert.False(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down
3 changes: 2 additions & 1 deletion internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type metricsConsumer struct {
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: len(msc.mutable) > 0}
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(msc.mutable) > 0 && len(msc.readonly) == 0}
}

// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
Expand Down
4 changes: 2 additions & 2 deletions internal/fanoutconsumer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.True(t, mfc.Capabilities().MutatesData)
assert.False(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.MetricsSink)

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.True(t, mfc.Capabilities().MutatesData)
assert.False(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down
3 changes: 2 additions & 1 deletion internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type tracesConsumer struct {
}

func (tsc *tracesConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0}
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0 && len(tsc.readonly) == 0}
}

// ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one.
Expand Down
4 changes: 2 additions & 2 deletions internal/fanoutconsumer/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.True(t, tfc.Capabilities().MutatesData)
assert.False(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.TracesSink)

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.True(t, tfc.Capabilities().MutatesData)
assert.False(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down

0 comments on commit e2ec348

Please sign in to comment.