Skip to content

Commit

Permalink
Fix bug where MutatesData would not correctly propogate through conne…
Browse files Browse the repository at this point in the history
…ctors (#9053)

This fixes two closely related problems. 
1. While fanoutconsumers do not themselves mutate data, they should
expose whether or not they are handing data off to consumers which may
do so. Otherwise, the service cannot correctly determine how to fan out
after a receiver. e.g. a receiver shared between two pipelines, one of
which contains an exporter or connector which mutates data.
2. Connectors can themselves mutate data but we were not taking this
into account when building the graph.
  • Loading branch information
djaglowski authored Dec 9, 2023
1 parent b79a7c9 commit 7c58e71
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 60 deletions.
25 changes: 25 additions & 0 deletions .chloggen/connectors-propogate-mutates-data.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix bug where MutatesData would not correctly propagate through connectors.

# One or more tracking issues or pull requests related to the change
issues: [9053]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
2 changes: 1 addition & 1 deletion internal/fanoutconsumer/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type logsConsumer struct {
}

func (lsc *logsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
return consumer.Capabilities{MutatesData: len(lsc.mutable) > 0}
}

// ConsumeLogs exports the plog.Logs to all consumers wrapped by the current one.
Expand Down
14 changes: 10 additions & 4 deletions internal/fanoutconsumer/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func TestLogsNotMultiplexing(t *testing.T) {
assert.Same(t, nop, lfc)
}

func TestLogsNotMultiplexingMutating(t *testing.T) {
p := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}
lfc := NewLogs([]consumer.Logs{p})
assert.True(t, lfc.Capabilities().MutatesData)
}

func TestLogsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.LogsSink)
p2 := new(consumertest.LogsSink)
Expand Down Expand Up @@ -68,7 +74,7 @@ func TestLogsMultiplexingMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -105,7 +111,7 @@ func TestReadOnlyLogsMultiplexingMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ldOrig := testdata.GenerateLogs(1)
ld := testdata.GenerateLogs(1)
ld.MarkReadOnly()
Expand Down Expand Up @@ -142,7 +148,7 @@ func TestLogsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingLogsSink{LogsSink: new(consumertest.LogsSink)}

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -180,7 +186,7 @@ func TestLogsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.LogsSink)

lfc := NewLogs([]consumer.Logs{p1, p2, p3})
assert.False(t, lfc.Capabilities().MutatesData)
assert.True(t, lfc.Capabilities().MutatesData)
ld := testdata.GenerateLogs(1)

for i := 0; i < 2; i++ {
Expand Down
2 changes: 1 addition & 1 deletion internal/fanoutconsumer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type metricsConsumer struct {
}

func (msc *metricsConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
return consumer.Capabilities{MutatesData: len(msc.mutable) > 0}
}

// ConsumeMetrics exports the pmetric.Metrics to all consumers wrapped by the current one.
Expand Down
14 changes: 10 additions & 4 deletions internal/fanoutconsumer/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func TestMetricsNotMultiplexing(t *testing.T) {
assert.Same(t, nop, mfc)
}

func TestMetricssNotMultiplexingMutating(t *testing.T) {
p := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}
lfc := NewMetrics([]consumer.Metrics{p})
assert.True(t, lfc.Capabilities().MutatesData)
}

func TestMetricsMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.MetricsSink)
p2 := new(consumertest.MetricsSink)
Expand Down Expand Up @@ -68,7 +74,7 @@ func TestMetricsMultiplexingMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -105,7 +111,7 @@ func TestReadOnlyMetricsMultiplexingMixFirstMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
mdOrig := testdata.GenerateMetrics(1)
md := testdata.GenerateMetrics(1)
md.MarkReadOnly()
Expand Down Expand Up @@ -142,7 +148,7 @@ func TestMetricsMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingMetricsSink{MetricsSink: new(consumertest.MetricsSink)}

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -180,7 +186,7 @@ func TestMetricsMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.MetricsSink)

mfc := NewMetrics([]consumer.Metrics{p1, p2, p3})
assert.False(t, mfc.Capabilities().MutatesData)
assert.True(t, mfc.Capabilities().MutatesData)
md := testdata.GenerateMetrics(1)

for i := 0; i < 2; i++ {
Expand Down
2 changes: 1 addition & 1 deletion internal/fanoutconsumer/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type tracesConsumer struct {
}

func (tsc *tracesConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
return consumer.Capabilities{MutatesData: len(tsc.mutable) > 0}
}

// ConsumeTraces exports the ptrace.Traces to all consumers wrapped by the current one.
Expand Down
14 changes: 10 additions & 4 deletions internal/fanoutconsumer/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func TestTracesNotMultiplexing(t *testing.T) {
assert.Same(t, nop, tfc)
}

func TestTracesNotMultiplexingMutating(t *testing.T) {
p := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}
lfc := NewTraces([]consumer.Traces{p})
assert.True(t, lfc.Capabilities().MutatesData)
}

func TestTracesMultiplexingNonMutating(t *testing.T) {
p1 := new(consumertest.TracesSink)
p2 := new(consumertest.TracesSink)
Expand Down Expand Up @@ -68,7 +74,7 @@ func TestTracesMultiplexingMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -105,7 +111,7 @@ func TestReadOnlyTracesMultiplexingMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)

tdOrig := testdata.GenerateTraces(1)
td := testdata.GenerateTraces(1)
Expand Down Expand Up @@ -143,7 +149,7 @@ func TestTracesMultiplexingMixLastMutating(t *testing.T) {
p3 := &mutatingTracesSink{TracesSink: new(consumertest.TracesSink)}

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down Expand Up @@ -181,7 +187,7 @@ func TestTracesMultiplexingMixLastNonMutating(t *testing.T) {
p3 := new(consumertest.TracesSink)

tfc := NewTraces([]consumer.Traces{p1, p2, p3})
assert.False(t, tfc.Capabilities().MutatesData)
assert.True(t, tfc.Capabilities().MutatesData)
td := testdata.GenerateTraces(1)

for i := 0; i < 2; i++ {
Expand Down
6 changes: 4 additions & 2 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,10 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
case *connectorNode:
err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID()))
case *capabilitiesNode:
capability := consumer.Capabilities{MutatesData: false}
capability := consumer.Capabilities{
// The fanOutNode represents the aggregate capabilities of the exporters in the pipeline.
MutatesData: g.pipelines[n.pipelineID].fanOutNode.getConsumer().Capabilities().MutatesData,
}
for _, proc := range g.pipelines[n.pipelineID].processors {
capability.MutatesData = capability.MutatesData || proc.getConsumer().Capabilities().MutatesData
}
Expand Down Expand Up @@ -319,7 +322,6 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error {
case component.DataTypeMetrics:
consumers := make([]consumer.Metrics, 0, len(nexts))
for _, next := range nexts {

consumers = append(consumers, next.(consumer.Metrics))
}
n.baseConsumer = fanoutconsumer.NewMetrics(consumers)
Expand Down
Loading

0 comments on commit 7c58e71

Please sign in to comment.