Skip to content

Commit

Permalink
Fanout consumer does not need to mutate in some cases (#9062)
Browse files Browse the repository at this point in the history
Follow up to
#9053.

@dmitryax pointed out
[here](#9053 (comment))
that the fanout consumer will pass original data to a non-mutating
consumer if any is available. This PR incorporates that point and
updates test expectations accordingly.
  • Loading branch information
djaglowski authored Dec 11, 2023
1 parent 9b6a18b commit 7ec38e5
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 14 deletions.
3 changes: 2 additions & 1 deletion internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type logsConsumer struct {
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0}
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0 && len(lsc.readonly) == 0}
}

// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
Expand Down
4 changes: 2 additions & 2 deletions internal/fanoutconsumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.True(t, lfc.Capabilities().MutatesData)
assert.False(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.LogsSink)

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.True(t, lfc.Capabilities().MutatesData)
assert.False(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down
3 changes: 2 additions & 1 deletion internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type metricsConsumer struct {
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: len(msc.mutable) > 0}
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(msc.mutable) > 0 && len(msc.readonly) == 0}
}

// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
Expand Down
4 changes: 2 additions & 2 deletions internal/fanoutconsumer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.True(t, mfc.Capabilities().MutatesData)
assert.False(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.MetricsSink)

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.True(t, mfc.Capabilities().MutatesData)
assert.False(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down
3 changes: 2 additions & 1 deletion internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type tracesConsumer struct {
}

func (tsc *tracesConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0}
// If all consumers are mutating, then the original data will be passed to one of them.
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0 && len(tsc.readonly) == 0}
}

// ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one.
Expand Down
4 changes: 2 additions & 2 deletions internal/fanoutconsumer/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.True(t, tfc.Capabilities().MutatesData)
assert.False(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -187,7 +187,7 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.TracesSink)

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.True(t, tfc.Capabilities().MutatesData)
assert.False(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down
10 changes: 5 additions & 5 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,17 +573,17 @@ func TestConnectorPipelinesGraph(t *testing.T) {
pipelineConfigs: pipelines.Config{
component.NewIDWithName("traces", "in"): {
Receivers: []component.ID{component.NewID("examplereceiver")},
Processors: []component.ID{component.NewID("exampleprocessor")},
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")},
Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")},
},
component.NewIDWithName("metrics", "in"): {
Receivers: []component.ID{component.NewID("examplereceiver")},
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")},
Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")},
},
component.NewIDWithName("logs", "in"): {
Receivers: []component.ID{component.NewID("examplereceiver")},
Processors: []component.ID{component.NewID("exampleprocessor")},
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")},
Exporters: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")},
},
component.NewIDWithName("traces", "out"): {
Expand All @@ -593,12 +593,12 @@ func TestConnectorPipelinesGraph(t *testing.T) {
},
component.NewIDWithName("metrics", "out"): {
Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")},
Processors: []component.ID{component.NewID("exampleprocessor")},
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector
Exporters: []component.ID{component.NewID("exampleexporter")},
},
component.NewIDWithName("logs", "out"): {
Receivers: []component.ID{component.NewIDWithName("exampleconnector", "inherit_mutate")},
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")},
Processors: []component.ID{component.NewIDWithName("exampleprocessor", "mutate")}, // mutate propagates upstream to connector
Exporters: []component.ID{component.NewID("exampleexporter")},
},
},
Expand Down

0 comments on commit 7ec38e5

Please sign in to comment.